/*
 * Decompiled with CFR 0.152.
 */
package dev.aherscu.qa.jgiven.rabbitmq.utils;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.impl.ForgivingExceptionHandler;
import dev.aherscu.qa.jgiven.rabbitmq.model.Message;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jooq.lambda.Unchecked;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueHandler<K, V>
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(QueueHandler.class);
    public final Channel channel;
    public final String queue;
    public final Function<Message<V>, K> indexingBy;
    public final Function<byte[], V> consumingBy;
    public final Function<V, byte[]> publishingBy;
    private final ConcurrentMap<K, Message<V>> recievedMessages = new ConcurrentHashMap<K, Message<V>>();
    private String consumerTag;

    public static ConnectionFactory connectionFactoryFrom(String uri) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(uri);
        connectionFactory.setExceptionHandler((ExceptionHandler)new ForgivingExceptionHandler());
        return connectionFactory;
    }

    public String consume() {
        this.channel.basicQos(16);
        this.consumerTag = this.channel.basicConsume(this.queue, (Consumer)new DefaultConsumer(this.channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                Object key;
                Message message;
                try {
                    message = Message.builder().content(QueueHandler.this.consumingBy.apply(body)).properties(properties).build();
                    key = QueueHandler.this.indexingBy.apply(message);
                    log.trace("received {}", key);
                }
                catch (Exception e) {
                    log.warn("skipping unknown type {}", (Object)e.getMessage());
                    QueueHandler.this.channel.basicReject(envelope.getDeliveryTag(), true);
                    return;
                }
                QueueHandler.this.recievedMessages.put(key, message);
                QueueHandler.this.channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
        return this.consumerTag;
    }

    public String cancel() throws IOException {
        if (Objects.isNull(this.consumerTag)) {
            throw new IOException("consumer not started");
        }
        log.debug("cancelling {}", (Object)this.consumerTag);
        this.channel.basicCancel(this.consumerTag);
        return this.consumerTag;
    }

    public Map<K, Message<V>> recievedMessages() {
        return Collections.unmodifiableMap(this.recievedMessages);
    }

    @Override
    public void close() throws IOException {
        try {
            this.cancel();
        }
        catch (Exception e) {
            log.error("while closing got {}", (Object)e.getMessage());
            throw e;
        }
    }

    public void publish(Stream<Message<V>> messages) {
        ((Stream)messages.parallel()).forEach(Unchecked.consumer(message -> this.channel.basicPublish("", this.queue, message.properties, this.publishingBy.apply(message.content))));
    }

    public void publishValues(Stream<V> values) {
        this.publish(values.map(value -> Message.builder().content(value).build()));
    }

    protected QueueHandler(QueueHandlerBuilder<K, V, ?, ?> b) {
        this.channel = ((QueueHandlerBuilder)b).channel;
        this.queue = ((QueueHandlerBuilder)b).queue;
        this.indexingBy = ((QueueHandlerBuilder)b).indexingBy;
        this.consumingBy = ((QueueHandlerBuilder)b).consumingBy;
        this.publishingBy = ((QueueHandlerBuilder)b).publishingBy;
        this.consumerTag = ((QueueHandlerBuilder)b).consumerTag;
    }

    public static <K, V> QueueHandlerBuilder<K, V, ?, ?> builder() {
        return new QueueHandlerBuilderImpl();
    }

    private static final class QueueHandlerBuilderImpl<K, V>
    extends QueueHandlerBuilder<K, V, QueueHandler<K, V>, QueueHandlerBuilderImpl<K, V>> {
        private QueueHandlerBuilderImpl() {
        }

        @Override
        protected QueueHandlerBuilderImpl<K, V> self() {
            return this;
        }

        @Override
        public QueueHandler<K, V> build() {
            return new QueueHandler(this);
        }
    }

    public static abstract class QueueHandlerBuilder<K, V, C extends QueueHandler<K, V>, B extends QueueHandlerBuilder<K, V, C, B>> {
        private Channel channel;
        private String queue;
        private Function<Message<V>, K> indexingBy;
        private Function<byte[], V> consumingBy;
        private Function<V, byte[]> publishingBy;
        private String consumerTag;

        public B channel(Channel channel) {
            this.channel = channel;
            return this.self();
        }

        public B queue(String queue) {
            this.queue = queue;
            return this.self();
        }

        public B indexingBy(Function<Message<V>, K> indexingBy) {
            this.indexingBy = indexingBy;
            return this.self();
        }

        public B consumingBy(Function<byte[], V> consumingBy) {
            this.consumingBy = consumingBy;
            return this.self();
        }

        public B publishingBy(Function<V, byte[]> publishingBy) {
            this.publishingBy = publishingBy;
            return this.self();
        }

        public B consumerTag(String consumerTag) {
            this.consumerTag = consumerTag;
            return this.self();
        }

        protected abstract B self();

        public abstract C build();

        public String toString() {
            return "QueueHandler.QueueHandlerBuilder(channel=" + this.channel + ", queue=" + this.queue + ", indexingBy=" + this.indexingBy + ", consumingBy=" + this.consumingBy + ", publishingBy=" + this.publishingBy + ", consumerTag=" + this.consumerTag + ")";
        }
    }
}

