package reactor.rabbitmq;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:reactor/rabbitmq/Receiver.class */
/**
 * Reactive entry point for consuming messages from RabbitMQ queues.
 *
 * <p>Every subscription to one of the {@code consume*} fluxes creates its own {@link Channel}
 * from the connection {@link Mono} and registers a consumer on it. The channel is closed when
 * the flux is disposed.
 *
 * <p>When no connection {@link Mono} is supplied through {@link ReceiverOptions}, the receiver
 * creates the connection itself and remembers it so that {@link #close()} can close it. A
 * connection delivered by a user-supplied {@link Mono} is never recorded by this class and is
 * therefore not closed by {@link #close()}.
 */
public class Receiver implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    private static final Function<Connection, Channel> CHANNEL_CREATION_FUNCTION = new ChannelCreationFunction();

    /** Source of the connection used to create one channel per subscription. */
    private final Mono<? extends Connection> connectionMono;
    /** Connection created by this receiver, if any; stays empty when the connection mono is user-supplied. */
    private final AtomicReference<Connection> connection = new AtomicReference<>();
    private final Scheduler connectionSubscriptionScheduler;
    /** {@code true} when the scheduler was created here and must be disposed in {@link #close()}. */
    private final boolean privateConnectionSubscriptionScheduler;
    /** Timeout in milliseconds handed to {@code Connection.close(int)}; {@code -1} when none is configured. */
    private final int connectionClosingTimeout;
    /** Guards {@link #close()} so that its body runs at most once. */
    private final AtomicBoolean closingOrClosed = new AtomicBoolean(false);

    /**
     * Pairs a delivery with the acknowledgment action to run on it, so that the action can be
     * (re-)executed through {@link #ackOrNack()}.
     */
    public static class AcknowledgmentContext {
        private final AcknowledgableDelivery delivery;
        private final Consumer<AcknowledgableDelivery> consumer;

        /**
         * @param acknowledgableDelivery the delivery the action applies to
         * @param consumer               the action to run on the delivery
         */
        public AcknowledgmentContext(AcknowledgableDelivery acknowledgableDelivery, Consumer<AcknowledgableDelivery> consumer) {
            this.delivery = acknowledgableDelivery;
            this.consumer = consumer;
        }

        /** Runs the configured action on the wrapped delivery. */
        public void ackOrNack() {
            this.consumer.accept(this.delivery);
        }
    }

    /** Creates a channel from a connection, translating {@link IOException} into {@link RabbitFluxException}. */
    private static class ChannelCreationFunction implements Function<Connection, Channel> {
        private ChannelCreationFunction() {
        }

        @Override
        public Channel apply(Connection connection) {
            try {
                return connection.createChannel();
            } catch (IOException e) {
                throw new RabbitFluxException("Error while creating channel", e);
            }
        }
    }

    /** Creates a receiver with default {@link ReceiverOptions}. */
    public Receiver() {
        this(new ReceiverOptions());
    }

    /**
     * Creates a receiver from the given options.
     *
     * <p>If the options carry no connection mono, one is built that creates the connection
     * (through the connection supplier when set, otherwise through the connection factory),
     * applies the connection mono configurator, records the connection for {@link #close()},
     * subscribes on the connection subscription scheduler and is cached via {@link #cache(Mono)}.
     *
     * <p>Note: {@link #createScheduler(String)} and {@link #cache(Mono)} are overridable hooks
     * that are invoked from this constructor.
     *
     * @param receiverOptions configuration of the receiver
     */
    public Receiver(ReceiverOptions receiverOptions) {
        Scheduler configuredScheduler = receiverOptions.getConnectionSubscriptionScheduler();
        this.privateConnectionSubscriptionScheduler = configuredScheduler == null;
        this.connectionSubscriptionScheduler = configuredScheduler == null
                ? createScheduler("rabbitmq-receiver-connection-subscription")
                : configuredScheduler;

        Mono<? extends Connection> configuredMono = receiverOptions.getConnectionMono();
        if (configuredMono == null) {
            // The options are read lazily, at subscription time, exactly as before.
            Mono<? extends Connection> connectionCreation = Mono.fromCallable(() -> {
                if (receiverOptions.getConnectionSupplier() == null) {
                    return receiverOptions.getConnectionFactory().newConnection();
                }
                return receiverOptions.getConnectionSupplier().apply(null);
            });
            this.connectionMono = receiverOptions.getConnectionMonoConfigurator().apply(connectionCreation)
                    .doOnNext(conn -> this.connection.set(conn))
                    .subscribeOn(this.connectionSubscriptionScheduler)
                    .transform(this::cache);
        } else {
            this.connectionMono = configuredMono;
        }

        Duration closingTimeout = receiverOptions.getConnectionClosingTimeout();
        if (closingTimeout == null || Duration.ZERO.equals(closingTimeout)) {
            this.connectionClosingTimeout = -1;
        } else {
            this.connectionClosingTimeout = toSaturatedMillis(closingTimeout);
        }
    }

    /**
     * Converts a duration to whole milliseconds as an {@code int}, saturating at the
     * {@code int} bounds instead of silently wrapping around on narrowing.
     */
    private static int toSaturatedMillis(Duration duration) {
        long millis = duration.toMillis();
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, millis));
    }

    /**
     * Hook used to cache the connection mono; delegates to {@code Utils.cache}.
     *
     * @param mono the mono to cache
     * @param <T>  the value type
     * @return the cached mono
     */
    protected <T> Mono<T> cache(Mono<T> mono) {
        return Utils.cache(mono);
    }

    /**
     * Hook used to create the connection subscription scheduler when none is configured.
     *
     * @param str the name given to the bounded elastic scheduler
     * @return a new bounded elastic scheduler with Reactor's default size and queue size
     */
    protected Scheduler createScheduler(String str) {
        return Schedulers.newBoundedElastic(Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, str);
    }

    /**
     * Same as {@link #consumeNoAck(String, ConsumeOptions)} with default {@link ConsumeOptions}.
     *
     * @param str the queue to consume from
     * @return a flux of deliveries
     */
    public Flux<Delivery> consumeNoAck(String str) {
        return consumeNoAck(str, new ConsumeOptions());
    }

    /**
     * Consumes from a queue with the consumer registered in auto-acknowledge mode
     * ({@code autoAck = true} on {@code basicConsume}).
     *
     * <p>The flux completes when the stop-consuming function of the options returns {@code true}
     * for a delivery, when the consumer is cancelled, or when the channel shuts down (see
     * {@link #completeOnChannelShutdown(Channel, FluxSink)}). Any exception raised while setting
     * up the consumer is signalled as a {@link RabbitFluxException}.
     *
     * @param str            the queue to consume from
     * @param consumeOptions options for this consumer
     * @return a flux of deliveries
     */
    public Flux<Delivery> consumeNoAck(String str, ConsumeOptions consumeOptions) {
        return Flux.create(fluxSink -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                applyChannelCallback(consumeOptions, channel);
                DeliverCallback deliverCallback = (tag, message) -> {
                    fluxSink.next(message);
                    if (consumeOptions.getStopConsumingBiFunction().apply(fluxSink.requestedFromDownstream(), message)) {
                        fluxSink.complete();
                    }
                };
                startConsuming(channel, str, true, consumeOptions, deliverCallback, fluxSink);
            } catch (Exception e) {
                fluxSink.error(new RabbitFluxException(e));
            }
        }, fluxSink::error), consumeOptions.getOverflowStrategy());
    }

    /**
     * Completes the sink when the channel shuts down, unless the channel is recoverable and the
     * shutdown cause matches the default connection recovery triggering condition (in which
     * case the sink is left open).
     *
     * @param channel  the channel to watch
     * @param fluxSink the sink to complete
     */
    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> fluxSink) {
        channel.addShutdownListener(cause -> {
            // Recoverability is evaluated when the shutdown happens, not at registration time.
            boolean recoveryExpected = isRecoverable(channel)
                    && AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(cause);
            if (!recoveryExpected) {
                fluxSink.complete();
            }
        });
    }

    /**
     * Same as {@link #consumeAutoAck(String, ConsumeOptions)} with default {@link ConsumeOptions}.
     *
     * @param str the queue to consume from
     * @return a flux of deliveries
     */
    public Flux<Delivery> consumeAutoAck(String str) {
        return consumeAutoAck(str, new ConsumeOptions());
    }

    /**
     * Consumes from a queue in manual-acknowledge mode and acknowledges each delivery before it
     * is passed downstream.
     *
     * @param str            the queue to consume from
     * @param consumeOptions options for this consumer
     * @return a flux of already acknowledged deliveries
     */
    public Flux<Delivery> consumeAutoAck(String str, ConsumeOptions consumeOptions) {
        return consumeManualAck(str, consumeOptions)
                .doOnNext(AcknowledgableDelivery::ack)
                .map(delivery -> delivery);
    }

    /**
     * Same as {@link #consumeManualAck(String, ConsumeOptions)} with default {@link ConsumeOptions}.
     *
     * @param str the queue to consume from
     * @return a flux of deliveries to acknowledge
     */
    public Flux<AcknowledgableDelivery> consumeManualAck(String str) {
        return consumeManualAck(str, new ConsumeOptions());
    }

    /**
     * Consumes from a queue with the consumer registered in manual-acknowledge mode
     * ({@code autoAck = false} on {@code basicConsume}); acknowledging is left to the subscriber.
     *
     * <p>A non-zero QoS from the options is applied to the channel before consuming. A delivery
     * is emitted only when the hook-before-emit function returns {@code true} for it. The flux
     * completes when the stop-consuming function returns {@code true}, when the consumer is
     * cancelled, or when the channel shuts down (see
     * {@link #completeOnChannelShutdown(Channel, FluxSink)}). Any exception raised while setting
     * up the consumer is signalled as a {@link RabbitFluxException}.
     *
     * @param str            the queue to consume from
     * @param consumeOptions options for this consumer
     * @return a flux of deliveries to acknowledge
     */
    public Flux<AcknowledgableDelivery> consumeManualAck(String str, ConsumeOptions consumeOptions) {
        return Flux.create(fluxSink -> this.connectionMono.map(CHANNEL_CREATION_FUNCTION).subscribe(channel -> {
            try {
                applyChannelCallback(consumeOptions, channel);
                if (consumeOptions.getQos() != 0) {
                    channel.basicQos(consumeOptions.getQos());
                }
                DeliverCallback deliverCallback = (tag, message) -> {
                    AcknowledgableDelivery ackable = new AcknowledgableDelivery(message, channel, consumeOptions.getExceptionHandler());
                    if (consumeOptions.getHookBeforeEmitBiFunction().apply(fluxSink.requestedFromDownstream(), ackable)) {
                        fluxSink.next(ackable);
                    }
                    // The stop condition is evaluated against the raw delivery, as before.
                    if (consumeOptions.getStopConsumingBiFunction().apply(fluxSink.requestedFromDownstream(), message)) {
                        fluxSink.complete();
                    }
                };
                startConsuming(channel, str, false, consumeOptions, deliverCallback, fluxSink);
            } catch (Exception e) {
                fluxSink.error(new RabbitFluxException(e));
            }
        }, fluxSink::error), consumeOptions.getOverflowStrategy());
    }

    /** Invokes the channel callback of the options on the channel, when one is configured. */
    private static void applyChannelCallback(ConsumeOptions options, Channel channel) {
        if (options.getChannelCallback() != null) {
            options.getChannelCallback().accept(channel);
        }
    }

    /**
     * Registers the consumer on the channel and wires the sink's lifecycle to it: the sink
     * completes on consumer cancellation or channel shutdown, and disposing the sink cancels
     * the consumer (if still needed) and closes the channel.
     */
    private void startConsuming(Channel channel, String queue, boolean autoAck, ConsumeOptions options,
            DeliverCallback deliverCallback, FluxSink<?> sink) throws IOException {
        // True while an explicit basic.cancel is still required on dispose; cleared once the
        // cancel callback has been invoked for this consumer.
        AtomicBoolean basicCancelNeeded = new AtomicBoolean(true);
        CancelCallback cancelCallback = cancelledTag -> {
            LOGGER.info("Flux consumer {} has been cancelled", cancelledTag);
            basicCancelNeeded.set(false);
            sink.complete();
        };
        completeOnChannelShutdown(channel, sink);
        String consumerTag = channel.basicConsume(queue, autoAck, options.getConsumerTag(), deliverCallback, cancelCallback);
        // Ensures the cancel-and-close sequence runs at most once.
        AtomicBoolean disposed = new AtomicBoolean(false);
        LOGGER.info("Consumer {} consuming from {} has been registered", consumerTag, queue);
        sink.onDispose(() -> {
            LOGGER.info("Cancelling consumer {} consuming from {}", consumerTag, queue);
            if (disposed.compareAndSet(false, true)) {
                try {
                    if (channel.isOpen() && channel.getConnection().isOpen()) {
                        if (basicCancelNeeded.compareAndSet(true, false)) {
                            channel.basicCancel(consumerTag);
                        }
                        channel.close();
                    }
                } catch (IOException | TimeoutException e) {
                    LOGGER.warn("Error while closing channel: {}", e.getMessage());
                }
            }
        });
    }

    /**
     * @param connection the connection to test
     * @return the result of {@code Utils.isRecoverable} for the connection
     */
    protected boolean isRecoverable(Connection connection) {
        return Utils.isRecoverable(connection);
    }

    /**
     * @param channel the channel to test
     * @return the result of {@code Utils.isRecoverable} for the channel
     */
    protected boolean isRecoverable(Channel channel) {
        return Utils.isRecoverable(channel);
    }

    /**
     * Closes the connection created by this receiver (if one was created) using the configured
     * closing timeout, and disposes the connection subscription scheduler when it was created by
     * this receiver. Only the first call has an effect; failures are handed to
     * {@code Helpers.safelyExecute} together with the logger and an error message.
     */
    @Override
    public void close() {
        if (!this.closingOrClosed.compareAndSet(false, true)) {
            return;
        }
        Connection createdConnection = this.connection.get();
        if (createdConnection != null) {
            Helpers.safelyExecute(LOGGER, () -> {
                createdConnection.close(this.connectionClosingTimeout);
            }, "Error while closing receiver connection");
        }
        if (this.privateConnectionSubscriptionScheduler) {
            Helpers.safelyExecute(LOGGER, () -> {
                this.connectionSubscriptionScheduler.dispose();
            }, "Error while disposing connection subscriber scheduler");
        }
    }
}
