package reactor.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Operators;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:reactor/rabbitmq/RpcClient.class */
public class RpcClient implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);
    private final Mono<Channel> channelMono;
    private final String exchange;
    private final String routingKey;
    private final String replyTo = "amq.rabbitmq.reply-to";
    private final AtomicBoolean consumerSetUp;
    private final ConcurrentMap<String, RpcSubscriber> subscribers;
    private final AtomicReference<String> consumerTag;
    private final Supplier<String> correlationIdSupplier;
    private final Lock channelLock;
    private final CountDownLatch consumerSetLatch;

    /* loaded from: input_file:reactor/rabbitmq/RpcClient$RpcOperator.class */
    private class RpcOperator extends MonoOperator<RpcRequest, Delivery> {
        private final Channel channel;

        protected RpcOperator(Publisher<RpcRequest> publisher, Channel channel) {
            super(Mono.from(publisher));
            this.channel = channel;
        }

        @Override // reactor.core.publisher.Mono, reactor.core.CorePublisher
        public void subscribe(CoreSubscriber<? super Delivery> coreSubscriber) {
            this.source.subscribe((CoreSubscriber) new RpcSubscriber(this.channel, coreSubscriber));
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/RpcClient$RpcRequest.class */
    public static class RpcRequest {
        private final AMQP.BasicProperties properties;
        private final byte[] body;

        public RpcRequest(@Nullable AMQP.BasicProperties basicProperties, byte[] bArr) {
            this.properties = basicProperties;
            this.body = bArr;
        }

        public RpcRequest(byte[] bArr) {
            this(null, bArr);
        }
    }

    /* loaded from: input_file:reactor/rabbitmq/RpcClient$RpcSubscriber.class */
    private class RpcSubscriber implements CoreSubscriber<RpcRequest> {
        private final Channel channel;
        private final Subscriber<? super Delivery> subscriber;
        private final AtomicReference<SubscriberState> state = new AtomicReference<>(SubscriberState.INIT);
        private final AtomicReference<Throwable> firstException = new AtomicReference<>();
        private final AtomicBoolean received = new AtomicBoolean(false);

        RpcSubscriber(Channel channel, Subscriber<? super Delivery> subscriber) {
            this.channel = channel;
            this.subscriber = subscriber;
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            if (RpcClient.this.consumerSetUp.compareAndSet(false, true)) {
                DeliverCallback deliverCallback = (str, delivery) -> {
                    String correlationId = delivery.getProperties().getCorrelationId();
                    RpcSubscriber rpcSubscriber = (RpcSubscriber) RpcClient.this.subscribers.remove(correlationId);
                    if (rpcSubscriber == null) {
                        throw new RabbitFluxException("No outstanding request for correlation ID " + correlationId);
                    }
                    rpcSubscriber.subscriber.onNext(delivery);
                    rpcSubscriber.received.set(true);
                };
                try {
                    try {
                        RpcClient.this.lockChannel();
                        try {
                            RpcClient.this.consumerTag.set(this.channel.basicConsume("amq.rabbitmq.reply-to", true, deliverCallback, str2 -> {
                            }));
                            RpcClient.this.unlockChannel();
                            RpcClient.this.consumerSetLatch.countDown();
                        } catch (Throwable th) {
                            RpcClient.this.unlockChannel();
                            throw th;
                        }
                    } catch (Throwable th2) {
                        RpcClient.this.consumerSetLatch.countDown();
                        throw th2;
                    }
                } catch (IOException e) {
                    handleError(e);
                    RpcClient.this.consumerSetLatch.countDown();
                }
            } else {
                try {
                    if (!RpcClient.this.consumerSetLatch.await(60L, TimeUnit.SECONDS)) {
                        RpcClient.LOGGER.warn("Consumer setup not finished in 60 seconds, moving on.");
                    }
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                }
            }
            this.state.set(SubscriberState.ACTIVE);
            this.subscriber.onSubscribe(subscription);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(RpcRequest rpcRequest) {
            if (checkComplete(rpcRequest)) {
                return;
            }
            try {
                String str = (String) RpcClient.this.correlationIdSupplier.get();
                AMQP.BasicProperties basicProperties = rpcRequest.properties;
                AMQP.BasicProperties build = (basicProperties == null ? new AMQP.BasicProperties.Builder() : basicProperties.builder()).correlationId(str).replyTo("amq.rabbitmq.reply-to").build();
                RpcClient.this.subscribers.put(str, this);
                RpcClient.this.lockChannel();
                try {
                    this.channel.basicPublish(RpcClient.this.exchange, RpcClient.this.routingKey, build, rpcRequest.body);
                    RpcClient.this.unlockChannel();
                } catch (Throwable th) {
                    RpcClient.this.unlockChannel();
                    throw th;
                }
            } catch (IOException e) {
                handleError(e);
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.COMPLETE) || this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.subscriber.onError(th);
            } else if (this.firstException.compareAndSet(null, th) && this.state.get() == SubscriberState.COMPLETE) {
                Operators.onErrorDropped(th, currentContext());
            }
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            if (this.state.compareAndSet(SubscriberState.ACTIVE, SubscriberState.OUTBOUND_DONE) && this.received.get()) {
                maybeComplete();
            }
        }

        private void maybeComplete() {
            if (this.state.compareAndSet(SubscriberState.OUTBOUND_DONE, SubscriberState.COMPLETE)) {
                this.subscriber.onComplete();
            }
        }

        private void handleError(Exception exc) {
            RpcClient.LOGGER.error("error during RPC", exc);
            boolean checkComplete = checkComplete(exc);
            this.firstException.compareAndSet(null, exc);
            if (checkComplete) {
                return;
            }
            onError(exc);
        }

        public <T> boolean checkComplete(T t) {
            boolean z = this.state.get() == SubscriberState.COMPLETE;
            if (z && this.firstException.get() == null) {
                Operators.onNextDropped(t, currentContext());
            }
            return z;
        }
    }

    public RpcClient(Mono<Channel> mono, String str, String str2, Supplier<String> supplier) {
        this.replyTo = "amq.rabbitmq.reply-to";
        this.consumerSetUp = new AtomicBoolean(false);
        this.subscribers = new ConcurrentHashMap();
        this.consumerTag = new AtomicReference<>();
        this.channelLock = new ReentrantLock();
        this.consumerSetLatch = new CountDownLatch(1);
        this.channelMono = mono;
        this.exchange = str;
        this.routingKey = str2;
        this.correlationIdSupplier = supplier;
    }

    public RpcClient(Mono<Channel> mono, String str, String str2) {
        this(mono, str, str2, defaultCorrelationProvider());
    }

    private static final Supplier<String> defaultCorrelationProvider() {
        AtomicLong atomicLong = new AtomicLong(0L);
        return () -> {
            return String.valueOf(atomicLong.getAndIncrement());
        };
    }

    public Mono<Delivery> rpc(Publisher<RpcRequest> publisher) {
        return this.channelMono.flatMap(channel -> {
            return new RpcOperator(publisher, channel);
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (!this.subscribers.isEmpty()) {
            LOGGER.warn("Closing RPC client with outstanding request(s): " + this.subscribers.keySet());
        }
        if (this.consumerTag.get() != null) {
            try {
                Channel block = this.channelMono.block();
                lockChannel();
                try {
                    block.basicCancel(this.consumerTag.get());
                    unlockChannel();
                } catch (Throwable th) {
                    unlockChannel();
                    throw th;
                }
            } catch (IOException e) {
                throw new RabbitFluxException(e);
            }
        }
    }

    protected void lockChannel() {
        this.channelLock.lock();
    }

    protected void unlockChannel() {
        this.channelLock.unlock();
    }
}
