package io.micronaut.rabbitmq.reactive;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.annotation.Internal;
import io.micronaut.messaging.exceptions.MessagingClientException;
import io.micronaut.rabbitmq.bind.RabbitConsumerState;
import io.micronaut.rabbitmq.connect.ChannelPool;
import io.micronaut.rabbitmq.connect.RabbitConnectionFactoryConfig;
import io.micronaut.rabbitmq.exception.RabbitClientException;
import io.micronaut.rabbitmq.intercept.DefaultConsumer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@Internal
@EachBean(ChannelPool.class)
/* loaded from: input_file:io/micronaut/rabbitmq/reactive/ReactorReactivePublisher.class */
public class ReactorReactivePublisher implements ReactivePublisher {
    private final ChannelPool channelPool;
    private final RabbitConnectionFactoryConfig config;

    public ReactorReactivePublisher(@Parameter ChannelPool channelPool, @Parameter RabbitConnectionFactoryConfig rabbitConnectionFactoryConfig) {
        this.channelPool = channelPool;
        this.config = rabbitConnectionFactoryConfig;
    }

    @Override // io.micronaut.rabbitmq.reactive.ReactivePublisher
    /* renamed from: publishAndConfirm, reason: merged with bridge method [inline-methods] */
    public Mono<Void> mo71publishAndConfirm(RabbitPublishState rabbitPublishState) {
        return Mono.from(getChannel().flatMap(this::initializePublish).flatMap(channel -> {
            return publishInternal(channel, rabbitPublishState);
        }).timeout(this.config.getConfirmTimeout(), Mono.error(new RabbitClientException(String.format("Timed out waiting for publisher confirm for exchange: [%s] and routing key: [%s]", rabbitPublishState.getExchange(), rabbitPublishState.getRoutingKey())))).then());
    }

    @Override // io.micronaut.rabbitmq.reactive.ReactivePublisher
    /* renamed from: publish, reason: merged with bridge method [inline-methods] */
    public Mono<Void> mo70publish(RabbitPublishState rabbitPublishState) {
        return getChannel().flatMap(channel -> {
            return publishInternalNoConfirm(channel, rabbitPublishState);
        }).then();
    }

    @Override // io.micronaut.rabbitmq.reactive.ReactivePublisher
    /* renamed from: publishAndReply, reason: merged with bridge method [inline-methods] */
    public Flux<RabbitConsumerState> mo69publishAndReply(RabbitPublishState rabbitPublishState) {
        Flux<RabbitConsumerState> flux = getChannel().flatMap(channel -> {
            return publishRpcInternal(channel, rabbitPublishState);
        }).flux();
        Optional<Duration> timeout = this.config.getRpc().getTimeout();
        flux.getClass();
        timeout.ifPresent(flux::timeout);
        return flux;
    }

    protected Mono<Channel> getChannel() {
        return Mono.create(monoSink -> {
            try {
                monoSink.success(this.channelPool.getChannel());
            } catch (IOException e) {
                monoSink.error(new RabbitClientException("Failed to retrieve a channel from the pool", e));
            }
        });
    }

    protected Mono<Object> publishInternal(Channel channel, RabbitPublishState rabbitPublishState) {
        return Mono.create(monoSink -> {
            Disposable createListener = createListener(channel, monoSink, rabbitPublishState);
            try {
                channel.basicPublish(rabbitPublishState.getExchange(), rabbitPublishState.getRoutingKey(), rabbitPublishState.getProperties(), rabbitPublishState.getBody());
            } catch (IOException e) {
                createListener.dispose();
                monoSink.error(e);
            }
        }).doFinally(signalType -> {
            returnChannel(channel);
        });
    }

