package io.gridgo.connector.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Delivery;
import io.gridgo.bean.BArray;
import io.gridgo.bean.BObject;
import io.gridgo.bean.BValue;
import io.gridgo.connector.impl.AbstractConsumer;
import io.gridgo.connector.rabbitmq.RabbitMQConsumer;
import io.gridgo.connector.rabbitmq.RabbitMQQueueConfig;
import io.gridgo.connector.support.config.ConnectorContext;
import io.gridgo.framework.support.Message;
import io.gridgo.framework.support.Payload;
import java.io.IOException;
import lombok.NonNull;
import org.joo.promise4j.Deferred;
import org.joo.promise4j.DeferredStatus;
import org.joo.promise4j.impl.CompletableDeferredObject;

/* loaded from: input_file:io/gridgo/connector/rabbitmq/impl/AbstractRabbitMQConsumer.class */
public abstract class AbstractRabbitMQConsumer extends AbstractConsumer implements RabbitMQConsumer {
    private final Connection connection;
    private final RabbitMQQueueConfig queueConfig;
    private Channel channel;
    private final String uniqueIdentifier;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRabbitMQConsumer(ConnectorContext connectorContext, @NonNull Connection connection, @NonNull RabbitMQQueueConfig rabbitMQQueueConfig, @NonNull String str) {
        super(connectorContext);
        if (connection == null) {
            throw new NullPointerException("connection is marked @NonNull but is null");
        }
        if (rabbitMQQueueConfig == null) {
            throw new NullPointerException("queueConfig is marked @NonNull but is null");
        }
        if (str == null) {
            throw new NullPointerException("uniqueIdentifier is marked @NonNull but is null");
        }
        this.connection = connection;
        this.queueConfig = rabbitMQQueueConfig;
        this.uniqueIdentifier = str;
    }

    protected String generateName() {
        return "consumer." + getUniqueIdentifier();
    }

    protected void onStart() {
        this.channel = initChannel(this.connection);
        subscibe(this::onDelivery, this::onCancel);
    }

    protected void onStop() {
        closeChannel();
        this.channel = null;
    }

    private void onCancel(String str) {
        getLogger().info("Cancelled " + str);
    }

    private void sendResponse(Exception exc, AMQP.BasicProperties basicProperties) {
        BObject ofEmpty = BObject.ofEmpty();
        ofEmpty.setAny("status", 500);
        sendResponse(createMessage(ofEmpty, BValue.of(("Internal server error: " + exc.getMessage()) == null ? "unknown message" : exc.getMessage())), basicProperties);
    }

    private void sendResponse(Message message, AMQP.BasicProperties basicProperties) {
        Payload payload = message.getPayload();
        try {
            getChannel().basicPublish("", basicProperties.getReplyTo(), basicProperties, BArray.newFromSequence(new Object[]{(BValue) payload.getId().orElse(null), payload.getHeaders(), payload.getBody()}).toBytes());
        } catch (IOException e) {
            getLogger().error("Cannot send response to caller: " + message, e);
        }
    }

    protected Deferred<Message, Exception> createDeferred() {
        return new CompletableDeferredObject();
    }

    private void onDelivery(String str, @NonNull Delivery delivery) {
        Deferred<Message, Exception> createDeferred;
        if (delivery == null) {
            throw new NullPointerException("delivery is marked @NonNull but is null");
        }
        AMQP.BasicProperties properties = delivery.getProperties();
        long deliveryTag = delivery.getEnvelope().getDeliveryTag();
        try {
            Message parseMessage = parseMessage(delivery.getBody());
            String correlationId = properties == null ? null : properties.getCorrelationId();
            if (correlationId == null && getQueueConfig().isAutoAck()) {
                createDeferred = null;
            } else {
                createDeferred = createDeferred();
                if (!this.queueConfig.isAutoAck()) {
                    boolean isAckOnFail = getQueueConfig().isAckOnFail();
                    createDeferred.promise().always((deferredStatus, message, exc) -> {
                        if (deferredStatus == DeferredStatus.RESOLVED || isAckOnFail) {
                            sendAck(deliveryTag);
                        }
                    });
                }
                if (correlationId != null) {
                    createDeferred.promise().done(message2 -> {
                        sendResponse(message2, properties);
                    }).fail(exc2 -> {
                        sendResponse(exc2, properties);
                    });
                }
            }
            publish(parseMessage, createDeferred);
        } catch (Exception e) {
            getLogger().error("Error while parse delivery body into message (ack will be sent automatically)", e);
            sendAck(deliveryTag);
            sendResponse(e, properties);
        }
    }

    private void sendAck(long j) {
        try {
            getChannel().basicAck(j, getQueueConfig().isMultipleAck());
        } catch (IOException e) {
            throw new RuntimeException("Cannot send ack for delivery tag: " + j, e);
        }
    }

    @Override // io.gridgo.connector.rabbitmq.RabbitMQChannelLifeCycle
    public RabbitMQQueueConfig getQueueConfig() {
        return this.queueConfig;
    }

    @Override // io.gridgo.connector.rabbitmq.RabbitMQChannelLifeCycle
    public Channel getChannel() {
        return this.channel;
    }

    protected String getUniqueIdentifier() {
        return this.uniqueIdentifier;
    }
}
