package io.atleon.rabbitmq;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import java.io.Closeable;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.rabbitmq.CorrelableOutboundMessage;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/* loaded from: input_file:io/atleon/rabbitmq/AloRabbitMQSender.class */
/**
 * Reactive sender of RabbitMQ messages whose bodies are of type {@code T}.
 *
 * <p>The underlying {@link Sender} and {@link BodySerializer} are built from the
 * supplied {@link RabbitMQConfigSource} and held in a cached {@link Mono}. The cache
 * is invalidated (and the held resources closed) when {@link #close()} is called.
 *
 * @param <T> type of the message bodies handed to the configured {@link BodySerializer}
 */
public class AloRabbitMQSender<T> implements Closeable {

    /** Prefix shared by the configuration keys of this sender. */
    public static final String CONFIG_PREFIX = "rabbitmq.sender.";

    /** Configuration key under which the {@link BodySerializer} is looked up. */
    public static final String BODY_SERIALIZER_CONFIG = "rabbitmq.sender.body.serializer";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQSender.class);

    /** Options used for plain (non-Alo) sends; library defaults. */
    private static final SendOptions SEND_OPTIONS = new SendOptions();

    /** Options used for Alo sends; a failed send nacknowledges the originating Alo. */
    private static final SendOptions ALO_SEND_OPTIONS =
        new SendOptions().exceptionHandler(AloRabbitMQSender::handleAloSendException);

    private final Mono<SendResources<T>> futureResources;

    /** Emits a value whenever {@link #close()} is called; drives cache invalidation. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    /**
     * Pairs a {@link Sender} with the {@link BodySerializer} used to turn message
     * bodies into bytes, and implements the actual send pipelines.
     *
     * @param <T> type of the message bodies being serialized
     */
    public static final class SendResources<T> {

        private final Sender sender;

        private final BodySerializer<T> bodySerializer;

        public SendResources(Sender sender, BodySerializer<T> bodySerializer) {
            this.sender = sender;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds resources from configuration: a {@link Sender} on the configured
         * connection factory and the serializer stored under {@link #BODY_SERIALIZER_CONFIG}.
         *
         * @param rabbitMQConfig configuration to read from
         * @param <T>            body type of the messages to be sent
         * @return newly created resources
         */
        public static <T> SendResources<T> fromConfig(RabbitMQConfig rabbitMQConfig) {
            SenderOptions senderOptions = new SenderOptions().connectionFactory(rabbitMQConfig.getConnectionFactory());
            // The body type cannot be verified at runtime, so the cast is inherently
            // unchecked; keep it parameterized (not raw) and scoped to this one local.
            @SuppressWarnings("unchecked")
            BodySerializer<T> serializer =
                (BodySerializer<T>) rabbitMQConfig.loadConfiguredOrThrow(BODY_SERIALIZER_CONFIG);
            return new SendResources<>(new Sender(senderOptions), serializer);
        }

        /**
         * Sends each published item as a RabbitMQ message with publish confirms.
         *
         * @param publisher source of items to send
         * @param function  converts an item into the message to send
         * @param <R>       item type, carried as correlation metadata on each outbound message
         * @return one result per publish confirm
         */
        public <R> Flux<RabbitMQSenderResult<R>> send(Publisher<R> publisher, Function<R, RabbitMQMessage<T>> function) {
            Flux<CorrelableOutboundMessage<R>> outboundMessages =
                Flux.from(publisher).map(item -> toOutboundMessage(item, function));
            return outboundMessages
                .transform(messages -> sender.sendWithTypedPublishConfirms(messages, SEND_OPTIONS))
                .map(RabbitMQSenderResult::fromMessageResult);
        }

        /**
         * Sends the payload of each published {@link Alo} as a RabbitMQ message with
         * publish confirms, using {@link #ALO_SEND_OPTIONS} so that send exceptions
         * nacknowledge the originating {@link Alo}.
         *
         * @param publisher source of Alo-wrapped items to send
         * @param function  converts an unwrapped item into the message to send
         * @param <R>       type of the items wrapped by the Alos
         * @return one Alo-wrapped result per publish confirm
         */
        public <R> Flux<Alo<RabbitMQSenderResult<R>>> sendAlos(Publisher<Alo<R>> publisher, Function<R, RabbitMQMessage<T>> function) {
            // Typing this explicitly keeps inference of the nested generic calls below simple.
            Function<Alo<R>, RabbitMQMessage<T>> aloToMessage = function.compose(Alo::get);
            return AloFlux.toFlux(publisher)
                .handle(newAloEmitter(aloToMessage))
                .transform(messages -> sender.sendWithTypedPublishConfirms(messages, ALO_SEND_OPTIONS))
                .map(RabbitMQSenderResult::fromMessageResultOfAlo);
        }

        /** Closes the underlying {@link Sender}. */
        public void close() {
            sender.close();
        }

        /**
         * Creates a {@code handle} callback that converts an {@link Alo} to an outbound
         * message (correlated with that Alo) from within {@link Alo#runInContext}.
         */
        private <R> BiConsumer<Alo<R>, SynchronousSink<CorrelableOutboundMessage<Alo<R>>>> newAloEmitter(Function<Alo<R>, RabbitMQMessage<T>> function) {
            return (alo, sink) -> alo.runInContext(() -> sink.next(toOutboundMessage(alo, function)));
        }

        /**
         * Converts {@code r} to a message via {@code function}, serializes its body, and
         * attaches {@code r} itself as the correlation metadata.
         */
        private <R> CorrelableOutboundMessage<R> toOutboundMessage(R r, Function<R, RabbitMQMessage<T>> function) {
            RabbitMQMessage<T> message = function.apply(r);
            byte[] body = bodySerializer.serialize(message.getBody()).bytes();
            return new CorrelableOutboundMessage<>(
                message.getExchange(), message.getRoutingKey(), message.getProperties(), body, r);
        }
    }

    private AloRabbitMQSender(RabbitMQConfigSource rabbitMQConfigSource) {
        // A raw (Mono) cast here would erase the element type, so that neither the
        // fromConfig reference nor the close callback below could be type-checked.
        // NOTE(review): the cast is kept (parameterized) because the declared return
        // type of create() is not visible from this file - drop it if redundant.
        @SuppressWarnings("unchecked")
        Mono<RabbitMQConfig> futureConfig = (Mono<RabbitMQConfig>) rabbitMQConfigSource.create();
        // The explicit <T> witness is required: fromConfig is generic, and without it the
        // body type would not be inferred as this sender's T in the middle of the chain.
        this.futureResources = futureConfig
            .map(SendResources::<T>fromConfig)
            .cacheInvalidateWhen(resources -> closeSink.asFlux().next().then(), SendResources::close);
    }

    /**
     * Creates a sender whose resources are built from the given configuration source.
     *
     * @param rabbitMQConfigSource source of the RabbitMQ configuration
     * @param <T>                  type of the message bodies to be sent
     * @return a new sender
     */
    public static <T> AloRabbitMQSender<T> from(RabbitMQConfigSource rabbitMQConfigSource) {
        return new AloRabbitMQSender<>(rabbitMQConfigSource);
    }

    /**
     * Function form of {@link #sendBodies(Publisher, RabbitMQMessageCreator)}.
     *
     * @param rabbitMQMessageCreator converts a body into the message to send
     * @return a function that sends the bodies of the publisher it is applied to
     */
    public Function<Publisher<T>, Flux<RabbitMQSenderResult<T>>> sendBodies(RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return publisher -> sendBodies(publisher, rabbitMQMessageCreator);
    }

    /**
     * Sends each published body as a message produced by {@code rabbitMQMessageCreator}.
     *
     * @param publisher              source of bodies to send
     * @param rabbitMQMessageCreator converts a body into the message to send
     * @return the results of the sends, each correlated with its body
     */
    public Flux<RabbitMQSenderResult<T>> sendBodies(Publisher<T> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return futureResources.flatMapMany(resources -> resources.send(publisher, rabbitMQMessageCreator));
    }

    /**
     * Sends a single message.
     *
     * @param rabbitMQMessage the message to send
     * @return the first result emitted for the send
     */
    public Mono<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessage(RabbitMQMessage<T> rabbitMQMessage) {
        return sendMessages(Flux.just(rabbitMQMessage)).next();
    }

    /**
     * Sends each published message as-is.
     *
     * @param publisher source of messages to send
     * @return the results of the sends, each correlated with its message
     */
    public Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(Publisher<RabbitMQMessage<T>> publisher) {
        return futureResources.flatMapMany(resources -> resources.send(publisher, Function.identity()));
    }

    /**
     * Function form of {@link #sendAloBodies(Publisher, RabbitMQMessageCreator)}.
     *
     * @param rabbitMQMessageCreator converts a body into the message to send
     * @return a function that sends the Alo-wrapped bodies of the publisher it is applied to
     */
    public Function<Publisher<Alo<T>>, AloFlux<RabbitMQSenderResult<T>>> sendAloBodies(RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return publisher -> sendAloBodies(publisher, rabbitMQMessageCreator);
    }

    /**
     * Sends each published Alo-wrapped body as a message produced by
     * {@code rabbitMQMessageCreator}.
     *
     * @param publisher              source of Alo-wrapped bodies to send
     * @param rabbitMQMessageCreator converts a body into the message to send
     * @return the Alo-wrapped results of the sends
     */
    public AloFlux<RabbitMQSenderResult<T>> sendAloBodies(Publisher<Alo<T>> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        Flux<Alo<RabbitMQSenderResult<T>>> results =
            futureResources.flatMapMany(resources -> resources.sendAlos(publisher, rabbitMQMessageCreator));
        return AloFlux.wrap(results);
    }

    /**
     * Sends each published Alo-wrapped message as-is.
     *
     * @param publisher source of Alo-wrapped messages to send
     * @return the Alo-wrapped results of the sends
     */
    public AloFlux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendAloMessages(Publisher<Alo<RabbitMQMessage<T>>> publisher) {
        Flux<Alo<RabbitMQSenderResult<RabbitMQMessage<T>>>> results =
            futureResources.flatMapMany(resources -> resources.sendAlos(publisher, Function.identity()));
        return AloFlux.wrap(results);
    }

    /**
     * Logs the given reason and then closes this sender.
     *
     * @param obj the reason for closing; only used in the log message
     */
    public void close(Object obj) {
        LOGGER.info("Closing AloRabbitMQSender due to reason={}", obj);
        close();
    }

    /**
     * Emits on the close sink, which triggers invalidation of the cached send
     * resources and thereby closes their {@link Sender}. The emit result is not
     * inspected, so a failed emission is silently ignored.
     */
    @Override
    public void close() {
        closeSink.tryEmitNext(System.currentTimeMillis());
    }

    /**
     * Exception handler for Alo sends: nacknowledges the {@link Alo} carried as the
     * correlation metadata of the message that failed to send.
     */
    private static void handleAloSendException(Sender.SendContext sendContext, Exception exc) {
        CorrelableOutboundMessage<?> failedMessage = CorrelableOutboundMessage.class.cast(sendContext.getMessage());
        Alo<?> alo = Alo.class.cast(failedMessage.getCorrelationMetadata());
        Alo.nacknowledge(alo, exc);
    }
}
