package dev.aherscu.qa.jgiven.rabbitmq.utils;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.impl.ForgivingExceptionHandler;
import dev.aherscu.qa.jgiven.rabbitmq.model.Message;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jooq.lambda.Unchecked;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dev/aherscu/qa/jgiven/rabbitmq/utils/QueueHandler.class */
public class QueueHandler<K, V> implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(QueueHandler.class);
    public final Channel channel;
    public final String queue;
    public final Function<Message<V>, K> indexingBy;
    public final Function<byte[], V> consumingBy;
    public final Function<V, byte[]> publishingBy;
    private final ConcurrentMap<K, Message<V>> recievedMessages = new ConcurrentHashMap();
    private String consumerTag;

    /* loaded from: input_file:dev/aherscu/qa/jgiven/rabbitmq/utils/QueueHandler$QueueHandlerBuilder.class */
    public static abstract class QueueHandlerBuilder<K, V, C extends QueueHandler<K, V>, B extends QueueHandlerBuilder<K, V, C, B>> {
        private Channel channel;
        private String queue;
        private Function<Message<V>, K> indexingBy;
        private Function<byte[], V> consumingBy;
        private Function<V, byte[]> publishingBy;
        private String consumerTag;

        protected abstract B self();

        public abstract C build();

        public B channel(Channel channel) {
            this.channel = channel;
            return self();
        }

        public B queue(String str) {
            this.queue = str;
            return self();
        }

        public B indexingBy(Function<Message<V>, K> function) {
            this.indexingBy = function;
            return self();
        }

        public B consumingBy(Function<byte[], V> function) {
            this.consumingBy = function;
            return self();
        }

        public B publishingBy(Function<V, byte[]> function) {
            this.publishingBy = function;
            return self();
        }

        public B consumerTag(String str) {
            this.consumerTag = str;
            return self();
        }

        public String toString() {
            return "QueueHandler.QueueHandlerBuilder(channel=" + this.channel + ", queue=" + this.queue + ", indexingBy=" + this.indexingBy + ", consumingBy=" + this.consumingBy + ", publishingBy=" + this.publishingBy + ", consumerTag=" + this.consumerTag + ")";
        }
    }

    /* loaded from: input_file:dev/aherscu/qa/jgiven/rabbitmq/utils/QueueHandler$QueueHandlerBuilderImpl.class */
    private static final class QueueHandlerBuilderImpl<K, V> extends QueueHandlerBuilder<K, V, QueueHandler<K, V>, QueueHandlerBuilderImpl<K, V>> {
        private QueueHandlerBuilderImpl() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // dev.aherscu.qa.jgiven.rabbitmq.utils.QueueHandler.QueueHandlerBuilder
        public QueueHandlerBuilderImpl<K, V> self() {
            return this;
        }

        @Override // dev.aherscu.qa.jgiven.rabbitmq.utils.QueueHandler.QueueHandlerBuilder
        public QueueHandler<K, V> build() {
            return new QueueHandler<>(this);
        }
    }

    public static ConnectionFactory connectionFactoryFrom(String str) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri(str);
        connectionFactory.setExceptionHandler(new ForgivingExceptionHandler());
        return connectionFactory;
    }

    public String consume() {
        this.channel.basicQos(16);
        String basicConsume = this.channel.basicConsume(this.queue, new DefaultConsumer(this.channel) { // from class: dev.aherscu.qa.jgiven.rabbitmq.utils.QueueHandler.1
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                try {
                    Message<V> build = Message.builder().content(QueueHandler.this.consumingBy.apply(bArr)).properties(basicProperties).build();
                    K apply = QueueHandler.this.indexingBy.apply(build);
                    QueueHandler.log.trace("received {}", apply);
                    QueueHandler.this.recievedMessages.put(apply, build);
                    QueueHandler.this.channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    QueueHandler.log.warn("skipping unknown type {}", e.getMessage());
                    QueueHandler.this.channel.basicReject(envelope.getDeliveryTag(), true);
                }
            }
        });
        this.consumerTag = basicConsume;
        return basicConsume;
    }

    public String cancel() throws IOException {
        if (Objects.isNull(this.consumerTag)) {
            throw new IOException("consumer not started");
        }
        log.debug("cancelling {}", this.consumerTag);
        this.channel.basicCancel(this.consumerTag);
        return this.consumerTag;
    }

    public Map<K, Message<V>> recievedMessages() {
        return Collections.unmodifiableMap(this.recievedMessages);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            cancel();
        } catch (Exception e) {
            log.error("while closing got {}", e.getMessage());
            throw e;
        }
    }

    public void publish(Stream<Message<V>> stream) {
        ((Stream) stream.parallel()).forEach(Unchecked.consumer(message -> {
            this.channel.basicPublish("", this.queue, message.properties, (byte[]) this.publishingBy.apply(message.content));
        }));
    }

    public void publishValues(Stream<V> stream) {
        publish(stream.map(obj -> {
            return Message.builder().content(obj).build();
        }));
    }

    protected QueueHandler(QueueHandlerBuilder<K, V, ?, ?> queueHandlerBuilder) {
        this.channel = ((QueueHandlerBuilder) queueHandlerBuilder).channel;
        this.queue = ((QueueHandlerBuilder) queueHandlerBuilder).queue;
        this.indexingBy = ((QueueHandlerBuilder) queueHandlerBuilder).indexingBy;
        this.consumingBy = ((QueueHandlerBuilder) queueHandlerBuilder).consumingBy;
        this.publishingBy = ((QueueHandlerBuilder) queueHandlerBuilder).publishingBy;
        this.consumerTag = ((QueueHandlerBuilder) queueHandlerBuilder).consumerTag;
    }

    public static <K, V> QueueHandlerBuilder<K, V, ?, ?> builder() {
        return new QueueHandlerBuilderImpl();
    }
}
