package io.gridgo.connector.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import io.gridgo.bean.BArray;
import io.gridgo.bean.BValue;
import io.gridgo.connector.impl.AbstractProducer;
import io.gridgo.connector.rabbitmq.RabbitMQProducer;
import io.gridgo.connector.rabbitmq.RabbitMQQueueConfig;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.execution.ExecutionStrategy;
import io.gridgo.framework.execution.impl.DefaultExecutionStrategy;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.Payload;
import io.gridgo.framework.support.generators.impl.TimeBasedIdGenerator;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.joo.promise4j.Deferred;
import org.joo.promise4j.Promise;
import org.joo.promise4j.impl.AsyncDeferredObject;
import org.joo.promise4j.impl.CompletableDeferredObject;

/* loaded from: input_file:io/gridgo/connector/rabbitmq/impl/AbstractRabbitMQProducer.class */
/**
 * Base class for RabbitMQ producers.
 *
 * <p>Supports three ways of sending a {@link Message}:
 * <ul>
 *   <li>{@link #send(Message)} - fire-and-forget publish;</li>
 *   <li>{@link #sendWithAck(Message)} - publish and complete a promise once the publish
 *       call has returned (resolved with {@code null}) or failed (rejected);</li>
 *   <li>{@link #call(Message)} - RPC-style request: the message is published with a
 *       correlation id and a reply-to queue, and the returned promise is completed when a
 *       delivery carrying the same correlation id is handed to the response callback.</li>
 * </ul>
 *
 * <p>Publishing is always dispatched through the producer execution strategy supplied by
 * the connector context, falling back to a {@link DefaultExecutionStrategy}.
 */
public abstract class AbstractRabbitMQProducer extends AbstractProducer implements RabbitMQProducer {
    /** Generates the correlation id attached to every request made through {@link #call(Message)}. */
    private static final TimeBasedIdGenerator TIME_BASED_ID_GENERATOR = new TimeBasedIdGenerator();
    /** Used when the connector context does not provide a producer execution strategy. */
    private static final ExecutionStrategy DEFAULT_EXECUTION_STRATEGY = new DefaultExecutionStrategy();
    private final Connection connection;
    private final RabbitMQQueueConfig queueConfig;
    /** Channel obtained in {@link #onStart()}; {@code null} until then. */
    private Channel channel;
    /** Reply-to queue name; only assigned in {@link #onStart()} when the queue config is RPC. */
    private String responseQueue;
    private final String uniqueIdentifier;
    /** Pending RPC calls keyed by correlation id; entries are removed once their promise completes. */
    private final Map<String, Deferred<Message, Exception>> correlationIdToDeferredMap = new NonBlockingHashMap<>();

    /**
     * Creates a producer bound to the given connection and queue configuration.
     *
     * @param connectorContext    connector context, passed to the superclass
     * @param connection          connection the channel is created from in {@link #onStart()}
     * @param rabbitMQQueueConfig queue configuration (decides, among others, whether RPC is enabled)
     * @param str                 unique identifier exposed through {@link #getUniqueIdentifier()}
     * @throws NullPointerException if any argument is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRabbitMQProducer(@NonNull ConnectorContext connectorContext, @NonNull Connection connection, @NonNull RabbitMQQueueConfig rabbitMQQueueConfig, @NonNull String str) {
        super(connectorContext);
        if (connectorContext == null) {
            throw new NullPointerException("context is marked @NonNull but is null");
        }
        if (connection == null) {
            throw new NullPointerException("connection is marked @NonNull but is null");
        }
        if (rabbitMQQueueConfig == null) {
            throw new NullPointerException("queueConfig is marked @NonNull but is null");
        }
        if (str == null) {
            throw new NullPointerException("uniqueIdentifier is marked @NonNull but is null");
        }
        this.connection = connection;
        this.queueConfig = rabbitMQQueueConfig;
        this.uniqueIdentifier = str;
    }

    /**
     * Returns the strategy used to run publish tasks: the one configured on the connector
     * context when present, otherwise the shared default.
     */
    private ExecutionStrategy resolveExecutionStrategy() {
        return getContext().getProducerExecutionStrategy().orElse(DEFAULT_EXECUTION_STRATEGY);
    }

    /**
     * Publishes {@code message} without RPC properties.
     *
     * @param message  message to publish; its routing id (if any) is used as the routing key
     * @param deferred completed once publishing finished - resolved with {@code null} on
     *                 success, rejected with the failure otherwise; may be {@code null} for
     *                 fire-and-forget sends
     */
    private void _send(Message message, Deferred<Message, Exception> deferred) {
        String routingKey = message.getRoutingId().orElse(BValue.ofEmpty()).getString();
        resolveExecutionStrategy().execute(() -> {
            if (deferred == null) {
                // Fire-and-forget: nobody to notify, so a failure simply propagates to the
                // execution strategy exactly as before.
                publish(buildRequestBody(message.getPayload()), null, routingKey);
                return;
            }
            try {
                publish(buildRequestBody(message.getPayload()), null, routingKey);
            } catch (Exception e) {
                // Without this the promise handed out by sendWithAck would never complete.
                deferred.reject(e);
                return;
            }
            // Resolved outside the try block so an exception raised by a promise callback
            // is not mistaken for a publish failure.
            deferred.resolve(null);
        });
    }

    /**
     * Serializes a payload as a three-element sequence: id (or {@code null} when absent),
     * headers, body.
     *
     * @param payload payload to serialize
     * @return the bytes produced by {@link BArray#toBytes()}
     */
    protected byte[] buildRequestBody(Payload payload) {
        return BArray.ofSequence(new Object[]{payload.getId().orElse(null), payload.getHeaders(), payload.getBody()}).toBytes();
    }

