package nstream.adapter.jms;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import nstream.adapter.common.ext.JmsIngressSettings;
import nstream.adapter.common.ingress.IngestorMetricsAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/jms/JmsIngestingAgent.class */
/**
 * Base agent that consumes JMS {@link Message}s from a single destination and
 * hands each one to {@code ingestOrCancel}.
 *
 * <p>Lifecycle as implemented here: {@link #stageReception()} loads settings,
 * resolves a {@link Connection} provision, and calls
 * {@link #assignDestination(String)}, which starts the connection, opens a
 * session and consumer, and registers a message listener. {@link #cancel()}
 * schedules {@link #closeSession()}.
 *
 * <p>{@code nodeUri}, {@code execute}, {@code info}, {@code didFail},
 * {@code loadSettings}, {@code ingestOrCancel} and the {@code ingressSettings}
 * field are not declared in this file; presumably they are inherited from
 * {@link IngestorMetricsAgent} or its ancestors.
 */
public abstract class JmsIngestingAgent extends IngestorMetricsAgent<JmsIngressSettings, Message> {
    /** Connection used to create the session; set by {@link #assignConnection(Connection)}. */
    protected volatile Connection connection;
    /** Session backing the current consumer; {@code null} while no destination is assigned. */
    protected volatile Session session;
    /** Consumer bound to the assigned destination; {@code null} while no destination is assigned. */
    protected MessageConsumer consumer;

    /**
     * Stores the connection that {@link #assignDestination(String)} will use.
     *
     * @param connection the JMS connection to consume from
     */
    protected void assignConnection(Connection connection) {
        this.connection = connection;
    }

    /**
     * Starts the assigned connection, opens a non-transacted, auto-acknowledging
     * session, creates a consumer for the given destination, and registers a
     * listener that forwards every message to {@code ingestOrCancel}.
     *
     * <p>If any step fails after the session was created, the partially built
     * session is closed and the {@code session}/{@code consumer} fields are
     * reset, so that the session is not leaked and a later call is not rejected
     * with "destination already assigned".
     *
     * @param str destination descriptor, resolved via
     *            {@code JmsAdapterUtils.assembleDestination}
     * @throws RuntimeException if no connection is assigned, if a destination
     *         is already assigned, or if any JMS setup step fails (the
     *         underlying exception is the cause)
     */
    protected void assignDestination(String str) {
        // Snapshot the volatile field so the null check and the uses agree.
        final Connection activeConnection = this.connection;
        if (activeConnection == null) {
            throw new RuntimeException(nodeUri() + ": can't assign destination without assigned connection");
        }
        if (this.session != null) {
            throw new RuntimeException(nodeUri() + ": destination already assigned");
        }
        Session createdSession = null;
        try {
            activeConnection.start();
            createdSession = activeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Publish the session before the listener is registered, as the
            // original did, so a closeSession() triggered by an early message
            // can observe it.
            this.session = createdSession;
            final MessageConsumer createdConsumer =
                    createdSession.createConsumer(JmsAdapterUtils.assembleDestination(str, createdSession));
            this.consumer = createdConsumer;
            createdConsumer.setMessageListener(message -> {
                this.ingestOrCancel(message);
            });
        } catch (Exception e) {
            final RuntimeException failure = new RuntimeException(nodeUri() + ": failed to assign destination", e);
            // Roll back partial state; otherwise the open session leaks and
            // blocks every subsequent assignment attempt.
            this.consumer = null;
            this.session = null;
            if (createdSession != null) {
                try {
                    createdSession.close();
                } catch (Exception closeFailure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }
    }

    /**
     * Schedules {@link #closeSession()} through {@code execute} instead of
     * calling it inline.
     *
     * <p>NOTE(review): presumably deferred so that the session is not closed
     * from its own listener thread — confirm against {@code execute}.
     */
    protected void cancel() {
        execute(this::closeSession);
    }

    /**
     * Closes the current session, if any, and clears the {@code consumer} and
     * {@code session} fields once the close has succeeded. Does nothing when no
     * session is assigned.
     *
     * @throws RuntimeException if closing the session fails; in that case the
     *         fields are left untouched
     */
    protected void closeSession() {
        // Read the volatile field once: a second read could observe null if
        // another closeSession() finished in between the check and the close.
        final Session activeSession = this.session;
        if (activeSession == null) {
            return;
        }
        try {
            activeSession.close();
            this.consumer = null;
            this.session = null;
        } catch (Exception e) {
            throw new RuntimeException(nodeUri() + ": exception observed in closing session", e);
        }
    }

    /**
     * Reports a fatal ingestion failure through {@code didFail}, including the
     * JMS message ID when it can be read.
     *
     * <p>Declared {@code public} by the decompiler; the original access
     * modifier was {@code protected}.
     *
     * @param message the message whose ingestion failed
     * @param exc the exception raised while ingesting {@code message}
     */
    public void didFailIngest(Message message, Exception exc) {
        try {
            didFail(new RuntimeException(nodeUri() + ": message with ID=" + message.getJMSMessageID() + " triggered fatal exception; stopping ", exc));
        } catch (JMSException e) {
            final RuntimeException failure = new RuntimeException(nodeUri() + ": message triggered fatal exception; unable to get message ID; stopping ", exc);
            // Keep the ID-lookup failure visible without replacing the real cause.
            failure.addSuppressed(e);
            didFail(failure);
        }
    }

    /**
     * Casts the given value to {@link JmsIngressSettings}, falling back to
     * {@code JmsIngressSettings.defaultSettings()} when the cast yields
     * {@code null}.
     *
     * <p>Decompiler notes: renamed from {@code parseIngressSettings} (merged
     * with a bridge method); access modifier changed from {@code protected}.
     *
     * @param value structured value to interpret as ingress settings
     * @return the parsed settings, or the default settings if the cast
     *         produced {@code null}
     */
    public JmsIngressSettings m3parseIngressSettings(Value value) {
        final JmsIngressSettings parsed = (JmsIngressSettings) JmsIngressSettings.form().cast(value);
        return parsed == null ? JmsIngressSettings.defaultSettings() : parsed;
    }

    /**
     * Loads settings under the {@code "jmsIngressConf"} key, resolves the
     * connection provision named by the settings, assigns it, assigns the
     * configured destination, and logs success.
     *
     * @throws RuntimeException if the destination cannot be assigned
     *         (see {@link #assignDestination(String)})
     */
    protected void stageReception() {
        loadSettings("jmsIngressConf");
        assignConnection((Connection) ProvisionLoader.getProvision(this.ingressSettings.connectionProvisionName()).value());
        assignDestination(this.ingressSettings.destination());
        info(nodeUri() + ": successfully staged consumer for reception");
    }
}
