package org.lorislab.quarkus.reactive.jms.tx;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.jms.Destination;
import javax.jms.IllegalStateRuntimeException;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.Topic;
import javax.json.bind.Jsonb;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.lorislab.quarkus.reactive.jms.tx.OutgoingJmsTxMessageMetadata;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/lorislab/quarkus/reactive/jms/tx/JmsTxSource.class */
class JmsTxSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(JmsTxSource.class);
    private final PublisherBuilder<IncomingJmsTxMessage<?>> source;
    private final JmsPublisher publisher;

    /* loaded from: input_file:org/lorislab/quarkus/reactive/jms/tx/JmsTxSource$JmsPublisher.class */
    private static class JmsPublisher implements Publisher<Message>, Subscription {
        private final JMSConsumer consumer;
        private boolean unbounded;
        private final AtomicLong requests = new AtomicLong();
        private final AtomicReference<Subscriber<? super Message>> downstream = new AtomicReference<>();
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        private JmsPublisher(JMSConsumer jMSConsumer) {
            this.consumer = jMSConsumer;
        }

        void close() {
            Subscriber<? super Message> andSet = this.downstream.getAndSet(null);
            if (andSet != null) {
                andSet.onComplete();
            }
            this.consumer.close();
            this.executor.shutdown();
        }

        public void subscribe(Subscriber<? super Message> subscriber) {
            if (this.downstream.compareAndSet(null, subscriber)) {
                subscriber.onSubscribe(this);
            } else {
                Subscriptions.fail(subscriber, new IllegalStateException("There is already a subscriber"));
            }
        }

        public void request(long j) {
            if (j <= 0 || this.unbounded) {
                return;
            }
            if (add(j) != Long.MAX_VALUE) {
                enqueue(j);
            } else {
                this.unbounded = true;
                startUnboundedReception();
            }
        }

        private void enqueue(long j) {
            for (int i = 0; i < j; i++) {
                this.executor.execute(() -> {
                    try {
                        Message receive = this.consumer.receive();
                        if (receive != null) {
                            this.requests.decrementAndGet();
                            this.downstream.get().onNext(receive);
                        }
                    } catch (IllegalStateRuntimeException e) {
                        JmsTxSource.LOGGER.warn("Unable to receive JMS messages - client has been closed");
                    }
                });
            }
        }

        private void startUnboundedReception() {
            this.consumer.setMessageListener(message -> {
                this.downstream.get().onNext(message);
            });
        }

        public void cancel() {
            close();
        }

        long add(long j) {
            long j2;
            long j3;
            do {
                j2 = this.requests.get();
                if (j2 == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                long j4 = j2 + j;
                j3 = j4 < 0 ? Long.MAX_VALUE : j4;
            } while (!this.requests.compareAndSet(j2, j3));
            return j3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public JmsTxSource(JMSContext jMSContext, JmsTxConnectorIncomingConfiguration jmsTxConnectorIncomingConfiguration, Jsonb jsonb, Executor executor) {
        JMSConsumer createConsumer;
        Optional<String> destination = jmsTxConnectorIncomingConfiguration.getDestination();
        Objects.requireNonNull(jmsTxConnectorIncomingConfiguration);
        String orElseGet = destination.orElseGet(jmsTxConnectorIncomingConfiguration::getChannel);
        String orElse = jmsTxConnectorIncomingConfiguration.getSelector().orElse(null);
        boolean booleanValue = jmsTxConnectorIncomingConfiguration.getNoLocal().booleanValue();
        boolean booleanValue2 = jmsTxConnectorIncomingConfiguration.getBroadcast().booleanValue();
        boolean booleanValue3 = jmsTxConnectorIncomingConfiguration.getDurable().booleanValue();
        Topic destination2 = getDestination(jMSContext, orElseGet, jmsTxConnectorIncomingConfiguration);
        if (!booleanValue3) {
            createConsumer = jMSContext.createConsumer(destination2, orElse, booleanValue);
        } else {
            if (!(destination2 instanceof Topic)) {
                throw new IllegalArgumentException("Invalid destination, only topic can be durable");
            }
            createConsumer = jMSContext.createDurableConsumer(destination2, orElseGet, orElse, booleanValue);
        }
        this.publisher = new JmsPublisher(createConsumer);
        if (booleanValue2) {
            this.source = ReactiveStreams.fromPublisher(Multi.createFrom().publisher(this.publisher).map(message -> {
                return new IncomingJmsTxMessage(jMSContext, message, executor, jsonb);
            }).broadcast().toAllSubscribers());
        } else {
            this.source = ReactiveStreams.fromPublisher(this.publisher).map(message2 -> {
                return new IncomingJmsTxMessage(jMSContext, message2, executor, jsonb);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.publisher.close();
    }

    private Destination getDestination(JMSContext jMSContext, String str, JmsTxConnectorIncomingConfiguration jmsTxConnectorIncomingConfiguration) {
        String destinationType = jmsTxConnectorIncomingConfiguration.getDestinationType();
        String lowerCase = destinationType.toLowerCase();
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case 107944209:
                if (lowerCase.equals(OutgoingJmsTxMessageMetadata.OutputJmsTxMessageMetadataBuilder.QUEUE)) {
                    z = false;
                    break;
                }
                break;
            case 110546223:
                if (lowerCase.equals(OutgoingJmsTxMessageMetadata.OutputJmsTxMessageMetadataBuilder.TOPIC)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                LOGGER.info("Creating queue {}", str);
                return jMSContext.createQueue(str);
            case true:
                LOGGER.info("Creating topic {}", str);
                return jMSContext.createTopic(str);
            default:
                throw new IllegalArgumentException("Unknown destination type: " + destinationType);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PublisherBuilder<IncomingJmsTxMessage<?>> getSource() {
        return this.source;
    }
}