    /**
     * Sends {@code message} as an RPC request and returns a promise for the response.
     *
     * <p>The request is tagged with a freshly generated correlation id; the promise is
     * resolved by the response callback when a delivery with that id arrives, or rejected
     * if publishing fails.
     *
     * @param message request to send; its routing id (if any) is used as the routing key
     * @return promise completed with the parsed response or the failure
     * @throws UnsupportedOperationException if the queue configuration is not RPC
     */
    public final Promise<Message, Exception> call(Message message) {
        if (!this.queueConfig.isRpc()) {
            throw new UnsupportedOperationException("Cannot make a call on non-rpc rabbitmq producer, use rpc=true in connector endpoint");
        }
        String routingKey = message.getRoutingId().map(value -> value.getString()).orElse(null);
        String correlationId = TIME_BASED_ID_GENERATOR.generateId().orElseThrow().getString();
        AMQP.BasicProperties properties = createBasicProperties(correlationId);
        byte[] body = buildRequestBody(message.getPayload());
        Deferred<Message, Exception> deferred = createDeferred();
        // Register before publishing so a fast response cannot arrive ahead of the entry.
        this.correlationIdToDeferredMap.put(correlationId, deferred);
        // Whatever the outcome, drop the entry so the map does not grow without bound.
        deferred.promise().always((deferredStatus, response, failure) -> {
            this.correlationIdToDeferredMap.remove(correlationId);
        });
        resolveExecutionStrategy().execute(() -> {
            try {
                publish(body, properties, routingKey);
            } catch (Exception e) {
                deferred.reject(e);
            }
        });
        return deferred.promise();
    }

    /**
     * Builds the AMQP properties for an RPC request.
     *
     * @param str correlation id to attach
     * @return properties carrying the correlation id and the current response queue as reply-to
     */
    protected AMQP.BasicProperties createBasicProperties(String str) {
        return new AMQP.BasicProperties.Builder().correlationId(str).replyTo(this.responseQueue).build();
    }

    /**
     * Creates the deferred backing a {@link #call(Message)} promise. Subclasses may override
     * to supply a different implementation.
     */
    protected Deferred<Message, Exception> createDeferred() {
        return new CompletableDeferredObject<>();
    }

    /** Returns {@code null}; this base class does not provide a name. */
    protected String generateName() {
        return null;
    }

    /**
     * Always returns {@code true}. Note that {@link #call(Message)} still throws when the
     * queue configuration is not RPC.
     */
    public boolean isCallSupported() {
        return true;
    }

    /**
     * Response callback registered in {@link #onStart()}: completes the pending call whose
     * correlation id matches the delivery.
     *
     * <p>Deliveries without a correlation id, or with one that has no pending call, are ignored.
     *
     * @param str      first callback argument (presumably the consumer tag - not used here)
     * @param delivery the received response
     */
    private void onResponse(String str, Delivery delivery) {
        String correlationId = delivery.getProperties().getCorrelationId();
        if (correlationId == null) {
            // The pending-call map does not accept null keys; a response that carries no
            // correlation id cannot belong to any call anyway.
            return;
        }
        Deferred<Message, Exception> deferred = this.correlationIdToDeferredMap.get(correlationId);
        if (deferred == null) {
            return;
        }
        getContext().getCallbackInvokerStrategy().execute(() -> {
            try {
                deferred.resolve(Message.parse(delivery.getBody()));
            } catch (Exception e) {
                deferred.reject(e);
            }
        });
    }

    /** Opens the channel and, when the queue configuration is RPC, sets up the response queue. */
    protected void onStart() {
        this.channel = initChannel(this.connection);
        if (getQueueConfig().isRpc()) {
            this.responseQueue = initResponseQueue(this::onResponse);
        }
    }

    /** Closes the channel. */
    protected void onStop() {
        closeChannel();
    }

    /**
     * Publishes {@code message} without waiting for or reporting the outcome.
     *
     * @param message message to publish
     * @throws NullPointerException if {@code message} is {@code null}
     */
    public final void send(@NonNull Message message) {
        if (message == null) {
            throw new NullPointerException("request is marked @NonNull but is null");
        }
        _send(message, null);
    }

    /**
     * Publishes {@code message} and reports the outcome through the returned promise.
     *
     * @param message message to publish
     * @return promise resolved with {@code null} once the publish call returned, or rejected
     *         with the exception it raised
     * @throws NullPointerException if {@code message} is {@code null}
     */
    public final Promise<Message, Exception> sendWithAck(@NonNull Message message) {
        if (message == null) {
            throw new NullPointerException("message is marked @NonNull but is null");
        }
        Deferred<Message, Exception> deferred = new AsyncDeferredObject<>();
        _send(message, deferred);
        return deferred.promise();
    }

    /** Returns the queue configuration given at construction. */
    @Override // io.gridgo.connector.rabbitmq.RabbitMQChannelLifeCycle
    public RabbitMQQueueConfig getQueueConfig() {
        return this.queueConfig;
    }

    /** Returns the channel opened in {@link #onStart()}, or {@code null} before that. */
    @Override // io.gridgo.connector.rabbitmq.RabbitMQChannelLifeCycle
    public Channel getChannel() {
        return this.channel;
    }

    /** Returns the response queue name, or {@code null} if it has not been set up. */
    public String getResponseQueue() {
        return this.responseQueue;
    }

    /** Returns the unique identifier given at construction. */
    protected String getUniqueIdentifier() {
        return this.uniqueIdentifier;
    }
}