    protected Mono<RabbitConsumerState> publishRpcInternal(Channel channel, RabbitPublishState rabbitPublishState) {
        return Mono.create(monoSink -> {
            Disposable disposable = null;
            try {
                String uuid = UUID.randomUUID().toString();
                AMQP.BasicProperties build = rabbitPublishState.getProperties().builder().correlationId(uuid).build();
                disposable = createConsumer(channel, rabbitPublishState, uuid, monoSink);
                channel.basicPublish(rabbitPublishState.getExchange(), rabbitPublishState.getRoutingKey(), build, rabbitPublishState.getBody());
            } catch (IOException e) {
                if (disposable != null) {
                    disposable.dispose();
                }
                monoSink.error(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(signalType -> {
            returnChannel(channel);
        });
    }

    protected Mono<Object> publishInternalNoConfirm(Channel channel, RabbitPublishState rabbitPublishState) {
        return Mono.create(monoSink -> {
            try {
                channel.basicPublish(rabbitPublishState.getExchange(), rabbitPublishState.getRoutingKey(), rabbitPublishState.getProperties(), rabbitPublishState.getBody());
                monoSink.success();
            } catch (IOException e) {
                monoSink.error(new MessagingClientException("Failed to publish the message", e));
            }
        }).doFinally(signalType -> {
            returnChannel(channel);
        });
    }

    protected Mono<Channel> initializePublish(Channel channel) {
        return Mono.create(monoSink -> {
            try {
                channel.confirmSelect();
                monoSink.success(channel);
            } catch (IOException e) {
                monoSink.error(new MessagingClientException("Failed to enable publisher confirms on the channel", e));
            }
        });
    }

    protected void returnChannel(Channel channel) {
        this.channelPool.returnChannel(channel);
    }

    protected Disposable createListener(Channel channel, final MonoSink<Object> monoSink, final RabbitPublishState rabbitPublishState) {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final Consumer consumer = confirmListener -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                channel.removeConfirmListener(confirmListener);
            }
        };
        final ConfirmListener confirmListener2 = new ConfirmListener() { // from class: io.micronaut.rabbitmq.reactive.ReactorReactivePublisher.1
            public void handleAck(long j, boolean z) {
                ackNack(j, z, true);
            }

            public void handleNack(long j, boolean z) {
                ackNack(j, z, false);
            }

            private void ackNack(long j, boolean z, boolean z2) {
                if (z2) {
                    monoSink.success();
                } else {
                    monoSink.error(new RabbitClientException("Message could not be delivered to the broker", (List<RabbitPublishState>) Collections.singletonList(rabbitPublishState)));
                }
                consumer.accept(this);
            }
        };
        channel.addConfirmListener(confirmListener2);
        return new Disposable() { // from class: io.micronaut.rabbitmq.reactive.ReactorReactivePublisher.2
            public void dispose() {
                consumer.accept(confirmListener2);
            }

            public boolean isDisposed() {
                return atomicBoolean.get();
            }
        };
    }

    protected Disposable createConsumer(final Channel channel, final RabbitPublishState rabbitPublishState, final String str, final MonoSink<RabbitConsumerState> monoSink) throws IOException {
        final String replyTo = rabbitPublishState.getProperties().getReplyTo();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final Consumer consumer = str2 -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                try {
                    channel.basicCancel(str2);
                } catch (IOException e) {
                }
            }
        };
        final String basicConsume = channel.basicConsume(replyTo, true, new DefaultConsumer() { // from class: io.micronaut.rabbitmq.reactive.ReactorReactivePublisher.3
            public void handleDelivery(String str3, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                if (replyTo.equals("amq.rabbitmq.reply-to") || str.equals(basicProperties.getCorrelationId())) {
                    consumer.accept(str3);
                    monoSink.success(new RabbitConsumerState(envelope, basicProperties, bArr, channel));
                }
            }

            @Override // io.micronaut.rabbitmq.intercept.DefaultConsumer
            public void handleCancel(String str3) throws IOException {
                handleError(str3);
            }

            @Override // io.micronaut.rabbitmq.intercept.DefaultConsumer
            public void handleShutdownSignal(String str3, ShutdownSignalException shutdownSignalException) {
                handleError(str3);
            }

            private void handleError(String str3) {
                consumer.accept(str3);
                monoSink.error(new RabbitClientException("Message was not able to be received from the reply to queue. The consumer was cancelled", (List<RabbitPublishState>) Collections.singletonList(rabbitPublishState)));
            }
        });
        return new Disposable() { // from class: io.micronaut.rabbitmq.reactive.ReactorReactivePublisher.4
            public void dispose() {
                consumer.accept(basicConsume);
            }

            public boolean isDisposed() {
                return atomicBoolean.get();
            }
        };
    }
}
