package io.hoplin;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import io.hoplin.json.JsonCodec;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hoplin/DefaultRabbitMQClient.class */
public class DefaultRabbitMQClient implements RabbitMQClient {
    private static final Logger log = LoggerFactory.getLogger(DefaultRabbitMQClient.class);
    private RabbitMQOptions options;
    private DefaultQueueConsumer consumer;
    private ConnectionProvider provider = create();
    private Channel channel = this.provider.acquire();
    private JsonCodec codec = new JsonCodec();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/hoplin/DefaultRabbitMQClient$ThrowableChannel.class */
    public interface ThrowableChannel<T> {
        T handle(Channel channel) throws Exception;
    }

    public DefaultRabbitMQClient(RabbitMQOptions rabbitMQOptions) {
        this.options = (RabbitMQOptions) Objects.requireNonNull(rabbitMQOptions, "Options are required and can't be null");
        this.channel.addReturnListener(new UnroutableMessageReturnListener(rabbitMQOptions));
    }

    private ConnectionProvider create() {
        try {
            ConnectionProvider create = ConnectionProvider.create(this.options);
            if (create.connect()) {
                return create;
            }
            throw new IllegalStateException("Unable to connect to broker : " + this.options);
        } catch (IOException | TimeoutException e) {
            throw new HoplinRuntimeException("Unable to connect to broker", e);
        }
    }

    @Override // io.hoplin.RabbitMQClient
    public <T> void basicConsume(String str, Class<T> cls, Consumer<T> consumer) {
        basicConsume(str, QueueOptions.of(true), cls, consumer);
    }

    @Override // io.hoplin.RabbitMQClient
    public <T> void basicConsume(String str, Class<T> cls, BiConsumer<T, MessageContext> biConsumer) {
        basicConsume(str, QueueOptions.of(true), cls, biConsumer);
    }

    @Override // io.hoplin.RabbitMQClient
    public synchronized <T> void basicConsume(String str, QueueOptions queueOptions, Class<T> cls, BiConsumer<T, MessageContext> biConsumer) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(cls);
        Objects.requireNonNull(biConsumer);
        Objects.requireNonNull(queueOptions);
        try {
            if (this.consumer == null) {
                boolean isAutoAck = queueOptions.isAutoAck();
                int prefetchCount = queueOptions.getPrefetchCount();
                boolean isPublisherConfirms = queueOptions.isPublisherConfirms();
                log.info("basicConsume autoAck : {} ", Boolean.valueOf(isAutoAck));
                log.info("basicConsume prefetchCount : {} ", Integer.valueOf(prefetchCount));
                log.info("basicConsume publisherConfirms : {} ", Boolean.valueOf(isPublisherConfirms));
                if (isPublisherConfirms) {
                    this.channel.confirmSelect();
                    this.channel.addConfirmListener(this::confirmedAck, this::confirmedNack);
                }
                this.consumer = new DefaultQueueConsumer(this.channel, queueOptions);
                this.channel.basicQos(prefetchCount);
                String basicConsume = this.channel.basicConsume(str, isAutoAck, this.consumer);
                if (log.isDebugEnabled()) {
                    log.debug("Assigned consumer tag : {}", basicConsume);
                }
            }
            this.consumer.addHandler(cls, biConsumer);
        } catch (IOException e) {
            log.error("Unable to subscribe messages", e);
            throw new HoplinRuntimeException("Unable to subscribe messages", e);
        }
    }

    @Override // io.hoplin.RabbitMQClient
    public synchronized <T> void basicConsume(String str, QueueOptions queueOptions, Class<T> cls, Consumer<T> consumer) {
        basicConsume(str, queueOptions, cls, (obj, messageContext) -> {
            consumer.accept(obj);
        });
    }

    private void confirmedAck(long j, boolean z) {
        log.info("Confirmed ACK :: {}", Long.valueOf(j));
    }

    private void confirmedNack(long j, boolean z) {
        log.info("Confirmed NACK :: {}", Long.valueOf(j));
    }

    @Override // io.hoplin.RabbitMQClient
    public void exchangeDeclare(String str, String str2, boolean z, boolean z2) {
        exchangeDeclare(str, str2, z, z2, Collections.emptyMap());
    }

    @Override // io.hoplin.RabbitMQClient
    public void exchangeDeclare(String str, String str2, boolean z, boolean z2, Map<String, Object> map) {
        with(channel -> {
            channel.exchangeDeclare(str, str2, z, z2, map);
            return null;
        });
    }

    @Override // io.hoplin.RabbitMQClient
    public void queueDeclare(String str, boolean z, boolean z2, boolean z3) {
        queueDeclare(str, z, z2, z3, Collections.emptyMap());
    }

    @Override // io.hoplin.RabbitMQClient
    public AMQP.Queue.DeclareOk queueDeclare(String str, boolean z, boolean z2, boolean z3, Map<String, Object> map) {
        return (AMQP.Queue.DeclareOk) with(channel -> {
            return channel.queueDeclare(str, z, z2, z3, map);
        });
    }

    @Override // io.hoplin.RabbitMQClient
    public void queueBind(String str, String str2, String str3) {
        with(channel -> {
            channel.queueBind(str, str2, str3);
            return null;
        });
    }

    @Override // io.hoplin.RabbitMQClient
    public String queueDeclareTemporary() {
        return (String) with(channel -> {
            return channel.queueDeclare().getQueue();
        });
    }

    @Override // io.hoplin.RabbitMQClient
    public void disconnect() throws IOException {
        if (this.provider != null) {
            this.provider.disconnect();
        }
    }

    private <T> T with(ThrowableChannel<T> throwableChannel) {
        try {
            return throwableChannel.handle(this.channel);
        } catch (Exception e) {
            log.error("Unable to execute operation on  channel", e);
            return null;
        }
    }

    private void logReceived(Object obj) {
        if (obj == null) {
            log.debug("Received no message");
        } else if (log.isDebugEnabled()) {
            log.debug("Received: {}", obj);
        }
    }

    @Override // io.hoplin.RabbitMQClient
    public boolean isConnected() {
        return false;
    }

    @Override // io.hoplin.RabbitMQClient
    public boolean isOpenChannel() {
        return false;
    }

    @Override // io.hoplin.RabbitMQClient
    public int messageCount(String str) {
        try {
            return messageCountAsync(str).get().intValue();
        } catch (InterruptedException | ExecutionException e) {
            log.error("Unable to get message count", e);
            return -1;
        }
    }

    @Override // io.hoplin.RabbitMQClient
    public CompletableFuture<Integer> messageCountAsync(String str) {
        return null;
    }

    @Override // io.hoplin.RabbitMQClient
    public <T> void basicPublish(String str, String str2, T t) {
        basicPublish(str, str2, t, Collections.emptyMap());
    }

    @Override // io.hoplin.RabbitMQClient
    public <T> void basicPublish(String str, String str2, T t, Map<String, Object> map) {
        try {
            String uuid = UUID.randomUUID().toString();
            AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().contentType("text/json").contentEncoding("UTF-8").messageId(uuid).deliveryMode(2).headers(map).build();
            log.info("Publishing [exchange, routingKey, id] : {}, {}, {}", new Object[]{str, str2, uuid});
            this.channel.basicPublish(str, str2, build, this.codec.serialize(t));
        } catch (IOException e) {
            throw new HoplinRuntimeException("Unable to publish message", e);
        }
    }

    @Override // io.hoplin.RabbitMQClient
    public void basicAck(long j, boolean z) {
    }

    @Override // io.hoplin.RabbitMQClient
    public Channel channel() {
        return this.provider.acquire();
    }
}
