package nstream.adapter.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import nstream.adapter.common.ingress.IngestorAgent;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.common.schedule.DeferrableException;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/rabbitmq/RabbitMqIngestingAgent.class */
public abstract class RabbitMqIngestingAgent extends IngestorAgent<RabbitMqIngressSettings, Response> {
    protected Connection rabbitMqConnection;
    protected volatile Channel rabbitMqChannel;
    protected volatile String rabbitMqConsumerTag;

    /* loaded from: input_file:nstream/adapter/rabbitmq/RabbitMqIngestingAgent$Response.class */
    public static class Response {
        public final byte[] body;
        public final String consumerTag;
        public final Envelope envelope;
        public final AMQP.BasicProperties properties;

        public Response(byte[] bArr, String str, Envelope envelope, AMQP.BasicProperties basicProperties) {
            this.body = bArr;
            this.consumerTag = str;
            this.envelope = envelope;
            this.properties = basicProperties;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignConnection(Connection connection) {
        this.rabbitMqConnection = connection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void assignChannelAndConsumer(Properties properties, Runnable runnable) {
        if (this.rabbitMqConnection == null) {
            throw new RuntimeException(nodeUri() + ": can't assign channel with unassigned connection");
        }
        if (this.rabbitMqChannel != null) {
            throw new RuntimeException(nodeUri() + ": channel already assigned");
        }
        try {
            this.rabbitMqChannel = this.rabbitMqConnection.createChannel();
            try {
                this.rabbitMqConsumerTag = RabbitMqConsumerLoader.loadConsumer(this.rabbitMqChannel, properties, new DefaultConsumer(this.rabbitMqChannel) { // from class: nstream.adapter.rabbitmq.RabbitMqIngestingAgent.1
                    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                        RabbitMqIngestingAgent.this.ingestOrUnsubscribe(bArr, str, envelope, basicProperties);
                    }
                });
                runnable.run();
            } catch (IOException e) {
                unassignChannelAndConsumer();
                throw new RuntimeException(nodeUri() + ": failed to assign channel", e);
            }
        } catch (IOException e2) {
            throw new RuntimeException(nodeUri() + ": failed to assign channel", e2);
        }
    }

    protected void unassignChannelAndConsumer() {
        this.rabbitMqConsumerTag = null;
        try {
            this.rabbitMqChannel.close();
        } catch (IOException | TimeoutException e) {
        }
        this.rabbitMqChannel = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: parseIngressSettings, reason: merged with bridge method [inline-methods] */
    public RabbitMqIngressSettings m5parseIngressSettings(Value value) {
        return RabbitMqAdapterUtils.ingressSettingsFromProp(value);
    }

    protected void ingestOrUnsubscribe(byte[] bArr, String str, Envelope envelope, AMQP.BasicProperties basicProperties) {
        try {
            ingest(new Response(bArr, str, envelope, basicProperties));
        } catch (Exception e) {
            didFail(new RuntimeException(nodeUri() + ": message triggered fatal exception; stopping consumer. Message info: consumerTag=" + str + ", envelope=" + envelope + ", basicProperties=" + basicProperties, e));
            unassignChannelAndConsumer();
        } catch (DeferrableException e2) {
            handleDeferrableException(e2);
        }
    }

    protected void stageReception() {
        loadSettings("rabbitMqIngressConf");
        assignConnection((Connection) ProvisionLoader.getProvision(((RabbitMqIngressSettings) this.ingressSettings).connectionProvisionName()).value());
        assignChannelAndConsumer((Properties) ProvisionLoader.getProvision(((RabbitMqIngressSettings) this.ingressSettings).consumerPropertiesProvisionName()).value(), () -> {
            info(nodeUri() + ": successfully staged consumer");
        });
    }

    public void didStart() {
        System.out.println(nodeUri() + ": didStart");
        submit(this::stageReception);
    }
}
