/*
 * Decompiled with CFR 0.152.
 */
package me.kisoft.easybus.rabbitmq;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.NonNull;
import me.kisoft.easybus.BackingBus;
import me.kisoft.easybus.Listener;
import me.kisoft.easybus.memory.MemoryBackingBusImpl;
import me.kisoft.easybus.rabbitmq.ExchangeName;
import me.kisoft.easybus.rabbitmq.ExchangeType;
import me.kisoft.easybus.rabbitmq.QueueName;
import me.kisoft.easybus.rabbitmq.RoutingKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RabbitMQBackingBusImpl
extends BackingBus {
    /** Logger shared by this class and its nested consumer. */
    protected static final Logger log = LoggerFactory.getLogger(RabbitMQBackingBusImpl.class);
    /** RabbitMQ connection from which every channel used by this bus is created; never null (checked in the constructor). */
    @NonNull
    private final Connection connection;
    /** Jackson mapper used to serialize posted events and to build the per-consumer reader. */
    private final ObjectMapper mapper;
    /** Exchange names cached by verifyOrUpdateExchange so they are not declared again. */
    private final Set<String> exchangeSet = new HashSet<String>();
    /** Lock taken by verifyOrUpdateExchange while it checks and declares an exchange. */
    private final ReentrantLock declarationLock = new ReentrantLock();
    /** Single-thread scheduler (thread name "rabbitmq-binding-pool") on which listener binding is retried. */
    private final ScheduledExecutorService rebindingExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "rabbitmq-binding-pool"));
    /** In-memory bus that consumers register listeners on and post decoded events to. */
    private final MemoryBackingBusImpl memoryBusImpl;
    /** Whether verifyOrUpdateExchange may delete and re-declare an exchange when declaring it fails. */
    private final boolean allowUpdate;
    /** Passed to basicQos for each consumer channel and used as the size of each consumer's thread pool. */
    private final int maxPrefetch;
    /** Requeue flag passed to basicNack when processing a received event fails. */
    private final boolean requeue;
    /** Passed as maxRetries to doAddListener; also multiplied into the executor termination timeout. */
    private final int retries;
    /** Base delay in milliseconds between listener binding attempts (multiplied by the attempt number). */
    private final int retryThresholdMillis;

    /**
     * Serializes {@code object} to JSON and publishes it to the exchange derived
     * from its class, using the routing key {@code "all"}.
     *
     * <p>The exchange is verified (and declared when missing) through
     * {@link #verifyOrUpdateExchange(String, BuiltinExchangeType)} before publishing.
     * A dedicated channel is opened for the call and closed when it returns.
     *
     * @param object the event to publish; must not be null
     * @throws RuntimeException wrapping the {@link IOException} or
     *         {@link TimeoutException} raised while declaring, serializing or publishing
     */
    public void post(Object object) {
        // Resolve the target once; the same name is used for verification, publishing and logging.
        String exchangeName = this.getExcahngeName(object);
        BuiltinExchangeType type = this.getExchangeType(object);
        try (Channel channel = this.connection.createChannel()) {
            this.verifyOrUpdateExchange(exchangeName, type);
            channel.basicPublish(exchangeName, "all", null, this.mapper.writer().writeValueAsBytes(object));
            // Log only after basicPublish returned, so the message is never emitted for a failed publish.
            log.debug("Published Message to  Exchange {}", exchangeName);
        }
        catch (IOException | TimeoutException ex) {
            throw new RuntimeException("Failed to publish event to exchange " + exchangeName, ex);
        }
    }

    /**
     * Makes sure the exchange {@code exchangeName} exists on the broker, declaring
     * it with {@code type} when it does not, and remembers the name so later calls
     * return without contacting the broker.
     *
     * <p>Steps, all performed while holding {@code declarationLock}:
     * <ol>
     *   <li>Return immediately if the name is already cached.</li>
     *   <li>Passively declare the exchange on a throw-away channel; if that succeeds
     *       the exchange exists and its name is cached.</li>
     *   <li>Otherwise declare it on a second channel and cache the name.</li>
     *   <li>If declaring fails and {@code allowUpdate} is set, delete the exchange
     *       (only if unused) and declare it again on a third channel.</li>
     * </ol>
     *
     * @param exchangeName name of the exchange to verify or declare
     * @param type         exchange type used when the exchange has to be declared
     * @throws IOException      if the delete/re-declare step fails
     * @throws TimeoutException if closing the channel used for that step times out
     */
    protected void verifyOrUpdateExchange(String exchangeName, BuiltinExchangeType type) throws IOException, TimeoutException {
        // The cache is a plain HashSet, so every read and write happens under the lock.
        this.declarationLock.lock();
        try {
            if (this.exchangeSet.contains(exchangeName)) {
                return;
            }

            boolean exchangeExists;
            try (Channel verificationChannel = this.connection.createChannel()) {
                verificationChannel.exchangeDeclarePassive(exchangeName);
                log.debug("Exchange {} already exists", exchangeName);
                exchangeExists = true;
            }
            catch (IOException ex) {
                // A failed passive declare is the expected signal for "not there yet".
                exchangeExists = false;
            }
            if (exchangeExists) {
                // Cache pre-existing exchanges too; otherwise every call opens a verification channel.
                this.exchangeSet.add(exchangeName);
                return;
            }

            boolean exchangeNeedsUpdate;
            try (Channel creationChannel = this.connection.createChannel()) {
                creationChannel.exchangeDeclare(exchangeName, type);
                log.debug("Declared Exchange {}", exchangeName);
                this.exchangeSet.add(exchangeName);
                exchangeNeedsUpdate = false;
            }
            catch (IOException ex) {
                exchangeNeedsUpdate = true;
                if (!this.allowUpdate) {
                    // Keep the best-effort contract (no throw) but do not hide the failure.
                    log.warn("Could not declare Exchange {} and updates are disabled : {}", exchangeName, ex.getMessage());
                }
            }
            if (!exchangeNeedsUpdate || !this.allowUpdate) {
                return;
            }

            try (Channel updateChannel = this.connection.createChannel()) {
                // ifUnused = true: the broker refuses to delete an exchange that still has bindings.
                updateChannel.exchangeDelete(exchangeName, true);
                updateChannel.exchangeDeclare(exchangeName, type);
                this.exchangeSet.add(exchangeName);
            }
        }
        finally {
            this.declarationLock.unlock();
        }
    }

    /**
     * Delegates to {@code clear()} of the in-memory bus held by this instance.
     */
    public void clear() {
        final MemoryBackingBusImpl localBus = this.memoryBusImpl;
        localBus.clear();
    }

    /**
     * Releases every resource held by this bus: the rebinding scheduler, the
     * in-memory bus and the RabbitMQ connection.
     *
     * <p>The scheduler is stopped first so that no listener re-binding is
     * attempted against a connection that is being closed. The connection is
     * closed even when closing the in-memory bus throws.
     *
     * @throws Exception whatever closing the in-memory bus or the connection throws
     */
    public void close() throws Exception {
        // Without this the scheduler's non-daemon thread outlives the bus.
        this.rebindingExecutor.shutdownNow();
        try {
            this.memoryBusImpl.close();
        }
        finally {
            this.connection.close();
        }
    }

    /**
     * Resolves the exchange type for an event instance by looking at its runtime class.
     *
     * @param object the event instance; must not be null
     * @return the exchange type resolved for {@code object.getClass()}
     */
    protected BuiltinExchangeType getExchangeType(Object object) {
        Class<?> runtimeType = object.getClass();
        return getExchangeType(runtimeType);
    }

    /**
     * Resolves the exchange type for a class from its {@link ExchangeType} annotation.
     *
     * @param clazz the class to inspect; must not be null
     * @return the annotated type, or {@link BuiltinExchangeType#FANOUT} when the
     *         class carries no {@link ExchangeType} annotation
     */
    protected BuiltinExchangeType getExchangeType(Class<?> clazz) {
        // Class<?> (not raw Class) keeps getAnnotation generic, so no cast is needed.
        ExchangeType annotation = clazz.getAnnotation(ExchangeType.class);
        // Annotation element values can never be null, so only the annotation itself is checked.
        return annotation == null ? BuiltinExchangeType.FANOUT : annotation.value();
    }

    /**
     * Resolves the queue name for an instance by looking at its runtime class.
     *
     * @param object the instance (a listener, in this file's usage); must not be null
     * @return the queue name resolved for {@code object.getClass()}
     */
    protected String getQueueName(Object object) {
        Class<?> runtimeType = object.getClass();
        return getQueueName(runtimeType);
    }

    /**
     * Resolves the queue name for a class.
     *
     * @param clazz the class to inspect; must not be null
     * @return the value of its {@link QueueName} annotation, or the class's
     *         simple name when the annotation is absent
     */
    protected String getQueueName(Class<?> clazz) {
        // Class<?> (not raw Class) keeps getAnnotation generic, so no cast is needed.
        QueueName queueName = clazz.getAnnotation(QueueName.class);
        return queueName != null ? queueName.value() : clazz.getSimpleName();
    }

    /**
     * Resolves the exchange name for an event instance by looking at its runtime class.
     *
     * @param object the event instance; must not be null
     * @return the exchange name resolved for {@code object.getClass()}
     */
    protected String getExcahngeName(Object object) {
        Class<?> runtimeType = object.getClass();
        return getExcahngeName(runtimeType);
    }

    /**
     * Resolves the exchange name for a class.
     *
     * @param clazz the class to inspect; must not be null
     * @return the value of its {@link ExchangeName} annotation, or the class's
     *         simple name when the annotation is absent
     */
    protected String getExcahngeName(Class<?> clazz) {
        // Class<?> (not raw Class) keeps getAnnotation generic, so no cast is needed.
        ExchangeName exchangeName = clazz.getAnnotation(ExchangeName.class);
        return exchangeName != null ? exchangeName.value() : clazz.getSimpleName();
    }

    /**
     * Resolves the routing keys for an instance by looking at its runtime class.
     *
     * @param object the instance (a listener, in this file's usage); must not be null
     * @return the routing keys resolved for {@code object.getClass()}
     */
    protected Set<String> getRoutingKeys(Object object) {
        Class<?> runtimeType = object.getClass();
        return getRoutingKeys(runtimeType);
    }

    /**
     * Collects the routing keys declared on a class through {@link RoutingKey} annotations.
     *
     * @param clazz the class to inspect; must not be null
     * @return the distinct annotation values, or the single catch-all key
     *         {@code "#"} when the class declares none
     */
    protected Set<String> getRoutingKeys(Class<?> clazz) {
        // getAnnotationsByType never returns null; an empty array means "no annotation present".
        RoutingKey[] routingKeys = clazz.getAnnotationsByType(RoutingKey.class);
        if (routingKeys.length == 0) {
            return Set.of("#");
        }
        // Collecting into a Set already removes duplicates.
        return Arrays.stream(routingKeys).map(RoutingKey::value).collect(Collectors.toSet());
    }

    /**
     * Binds {@code eventListener} to the queue/exchange derived from its class and
     * {@code eventClass}, retrying on the rebinding scheduler when an attempt fails.
     *
     * <p>One attempt opens a channel, applies the prefetch limit, verifies the
     * exchange, declares the queue, binds it with every routing key of the listener
     * and starts a {@link RabbitMQBackingBusConsumer} on it. When the attempt fails
     * the channel is closed and the next attempt is scheduled after
     * {@code retry * retryThresholdMillis} milliseconds. Once {@code retry} exceeds
     * {@code maxRetries} the failure is logged and no further attempt is made.
     *
     * @param eventClass    class of the events the listener consumes
     * @param eventListener listener to bind
     * @param retry         1-based number of the current attempt
     * @param maxRetries    highest attempt number that is still executed
     */
    private void doAddListener(Class eventClass, Listener eventListener, int retry, int maxRetries) {
        if (retry < 1 || maxRetries < 1) {
            // Invalid counters: normalize to a single attempt, run from the scheduler thread.
            this.rebindingExecutor.schedule(() -> this.doAddListener(eventClass, eventListener, 1, 1), 50L, TimeUnit.MILLISECONDS);
            return;
        }
        if (retry > maxRetries) {
            log.error("Failure to add listener {} for event {} : too many retries({}/{})", eventListener, eventClass, retry, maxRetries);
            // Give up here; without this return the retry limit was never enforced.
            return;
        }
        Channel channel = null;
        try {
            log.info("Attempting to add listener {} for event {} : attempt ({}/{})", eventListener, eventClass, retry, maxRetries);
            String exchangeName = this.getExcahngeName(eventClass);
            BuiltinExchangeType type = this.getExchangeType(eventClass);
            String queueName = this.getQueueName(eventListener);
            Set<String> routingKeys = this.getRoutingKeys(eventListener);
            channel = this.connection.createChannel();
            channel.basicQos(this.maxPrefetch, false);
            this.verifyOrUpdateExchange(exchangeName, type);
            String queue = channel.queueDeclare(queueName, false, false, false, null).getQueue();
            for (String routingKey : routingKeys) {
                channel.queueBind(queue, exchangeName, routingKey);
            }
            RabbitMQBackingBusConsumer consumer = new RabbitMQBackingBusConsumer(channel, eventClass, eventListener);
            channel.basicConsume(queue, consumer);
            log.info("Successfully added listener {} for event {} : attempt ({}/{})", eventListener, eventClass, retry, maxRetries);
        }
        catch (Exception ex) {
            log.warn("Failed to add listener {} for event {} : {}, trying again", eventListener, eventClass, ex.getMessage(), ex);
            // The half-configured channel is useless; close it so failed attempts do not pile up channels.
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                }
                catch (IOException | TimeoutException | RuntimeException closeEx) {
                    log.debug("Could not close channel after failed listener registration : {}", closeEx.getMessage());
                }
            }
            // Linear back-off; widen to long before multiplying to avoid int overflow.
            long delayMillis = (long)retry * this.retryThresholdMillis;
            this.rebindingExecutor.schedule(() -> this.doAddListener(eventClass, eventListener, retry + 1, maxRetries), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Registers {@code listener} for {@code eventClass}, starting at attempt 1
     * with the configured number of retries.
     *
     * @param eventClass class of the events the listener consumes
     * @param listener   listener to bind
     */
    protected void addListener(Class eventClass, Listener listener) {
        final int firstAttempt = 1;
        final int attemptLimit = this.retries;
        doAddListener(eventClass, listener, firstAttempt, attemptLimit);
    }

    /** Builder default: a mapper that tolerates empty beans when writing and unknown properties when reading. */
    private static ObjectMapper $default$mapper() {
        ObjectMapper lenientMapper = new ObjectMapper();
        lenientMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
        lenientMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        return lenientMapper;
    }

    /** Builder default: a fresh in-memory bus. */
    private static MemoryBackingBusImpl $default$memoryBusImpl() {
        MemoryBackingBusImpl freshBus = new MemoryBackingBusImpl();
        return freshBus;
    }

    /** Builder default for {@code allowUpdate}: enabled. */
    private static boolean $default$allowUpdate() {
        final boolean enabled = true;
        return enabled;
    }

    /** Builder default for {@code maxPrefetch}: 10. */
    private static int $default$maxPrefetch() {
        final int prefetch = 10;
        return prefetch;
    }

    /** Builder default for {@code requeue}: enabled. */
    private static boolean $default$requeue() {
        final boolean enabled = true;
        return enabled;
    }

    /** Builder default for {@code retries}: 3. */
    private static int $default$retries() {
        final int attempts = 3;
        return attempts;
    }

    /** Builder default for {@code retryThresholdMillis}: 3000. */
    private static int $default$retryThresholdMillis() {
        final int millis = 3000;
        return millis;
    }

    /**
     * Creates a bus from fully resolved settings; normally invoked through the builder.
     *
     * @param connection           RabbitMQ connection; must not be null
     * @param mapper               JSON mapper for events
     * @param memoryBusImpl        in-memory bus used for local dispatch
     * @param allowUpdate          whether exchanges may be deleted and re-declared
     * @param maxPrefetch          prefetch count / consumer pool size
     * @param requeue              requeue flag for negative acknowledgements
     * @param retries              retry limit handed to listener registration
     * @param retryThresholdMillis base delay between registration attempts
     * @throws NullPointerException if {@code connection} is null
     */
    RabbitMQBackingBusImpl(@NonNull Connection connection, ObjectMapper mapper, MemoryBackingBusImpl memoryBusImpl, boolean allowUpdate, int maxPrefetch, boolean requeue, int retries, int retryThresholdMillis) {
        // Fully qualified so the null check needs no additional import.
        this.connection = java.util.Objects.requireNonNull(connection, "connection is marked non-null but is null");
        this.mapper = mapper;
        this.memoryBusImpl = memoryBusImpl;

        // Behavioural switches.
        this.allowUpdate = allowUpdate;
        this.requeue = requeue;

        // Numeric tuning values.
        this.maxPrefetch = maxPrefetch;
        this.retries = retries;
        this.retryThresholdMillis = retryThresholdMillis;
    }

    /**
     * Entry point of the fluent construction API.
     *
     * @return a new, empty builder
     */
    public static RabbitMQBackingBusImplBuilder builder() {
        RabbitMQBackingBusImplBuilder freshBuilder = new RabbitMQBackingBusImplBuilder();
        return freshBuilder;
    }

    /**
     * Fluent builder for {@link RabbitMQBackingBusImpl}.
     *
     * <p>Every option except the connection is tracked by a {@code $value}/{@code $set}
     * pair: when a setter was never called, {@link #build()} substitutes the
     * corresponding {@code $default$...()} value of the enclosing class.
     */
    public static class RabbitMQBackingBusImplBuilder {
        private Connection connection;
        private boolean mapper$set;
        private ObjectMapper mapper$value;
        private boolean memoryBusImpl$set;
        private MemoryBackingBusImpl memoryBusImpl$value;
        private boolean allowUpdate$set;
        private boolean allowUpdate$value;
        private boolean maxPrefetch$set;
        private int maxPrefetch$value;
        private boolean requeue$set;
        private boolean requeue$value;
        private boolean retries$set;
        private int retries$value;
        private boolean retryThresholdMillis$set;
        private int retryThresholdMillis$value;

        RabbitMQBackingBusImplBuilder() {
        }

        /**
         * Sets the RabbitMQ connection.
         *
         * @throws NullPointerException if {@code connection} is null
         */
        public RabbitMQBackingBusImplBuilder connection(@NonNull Connection connection) {
            if (connection != null) {
                this.connection = connection;
                return this;
            }
            throw new NullPointerException("connection is marked non-null but is null");
        }

        /** Overrides the default JSON mapper. */
        public RabbitMQBackingBusImplBuilder mapper(ObjectMapper mapper) {
            this.mapper$set = true;
            this.mapper$value = mapper;
            return this;
        }

        /** Overrides the default in-memory bus. */
        public RabbitMQBackingBusImplBuilder memoryBusImpl(MemoryBackingBusImpl memoryBusImpl) {
            this.memoryBusImpl$set = true;
            this.memoryBusImpl$value = memoryBusImpl;
            return this;
        }

        /** Overrides the default {@code allowUpdate} flag. */
        public RabbitMQBackingBusImplBuilder allowUpdate(boolean allowUpdate) {
            this.allowUpdate$set = true;
            this.allowUpdate$value = allowUpdate;
            return this;
        }

        /** Overrides the default prefetch count. */
        public RabbitMQBackingBusImplBuilder maxPrefetch(int maxPrefetch) {
            this.maxPrefetch$set = true;
            this.maxPrefetch$value = maxPrefetch;
            return this;
        }

        /** Overrides the default {@code requeue} flag. */
        public RabbitMQBackingBusImplBuilder requeue(boolean requeue) {
            this.requeue$set = true;
            this.requeue$value = requeue;
            return this;
        }

        /** Overrides the default retry limit. */
        public RabbitMQBackingBusImplBuilder retries(int retries) {
            this.retries$set = true;
            this.retries$value = retries;
            return this;
        }

        /** Overrides the default retry delay in milliseconds. */
        public RabbitMQBackingBusImplBuilder retryThresholdMillis(int retryThresholdMillis) {
            this.retryThresholdMillis$set = true;
            this.retryThresholdMillis$value = retryThresholdMillis;
            return this;
        }

        /**
         * Creates the bus, using the enclosing class's defaults for every option
         * whose setter was not called.
         *
         * @throws NullPointerException if no connection was supplied
         */
        public RabbitMQBackingBusImpl build() {
            ObjectMapper resolvedMapper = this.mapper$set
                    ? this.mapper$value
                    : RabbitMQBackingBusImpl.$default$mapper();
            MemoryBackingBusImpl resolvedMemoryBus = this.memoryBusImpl$set
                    ? this.memoryBusImpl$value
                    : RabbitMQBackingBusImpl.$default$memoryBusImpl();
            boolean resolvedAllowUpdate = this.allowUpdate$set
                    ? this.allowUpdate$value
                    : RabbitMQBackingBusImpl.$default$allowUpdate();
            int resolvedMaxPrefetch = this.maxPrefetch$set
                    ? this.maxPrefetch$value
                    : RabbitMQBackingBusImpl.$default$maxPrefetch();
            boolean resolvedRequeue = this.requeue$set
                    ? this.requeue$value
                    : RabbitMQBackingBusImpl.$default$requeue();
            int resolvedRetries = this.retries$set
                    ? this.retries$value
                    : RabbitMQBackingBusImpl.$default$retries();
            int resolvedRetryThreshold = this.retryThresholdMillis$set
                    ? this.retryThresholdMillis$value
                    : RabbitMQBackingBusImpl.$default$retryThresholdMillis();
            return new RabbitMQBackingBusImpl(
                    this.connection,
                    resolvedMapper,
                    resolvedMemoryBus,
                    resolvedAllowUpdate,
                    resolvedMaxPrefetch,
                    resolvedRequeue,
                    resolvedRetries,
                    resolvedRetryThreshold);
        }

        /** Lists the raw builder state (explicitly set values only, not the defaults). */
        public String toString() {
            StringBuilder text = new StringBuilder("RabbitMQBackingBusImpl.RabbitMQBackingBusImplBuilder(connection=");
            text.append(this.connection);
            text.append(", mapper$value=").append(this.mapper$value);
            text.append(", memoryBusImpl$value=").append(this.memoryBusImpl$value);
            text.append(", allowUpdate$value=").append(this.allowUpdate$value);
            text.append(", maxPrefetch$value=").append(this.maxPrefetch$value);
            text.append(", requeue$value=").append(this.requeue$value);
            text.append(", retries$value=").append(this.retries$value);
            text.append(", retryThresholdMillis$value=").append(this.retryThresholdMillis$value);
            text.append(")");
            return text.toString();
        }
    }

    protected class RabbitMQBackingBusConsumer
    extends DefaultConsumer {
        private ExecutorService executor;
        private final Class eventClass;
        private final Listener eventListener;
        private final String exchangeName;
        private final String queueName;
        private final ObjectReader reader;

        public RabbitMQBackingBusConsumer(Channel channel, Class eventClass, Listener eventListener) {
            super(channel);
            this.eventClass = eventClass;
            this.eventListener = eventListener;
            this.exchangeName = RabbitMQBackingBusImpl.this.getExcahngeName(eventClass);
            this.queueName = RabbitMQBackingBusImpl.this.getQueueName(eventListener);
            this.reader = RabbitMQBackingBusImpl.this.mapper.reader().forType(eventClass);
        }

        public void handleConsumeOk(String consumerTag) {
            log.info("Adding Consumer {} for Queue {} exchange {}", new Object[]{consumerTag, this.queueName, this.exchangeName});
            this.executor = Executors.newFixedThreadPool(RabbitMQBackingBusImpl.this.maxPrefetch, new NamedIngestorThreadFactory(String.format("queue-%s", this.queueName)));
            RabbitMQBackingBusImpl.this.memoryBusImpl.addListener(this.eventClass, this.eventListener);
            log.info("Added Consumer {} for Queue {} exchange {}", new Object[]{consumerTag, this.queueName, this.exchangeName});
        }

        public void handleCancel(String consumerTag) throws IOException {
            log.warn("Force Cancelling Consumer for Listener {} , event {}", (Object)this.queueName, (Object)this.exchangeName);
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(RabbitMQBackingBusImpl.this.retryThresholdMillis * RabbitMQBackingBusImpl.this.retries, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                log.warn("Exception while awaiting excutor termination : {} ", (Object)ex.getMessage());
            }
            log.warn("Force Cancelled Consumer for Listener {} , event {}", (Object)this.queueName, (Object)this.exchangeName);
        }

        public void handleCancelOk(String consumerTag) {
            log.warn("Cancelling Consumer for Listener {} , event {}", (Object)this.queueName, (Object)this.exchangeName);
            if (this.getChannel().isOpen()) {
                try {
                    this.getChannel().close();
                }
                catch (IOException | TimeoutException ex) {
                    log.error("Exception while attempting to close consumer : {}", (Object)ex.getMessage());
                }
            }
            log.warn("Cancelled Consumer for Listener {} , event {}", (Object)this.queueName, (Object)this.exchangeName);
        }

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            log.info("Consumer for Queue(Event) Listener {} was shutdown : {}", (Object)this.queueName, (Object)sig.getMessage());
            this.executor.shutdownNow();
            if (sig.isHardError()) {
                log.warn("Consumer for Queue(Event) Listener {} was closed abnormaly : {}", (Object)this.queueName, (Object)sig.getReason());
            } else {
                log.warn("Consumer for Queue(Event) Listener {} was closed normally : {}", (Object)this.queueName, (Object)sig.getReason());
            }
            try {
                this.executor.awaitTermination(RabbitMQBackingBusImpl.this.retryThresholdMillis * RabbitMQBackingBusImpl.this.retries, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ex) {
                log.warn("Exception while awaiting excutor termination : {} ", (Object)ex.getMessage());
            }
            if (sig.isInitiatedByApplication()) {
                log.warn("Consumer for Queue(Event) Listener {} was shutdown permanently by applicaiton : {}", (Object)this.queueName, (Object)sig.getMessage());
                return;
            }
            RabbitMQBackingBusImpl.this.rebindingExecutor.schedule(() -> {
                log.warn("Attempting to rebind Consumer for Queue(Event) Listener {}", (Object)this.queueName);
                RabbitMQBackingBusImpl.this.doAddListener(this.eventClass, this.eventListener, 1, RabbitMQBackingBusImpl.this.retries);
            }, (long)RabbitMQBackingBusImpl.this.retryThresholdMillis, TimeUnit.MILLISECONDS);
        }

        public void handleDelivery(String string, Envelope envlp, AMQP.BasicProperties bp, byte[] bytes) throws IOException {
            byte[] body = bytes;
            long deliveryTag = envlp.getDeliveryTag();
            try {
                this.executor.submit(() -> {
                    Object receivedEvent;
                    boolean doAck = false;
                    log.trace("Received Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, deliveryTag});
                    try {
                        receivedEvent = this.reader.readValue(body);
                    }
                    catch (Throwable ex) {
                        log.warn("Error Decoding message from Exchange {}, class {} : {} ", new Object[]{this.exchangeName, this.eventClass, ex.getMessage()});
                        doAck = true;
                        receivedEvent = null;
                    }
                    if (receivedEvent != null) {
                        try {
                            RabbitMQBackingBusImpl.this.memoryBusImpl.post(receivedEvent);
                            doAck = true;
                            receivedEvent = null;
                        }
                        catch (Throwable ex) {
                            try {
                                log.warn("Failure when processing event of type {}, Listener {} : {}", new Object[]{this.eventClass, this.eventListener, ex.getMessage()});
                                doAck = false;
                                receivedEvent = null;
                            }
                            catch (Throwable throwable) {
                                receivedEvent = null;
                                log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, deliveryTag});
                                throw throwable;
                            }
                            log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, deliveryTag});
                        }
                        log.trace("Finished Receiving Message from Exchange {} Queue {} with Delivery Tag {}", new Object[]{this.exchangeName, this.queueName, deliveryTag});
                    }
                    try {
                        if (doAck) {
                            this.getChannel().basicAck(deliveryTag, false);
                        } else {
                            this.getChannel().basicNack(deliveryTag, false, RabbitMQBackingBusImpl.this.requeue);
                        }
                    }
                    catch (IOException ex) {
                        log.error("RabbitMQ Exception when processing Message from Exchange {} Queue {} with Delivery Tag {} : {}", new Object[]{this.exchangeName, this.queueName, deliveryTag, ex.getMessage()});
                    }
                    catch (Throwable ex) {
                        log.error("Exception when processing Message from Exchange {} Queue {} with Delivery Tag {} : {}", new Object[]{this.exchangeName, this.queueName, deliveryTag, ex.getMessage()});
                    }
                });
            }
            catch (RejectedExecutionException ex) {
                log.warn("Could not schedule message for processing : {}", (Object)ex.getMessage());
                this.getChannel().basicNack(deliveryTag, false, true);
            }
        }
    }

    /**
     * Thread factory that names its threads
     * {@code "<prefix> pool-<p> ingestor-<n>"}, where {@code p} is a number
     * unique to each factory instance and {@code n} counts the threads created
     * by that instance, both starting at 1.
     */
    protected static class NamedIngestorThreadFactory
    implements ThreadFactory {
        /** Hands out one pool number per factory instance. */
        private static final AtomicInteger POOL_SEQUENCE = new AtomicInteger(1);
        /** Numbers the threads created by this factory. */
        private final AtomicInteger threadSequence = new AtomicInteger(1);
        private final String namePrefix;
        private final int pool;

        /**
         * @param namePrefix text placed at the start of every thread name
         */
        public NamedIngestorThreadFactory(String namePrefix) {
            this.pool = POOL_SEQUENCE.getAndIncrement();
            this.namePrefix = namePrefix;
        }

        @Override
        public Thread newThread(Runnable runnable) {
            int ingestorNumber = this.threadSequence.getAndIncrement();
            String threadName = this.namePrefix + " pool-" + this.pool + " ingestor-" + ingestorNumber;
            return new Thread(runnable, threadName);
        }
    }
}

