package nstream.adapter.jms;

import java.util.function.Consumer;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import nstream.adapter.common.NstreamAgent;
import swim.concurrent.AbstractTask;

/* loaded from: input_file:nstream/adapter/jms/JMSIngestTask.class */
public class JMSIngestTask extends AbstractTask implements JMSSource {
    private final NstreamAgent agentContext;
    private final ShareableRetryingConnection connectionContainer;
    private final DestinationProvider destinationProvider;
    private final Consumer<Message> listener;
    private Connection connection;
    private volatile boolean isRunning = false;

    public JMSIngestTask(NstreamAgent nstreamAgent, ShareableRetryingConnection shareableRetryingConnection, DestinationProvider destinationProvider, Consumer<Message> consumer) {
        this.agentContext = nstreamAgent;
        this.connectionContainer = shareableRetryingConnection;
        this.destinationProvider = destinationProvider;
        this.listener = consumer;
        this.agentContext.asyncStage().task(this);
    }

    @Override // nstream.adapter.jms.JMSSource
    public void start() {
        this.isRunning = true;
        cue();
    }

    @Override // nstream.adapter.jms.JMSSource
    public void stop() {
        this.isRunning = false;
    }

    public void runTask() {
        if (this.connection == null) {
            obtainConnectionIfRunning();
        }
        while (this.connection != null && this.isRunning) {
            try {
                receiveLoop(this.connection);
            } catch (JMSException e) {
                this.agentContext.debug("JMS Connection exception while consuming");
                obtainConnectionIfRunning();
            }
        }
    }

    private void obtainConnectionIfRunning() {
        if (this.isRunning) {
            this.agentContext.debug("Attempting to obtain a fresh JMS Connection...");
            this.connection = this.connectionContainer.waitForNewConnection(this.connection);
            if (this.connection == null) {
                this.agentContext.warn("Could not obtain fresh JMS Connection - stopping");
            } else {
                this.agentContext.debug("Obtained fresh JMS Connection");
            }
        }
    }

    private void receiveLoop(Connection connection) throws JMSException {
        Session session = null;
        MessageConsumer messageConsumer = null;
        try {
            session = connection.createSession(false, 1);
            messageConsumer = session.createConsumer(this.destinationProvider.get(session));
            while (this.isRunning) {
                Message receive = messageConsumer.receive(1000L);
                if (receive != null) {
                    this.listener.accept(receive);
                }
            }
            closeMessageConsumer(messageConsumer);
            closeSession(session);
        } catch (Throwable th) {
            closeMessageConsumer(messageConsumer);
            closeSession(session);
            throw th;
        }
    }

    private void closeSession(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (JMSException e) {
        }
    }

    private void closeMessageConsumer(MessageConsumer messageConsumer) {
        if (messageConsumer == null) {
            return;
        }
        try {
            messageConsumer.close();
        } catch (JMSException e) {
        }
    }

    public boolean taskWillBlock() {
        return true;
    }
}
