package nstream.adapter.nats;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;
import nstream.adapter.common.ext.NatsIngressSettings;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.common.schedule.DeferrableException;
import swim.concurrent.TaskRef;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/nats/NatsIngestingAgent.class */
public abstract class NatsIngestingAgent extends IngestorMetricsAgent<NatsIngressSettings, Message> {
    protected volatile TaskRef longPollTask;
    protected volatile Connection natsConnection;
    protected volatile Subscription sub;
    protected volatile String natsSubject;

    protected TaskRef longPollTask() {
        return this.longPollTask;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public NatsIngressSettings m2parseIngressSettings(Value value) {
        NatsIngressSettings natsIngressSettings = (NatsIngressSettings) NatsIngressSettings.form().cast(value);
        return natsIngressSettings == null ? NatsIngressSettings.defaultSettings() : natsIngressSettings;
    }

    protected void stageReception() {
        try {
            loadSettings(NatsAdapterUtils.ingressSettingsFromProp(getProp("natsIngressConf")));
            this.natsConnection = (Connection) ProvisionLoader.getProvision(this.ingressSettings.getConnectionProvisionName()).value();
            this.natsSubject = this.ingressSettings.getNatsIngressSubjectName();
            this.sub = this.natsConnection.subscribe(this.natsSubject);
            this.longPollTask = prepareLoop(this::longPollTask, this::longPollLoop);
            this.longPollTask.cue();
        } catch (Exception e) {
            throw new RuntimeException(nodeUri() + ": Failed to stage reception", e);
        }
    }

    protected void longPollLoop() throws DeferrableException {
        try {
            Message nextMessage = this.sub.nextMessage(1000L);
            if (nextMessage != null) {
                ingestOrContinue(nextMessage);
            }
        } catch (InterruptedException e) {
            throw new DeferrableException(nodeUri() + ": interrupted", e);
        }
    }
}
