package org.iris_events.consumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.iris_events.annotations.ExchangeType;
import org.iris_events.consumer.QueueDeclarator;
import org.iris_events.context.IrisContext;
import org.iris_events.runtime.ExchangeNameProvider;
import org.iris_events.runtime.QueueNameProvider;
import org.iris_events.runtime.channel.ChannelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iris_events/consumer/Consumer.class */
public class Consumer implements RecoveryListener {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    private static final String RPC_EXCHANGE_SUFFIX = "rpc";
    private final IrisContext context;
    private final ChannelService channelService;
    private final DeliverCallbackProvider deliverCallbackProvider;
    private final QueueNameProvider queueNameProvider;
    private final ExchangeNameProvider exchangeNameProvider;
    private final QueueDeclarator queueDeclarator;
    private final ExchangeDeclarator exchangeDeclarator;
    private DeliverCallback callback;
    private String channelId = UUID.randomUUID().toString();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.iris_events.consumer.Consumer$1, reason: invalid class name */
    /* loaded from: input_file:org/iris_events/consumer/Consumer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$iris_events$annotations$ExchangeType = new int[ExchangeType.values().length];

        static {
            try {
                $SwitchMap$org$iris_events$annotations$ExchangeType[ExchangeType.DIRECT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$iris_events$annotations$ExchangeType[ExchangeType.TOPIC.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$iris_events$annotations$ExchangeType[ExchangeType.FANOUT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public Consumer(IrisContext irisContext, ChannelService channelService, DeliverCallbackProvider deliverCallbackProvider, QueueNameProvider queueNameProvider, ExchangeNameProvider exchangeNameProvider, QueueDeclarator queueDeclarator, ExchangeDeclarator exchangeDeclarator) {
        this.context = irisContext;
        this.channelService = channelService;
        this.deliverCallbackProvider = deliverCallbackProvider;
        this.queueNameProvider = queueNameProvider;
        this.exchangeNameProvider = exchangeNameProvider;
        this.queueDeclarator = queueDeclarator;
        this.exchangeDeclarator = exchangeDeclarator;
    }

    public void initChannel() throws IOException {
        Recoverable orCreateChannelById = this.channelService.getOrCreateChannelById(this.channelId);
        this.callback = this.deliverCallbackProvider.createDeliverCallback(orCreateChannelById);
        ExchangeType exchangeType = this.context.exchangeType();
        validateBindingKeys(this.context.getBindingKeys(), exchangeType);
        declareTopology(orCreateChannelById, exchangeType);
        if (orCreateChannelById instanceof Recoverable) {
            orCreateChannelById.addRecoveryListener(this);
        }
    }

    public DeliverCallback getCallback() {
        return this.callback;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public IrisContext getContext() {
        return this.context;
    }

    private void declareTopology(Channel channel, ExchangeType exchangeType) throws IOException {
        HashMap hashMap = new HashMap();
        String exchangeName = getExchangeName();
        String queueName = this.queueNameProvider.getQueueName(this.context);
        boolean isConsumerOnEveryInstance = this.context.isConsumerOnEveryInstance();
        channel.basicQos(this.context.getPrefetch());
        long ttl = this.context.getTtl();
        if (ttl >= 0) {
            hashMap.put("x-message-ttl", Long.valueOf(ttl));
        }
        if (this.context.isRpc()) {
            hashMap.put("x-message-ttl", 2000);
            String rpcRequestExchangeName = this.exchangeNameProvider.getRpcRequestExchangeName(this.context.getName());
            String rpcResponseExchangeName = this.exchangeNameProvider.getRpcResponseExchangeName(this.context.getRpcResponseEventName());
            log.info(String.format("Declaring topology for RPC consumer.\nrequestExchange: %s\nresponseExchange: %s\nrequestQueue: %s\n", rpcRequestExchangeName, rpcResponseExchangeName, queueName));
            declareQueue(channel, isConsumerOnEveryInstance, queueName, hashMap);
            this.exchangeDeclarator.declareExchange(rpcRequestExchangeName, ExchangeType.FANOUT, false);
            channel.queueBind(queueName, rpcRequestExchangeName, queueName);
            this.exchangeDeclarator.declareExchange(rpcResponseExchangeName, ExchangeType.DIRECT, false);
            channel.basicConsume(queueName, false, this.callback, str -> {
                log.warn("Channel canceled for {}", queueName);
            }, (str2, shutdownSignalException) -> {
                reInitChannel(shutdownSignalException, queueName, str2);
            });
            log.info("consumer (RPC) started on queue '{}' --> {} binding key(s): {}", new Object[]{queueName, exchangeName, queueName});
            return;
        }
        Optional<String> deadLetterQueueName = this.context.getDeadLetterQueueName();
        if (deadLetterQueueName.isPresent()) {
            declareAndBindDeadLetterQueue(channel, deadLetterQueueName.get());
            hashMap.put("x-dead-letter-routing-key", this.context.getDeadLetterRoutingKey(queueName));
            hashMap.put("x-dead-letter-exchange", this.context.getDeadLetterExchangeName().orElseThrow());
        }
        declareQueue(channel, isConsumerOnEveryInstance, queueName, hashMap);
        this.exchangeDeclarator.declareExchange(exchangeName, exchangeType, this.context.isFrontendMessage());
        List<String> bindingKeys = getBindingKeys(exchangeType);
        Iterator<String> it = bindingKeys.iterator();
        while (it.hasNext()) {
            channel.queueBind(queueName, exchangeName, it.next());
        }
        channel.basicConsume(queueName, false, this.callback, str3 -> {
            log.warn("Channel canceled for {}", queueName);
        }, (str4, shutdownSignalException2) -> {
            reInitChannel(shutdownSignalException2, queueName, str4);
        });
        log.info("consumer started on queue '{}' --> {} binding key(s): {}", new Object[]{queueName, exchangeName, String.join(", ", bindingKeys)});
    }

    private String getExchangeName() {
        return this.context.getName();
    }

    private void reInitChannel(ShutdownSignalException shutdownSignalException, String str, String str2) {
        log.warn("Channel shut down for with signal:{}, queue: {}, consumer: {}", new Object[]{shutdownSignalException, str, str2});
        try {
            this.channelService.removeChannel(this.channelId);
            this.channelId = UUID.randomUUID().toString();
            initChannel();
        } catch (IOException e) {
            log.error(String.format("Could not re-initialize channel for queue %s", str), e);
        }
    }

    private void declareQueue(Channel channel, boolean z, String str, Map<String, Object> map) throws IOException {
        this.queueDeclarator.declareQueueWithRecreateOnConflict(channel, new QueueDeclarator.QueueDeclarationDetails(str, getDurable(), false, z || this.context.isAutoDelete(), map));
    }

    private void declareAndBindDeadLetterQueue(Channel channel, String str) throws IOException {
        if (this.context.isCustomDeadLetterQueue()) {
            channel.exchangeDeclare(str, BuiltinExchangeType.TOPIC, true);
            this.queueDeclarator.declareQueueWithRecreateOnConflict(channel, new QueueDeclarator.QueueDeclarationDetails(str, true, false, false, null));
            channel.queueBind(str, str, "#");
        }
    }

    private boolean getDurable() {
        if (this.context.isFrontendMessage()) {
            return false;
        }
        return this.context.isDurable();
    }

    private List<String> getBindingKeys(ExchangeType exchangeType) {
        String name = this.context.getName();
        if (this.context.isFrontendMessage()) {
            return List.of("#." + name);
        }
        switch (AnonymousClass1.$SwitchMap$org$iris_events$annotations$ExchangeType[exchangeType.ordinal()]) {
            case 1:
            case 2:
                return this.context.getBindingKeys();
            case 3:
                return List.of("#." + name);
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    private static void validateBindingKeys(List<String> list, ExchangeType exchangeType) {
        if (exchangeType == ExchangeType.FANOUT) {
            return;
        }
        if (list == null || list.size() == 0) {
            throw new IllegalArgumentException("Binding key(s) are required when declaring a " + exchangeType.name() + " type exchange.");
        }
        if (exchangeType == ExchangeType.DIRECT && list.size() > 1) {
            throw new IllegalArgumentException("Exactly one binding key is required when declaring a direct type exchange.");
        }
    }

    public void handleRecovery(Recoverable recoverable) {
        log.info("handleRecovery called for consumer {}", this.context.getName());
        try {
            initChannel();
        } catch (IOException e) {
            log.error(String.format("Failed handling recovery for consumer %s", this.context.getName()), e);
            throw new RuntimeException(e);
        }
    }

    public void handleRecoveryStarted(Recoverable recoverable) {
        log.info("handleRecoveryStarted for consumer {}", this.context.getName());
    }
}
