package io.adamantic.quicknote.amqp;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import io.adamantic.quicknote.Quicknote;
import io.adamantic.quicknote.QuicknoteConfig;
import io.adamantic.quicknote.Receiver;
import io.adamantic.quicknote.types.Message;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.apache.commons.configuration2.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/adamantic/quicknote/amqp/AmqpReceiver.class */
public class AmqpReceiver extends AmqpBaseChannel implements Receiver {
    private static final Logger log = LoggerFactory.getLogger(AmqpReceiver.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public AmqpReceiver(String str, AmqpConnector amqpConnector, QuicknoteConfig quicknoteConfig) {
        super(str, amqpConnector, quicknoteConfig);
    }

    @Override // io.adamantic.quicknote.amqp.AmqpBaseChannel
    protected Configuration locateOwnConfig() {
        return this.config.configForReceiver(this.name);
    }

    @Override // io.adamantic.quicknote.amqp.AmqpBaseChannel
    protected void ownInitialize() {
    }

    public void subscribe(String str, Flow.Subscriber<? super Message> subscriber) {
        DeliverCallback deliverCallback = (str2, delivery) -> {
            if (log.isTraceEnabled()) {
                log.trace("Received message: {}", delivery);
            }
            subscriber.onNext(buildMessage(delivery));
        };
        CancelCallback cancelCallback = str3 -> {
            log.info("Subscription cancelled: {}", str3);
            subscriber.onComplete();
        };
        try {
            String str4 = this.destType;
            boolean z = -1;
            switch (str4.hashCode()) {
                case 107944209:
                    if (str4.equals(AmqpBaseChannel.DEFAULT_DEST_TYPE)) {
                        z = false;
                        break;
                    }
                    break;
                case 110546223:
                    if (str4.equals("topic")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    subscribeToQueue(subscriber, deliverCallback, cancelCallback);
                    break;
                case AmqpBaseChannel.DEFAULT_DEST_DURABLE /* 1 */:
                    subscribeToTopic(str, subscriber, deliverCallback, cancelCallback);
                    break;
            }
        } catch (IOException e) {
            log.error("Error subscribing to {}", this.destName, e);
            subscriber.onError(e);
        }
    }

    public void subscribe(Flow.Subscriber<? super Message> subscriber) {
        subscribe("#", subscriber);
    }

    private void subscribeToQueue(Flow.Subscriber<? super Message> subscriber, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        Channel channel = getChannel();
        channel.queueDeclare(this.destName, this.durable, false, false, (Map) null);
        channel.basicConsume(this.destName, true, consumerTag(), deliverCallback, cancelCallback);
        log.info("Subscribed to queue {}", this.destName);
    }

    private void subscribeToTopic(String str, Flow.Subscriber<? super Message> subscriber, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        Channel channel = getChannel();
        channel.exchangeDeclare(this.destName, "topic", this.durable);
        String queue = channel.queueDeclare().getQueue();
        log.debug("Binding queue {} to exchange {} with routing key {}", new Object[]{queue, this.destName, str});
        channel.queueBind(queue, this.destName, str);
        channel.basicConsume(queue, true, consumerTag(), deliverCallback, cancelCallback);
        log.info("Subscribed to topic {} with routing key {}", this.destName, str);
    }

    private static Message buildMessage(Delivery delivery) {
        long nextId;
        try {
            nextId = Long.parseLong(delivery.getProperties().getMessageId());
        } catch (NumberFormatException e) {
            nextId = Message.nextId();
            log.warn("Message ID is not a number: {} - assigning own system ID: {}", delivery.getProperties().getMessageId(), Long.valueOf(nextId));
        }
        return new Message(nextId).headers(delivery.getProperties().getHeaders() != null ? (Map) delivery.getProperties().getHeaders().entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return entry.getValue().toString();
        })) : new HashMap()).contentType(delivery.getProperties().getContentType()).payload(delivery.getBody()).routing(delivery.getEnvelope().getRoutingKey());
    }

    private String consumerTag() {
        return "qn-java-cns-" + this.name + "-" + Quicknote.clientId();
    }
}
