package nstream.adapter.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import nstream.adapter.common.DutyException;
import nstream.adapter.common.egress.PublishingAgent;

/* loaded from: input_file:nstream/adapter/rabbitmq/RabbitMqPublishingAgent.class */
/**
 * Base publishing agent that turns values of type {@code V} into
 * {@link RabbitMqPublishable}s and sends them over a RabbitMQ {@link Channel}.
 *
 * <p>Expected setup order, as enforced by the guards below:
 * {@link #assignConnection(Connection)} first, then
 * {@link #assignChannelAndProducer(Properties, Runnable)}, and only then
 * {@link #publish(RabbitMqPublishable)}.
 *
 * <p>Subclasses must implement {@link #bodyFromState(Object)} and may override
 * {@link #exchangeName(Object)}, {@link #routingKey(Object)},
 * {@link #mandatory(Object)} and {@link #basicProperties(Object)} to derive the
 * publish arguments per value instead of from the static producer parameters.
 *
 * @param <V> type of the value each publishable is built from
 */
public abstract class RabbitMqPublishingAgent<V> extends PublishingAgent<V, RabbitMqPublishable, Void> {
    /** Connection used to open the channel; set via {@link #assignConnection(Connection)}. */
    protected Connection rabbitMqConnection;
    /** Channel used for publishing; {@code null} until the channel has been assigned. */
    protected volatile Channel rabbitMqChannel;
    /** Static publish parameters parsed from properties; {@code null} until assigned. */
    protected volatile RabbitMqProducerStaticParams rabbitMqProducerStaticParams;

    /**
     * Stores the connection from which the publishing channel will later be created.
     *
     * @param connection the RabbitMQ connection to use
     */
    protected void assignConnection(Connection connection) {
        this.rabbitMqConnection = connection;
    }

    /**
     * Creates the publishing channel on the assigned connection, parses the static
     * producer parameters from {@code properties}, and then runs {@code runnable}.
     * The work is handed to the inherited {@code performDuty}.
     *
     * @param properties source of the static producer parameters
     * @param runnable   callback run after the channel and parameters are assigned
     * @throws RuntimeException if no connection is assigned yet, or a channel is
     *                          already assigned
     */
    protected void assignChannelAndProducer(Properties properties, Runnable runnable) {
        if (this.rabbitMqConnection == null) {
            throw new RuntimeException(nodeUri() + ": can't assign channel with unassigned connection");
        }
        if (this.rabbitMqChannel != null) {
            throw new RuntimeException(nodeUri() + ": channel already assigned");
        }
        performDuty(() -> {
            try {
                this.rabbitMqChannel = this.rabbitMqConnection.createChannel();
                this.rabbitMqProducerStaticParams = RabbitMqProducerStaticParams.fromProperties(properties);
                runnable.run();
            } catch (IOException e) {
                // NOTE(review): the trailing 'true' presumably marks the failure as retryable - confirm
                // against DutyException.
                throw new DutyException(nodeUri() + ": failed to assign channel", e, true);
            }
        });
    }

    /**
     * Returns the exchange to publish {@code v} to. This default ignores {@code v}
     * and uses the statically configured exchange.
     *
     * @param v the value about to be published
     * @return the configured exchange name
     * @throws UnsupportedOperationException if no exchange is configured
     */
    protected String exchangeName(V v) {
        // Snapshot the volatile field so the check and the read use the same instance.
        final RabbitMqProducerStaticParams params = this.rabbitMqProducerStaticParams;
        if (params == null || !params.exchange().isPresent()) {
            throw new UnsupportedOperationException(nodeUri() + ": exchangeName() must be configured");
        }
        return params.exchange().get();
    }

    /**
     * Returns the routing key to publish {@code v} with. This default ignores
     * {@code v} and uses the statically configured routing key.
     *
     * @param v the value about to be published
     * @return the configured routing key
     * @throws UnsupportedOperationException if no routing key is configured
     */
    protected String routingKey(V v) {
        // Snapshot the volatile field so the check and the read use the same instance.
        final RabbitMqProducerStaticParams params = this.rabbitMqProducerStaticParams;
        // Previously this consulted exchange(), so the exchange name was sent as the routing key.
        // NOTE(review): assumes RabbitMqProducerStaticParams exposes routingKey() with the same
        // Optional-style contract as exchange() - confirm.
        if (params == null || !params.routingKey().isPresent()) {
            throw new UnsupportedOperationException(nodeUri() + ": routingKey() must be configured");
        }
        return params.routingKey().get();
    }

    /**
     * Returns the {@code mandatory} flag to publish {@code v} with.
     *
     * @param v the value about to be published
     * @return the statically configured flag, or {@code false} when no static
     *         parameters have been assigned
     */
    protected boolean mandatory(V v) {
        final RabbitMqProducerStaticParams params = this.rabbitMqProducerStaticParams;
        return params != null && params.mandatory();
    }

    /**
     * Returns the AMQP message properties to publish {@code v} with.
     *
     * @param v the value about to be published
     * @return {@code null} by default
     */
    protected AMQP.BasicProperties basicProperties(V v) {
        return null;
    }

    /**
     * Serializes {@code v} into the message body.
     *
     * @param v the value about to be published
     * @return the message body bytes
     */
    protected abstract byte[] bodyFromState(V v);

    /**
     * Assembles the publishable for {@code v} from {@link #exchangeName(Object)},
     * {@link #routingKey(Object)}, {@link #mandatory(Object)},
     * {@link #basicProperties(Object)} and {@link #bodyFromState(Object)}.
     *
     * @param v the value to publish
     * @return the publishable carrying all {@code basicPublish} arguments
     */
    protected RabbitMqPublishable createPublishable(V v) {
        return new RabbitMqPublishable(exchangeName(v), routingKey(v), mandatory(v), basicProperties(v), bodyFromState(v));
    }

    /**
     * Publishes {@code rabbitMqPublishable} on the assigned channel.
     *
     * @param rabbitMqPublishable the message and its publish arguments
     * @return always {@code null}
     * @throws DutyException if no channel is assigned yet, or the publish fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public Void publish(RabbitMqPublishable rabbitMqPublishable) throws DutyException {
        // Snapshot the volatile field so the null check and the publish use the same channel.
        final Channel channel = this.rabbitMqChannel;
        if (channel == null) {
            throw new DutyException(nodeUri() + ": can't publish before assigning channel", false);
        }
        try {
            channel.basicPublish(rabbitMqPublishable.exchange(), rabbitMqPublishable.routingKey(), rabbitMqPublishable.mandatory(), rabbitMqPublishable.basicProperties(), rabbitMqPublishable.body());
            return null;
        } catch (Exception e) {
            // Render the body null-safely: decoding a null body here would throw a second
            // exception and hide the original failure 'e'.
            final byte[] body = rabbitMqPublishable.body();
            final String renderedBody = body == null ? "null" : new String(body, StandardCharsets.UTF_8);
            throw new DutyException(nodeUri() + ": failed to publish " + renderedBody, e, false);
        }
    }

    /** Prints a start notice for this agent to standard output. */
    public void didStart() {
        System.out.println(nodeUri() + ": didStart");
    }

    /**
     * Decompiler-emitted copy of the compiler-generated bridge for
     * {@link #createPublishable(Object)}; it simply delegates to the typed method.
     */
    /* JADX WARN: Multi-variable type inference failed */
    /* renamed from: createPublishable, reason: collision with other method in class */
    protected /* bridge */ /* synthetic */ Object m7createPublishable(Object obj) {
        return createPublishable((RabbitMqPublishingAgent<V>) obj);
    }
}
