package io.hoplin;

import com.google.common.collect.ArrayListMultimap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.hoplin.json.JsonCodec;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/DefaultQueueConsumer.class */
public class DefaultQueueConsumer extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(DefaultQueueConsumer.class);
    private final QueueOptions queueOptions;
    private ArrayListMultimap<Class, MethodReference> handlers;
    private Executor executor;
    private JsonCodec codec;
    private final ConsumerErrorStrategy consumerErrorStrategy;

    /* loaded from: input_file:io/hoplin/DefaultQueueConsumer$MethodReference.class */
    private class MethodReference<T> {
        Class<T> root;
        BiConsumer<T, MessageContext> handler;

        private MethodReference() {
        }
    }

    public DefaultQueueConsumer(Channel channel, QueueOptions queueOptions) {
        this(channel, queueOptions, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    public DefaultQueueConsumer(Channel channel, QueueOptions queueOptions, Executor executor) {
        super(channel);
        this.handlers = ArrayListMultimap.create();
        this.queueOptions = (QueueOptions) Objects.requireNonNull(queueOptions);
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.codec = new JsonCodec();
        this.consumerErrorStrategy = new DefaultConsumerErrorStrategy(channel);
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        AckStrategy strategy;
        Class<?> typeAsClass;
        int i;
        MessageContext create = MessageContext.create(str, envelope, basicProperties);
        try {
            strategy = ackFromOptions(this.queueOptions);
            MessagePayload messagePayload = (MessagePayload) this.codec.deserialize(bArr, MessagePayload.class);
            Object payload = messagePayload.getPayload();
            typeAsClass = messagePayload.getTypeAsClass();
            List<MethodReference> list = this.handlers.get(typeAsClass);
            ArrayList arrayList = new ArrayList();
            i = 0;
            for (MethodReference methodReference : list) {
                try {
                    BiConsumer biConsumer = methodReference.handler;
                    if (methodReference.root == typeAsClass) {
                        i++;
                        execute(create, payload, biConsumer);
                    } else {
                        Optional safeCast = safeCast(payload, typeAsClass);
                        if (safeCast.isPresent()) {
                            i++;
                            execute(create, safeCast.get(), biConsumer);
                        }
                    }
                } catch (Exception e) {
                    arrayList.add(e);
                    log.error("Handler error for message  : " + messagePayload, e);
                }
            }
        } catch (Exception e2) {
            log.error("Unable to process message", e2);
            try {
                strategy = this.consumerErrorStrategy.handleConsumerError(create, e2);
            } catch (Exception e3) {
                log.error("Exception in error strategy", e3);
                strategy = AcknowledgmentStrategies.BASIC_ACK.strategy();
            }
        }
        if (i == 0) {
            throw new HoplinRuntimeException("No handlers defined for type : " + typeAsClass);
        }
        acknowledge(getChannel(), create, strategy);
    }

    private void execute(MessageContext messageContext, Object obj, BiConsumer biConsumer) {
        biConsumer.accept(obj, messageContext);
    }

    private <S, T> Optional<T> safeCast(S s, Class<T> cls) {
        return cls.isInstance(s) ? Optional.of(cls.cast(s)) : Optional.empty();
    }

    private void acknowledge(Channel channel, MessageContext messageContext, AckStrategy ackStrategy) {
        try {
            String messageId = messageContext.getProperties().getMessageId();
            long deliveryTag = messageContext.getReceivedInfo().getDeliveryTag();
            log.info("Acking [messageId, deliveryTag] : {}, {}", messageId, Long.valueOf(deliveryTag));
            ackStrategy.accept(channel, Long.valueOf(deliveryTag));
        } catch (Exception e) {
            log.error("Unable to ACK ", e);
        }
    }

    private AckStrategy ackFromOptions(QueueOptions queueOptions) {
        return queueOptions.isAutoAck() ? AcknowledgmentStrategies.NOOP.strategy() : AcknowledgmentStrategies.BASIC_ACK.strategy();
    }

    public synchronized <T> void addHandler(Class<T> cls, BiConsumer<T, MessageContext> biConsumer) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(biConsumer);
        Class<T> cls2 = cls;
        while (true) {
            Class<T> cls3 = cls2;
            if (cls3 == Object.class) {
                return;
            }
            MethodReference methodReference = new MethodReference();
            methodReference.handler = biConsumer;
            methodReference.root = cls;
            this.handlers.put(cls3, methodReference);
            cls2 = cls3.getSuperclass();
        }
    }

    public void handleCancel(String str) {
        throw new HoplinRuntimeException("Not yet implemented");
    }
}
