package com.expediagroup.rhapsody.rabbitmq.factory;

import com.expediagroup.rhapsody.api.Acknowledgeable;
import com.expediagroup.rhapsody.rabbitmq.message.RabbitMessage;
import com.expediagroup.rhapsody.rabbitmq.message.RabbitMessageSendInterceptor;
import com.expediagroup.rhapsody.rabbitmq.serde.BodySerializer;
import com.expediagroup.rhapsody.util.ConfigLoading;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.rabbitmq.CorrelableOutboundMessage;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/* loaded from: input_file:com/expediagroup/rhapsody/rabbitmq/factory/RabbitMQSenderFactory.class */
public class RabbitMQSenderFactory<T> {
    public static final String RESUBSCRIBE_ON_ERROR_CONFIG = "resubscribe.on.error";
    private static final boolean DEFAULT_RESUBSCRIBE_ON_ERROR = true;
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQSenderFactory.class);
    private static final SendOptions DEFAULT_SEND_OPTIONS = new SendOptions();
    private static final SendOptions DEFAULT_ACKNOWLEDGEABLE_SEND_OPTIONS = new SendOptions().exceptionHandler(RabbitMQSenderFactory::handleAcknowledgeableSendException);
    private final Sender sender;
    private final List<RabbitMessageSendInterceptor<T>> interceptors;
    private final BodySerializer<T> bodySerializer;
    private final boolean resubscribeOnError;

    public RabbitMQSenderFactory(RabbitConfigFactory rabbitConfigFactory) {
        Map map = (Map) rabbitConfigFactory.create();
        this.sender = new Sender(new SenderOptions().connectionFactory(RabbitConfigFactory.createConnectionFactory(map)));
        this.interceptors = RabbitMessageSendInterceptor.createInterceptors(map);
        this.bodySerializer = BodySerializer.create(map);
        this.resubscribeOnError = ((Boolean) ConfigLoading.load(map, RESUBSCRIBE_ON_ERROR_CONFIG, Boolean::valueOf, true)).booleanValue();
    }

    public Function<Publisher<Acknowledgeable<RabbitMessage<T>>>, Flux<Acknowledgeable<OutboundMessageResult<CorrelableOutboundMessage<T>>>>> sendAcknowledgeable() {
        return this::sendAcknowledgeable;
    }

    public Flux<Acknowledgeable<OutboundMessageResult<CorrelableOutboundMessage<T>>>> sendAcknowledgeable(Publisher<Acknowledgeable<RabbitMessage<T>>> publisher) {
        return Flux.from(publisher).map(this.interceptors.isEmpty() ? Function.identity() : Acknowledgeable.mapping(this::applyInterceptors)).map(acknowledgeable -> {
            return serialize(acknowledgeable, (v0) -> {
                return v0.get();
            }, acknowledgeable -> {
                return acknowledgeable.map((v0) -> {
                    return v0.getBody();
                });
            });
        }).transform(flux -> {
            return sendSerialized(flux, DEFAULT_ACKNOWLEDGEABLE_SEND_OPTIONS);
        }).map(this::toAcknowledgeableResult);
    }

    public Function<Publisher<RabbitMessage<T>>, Flux<OutboundMessageResult<CorrelableOutboundMessage<T>>>> send() {
        return this::send;
    }

    public Flux<OutboundMessageResult<CorrelableOutboundMessage<T>>> send(Publisher<RabbitMessage<T>> publisher) {
        return Flux.from(publisher).map(this::applyInterceptors).map(rabbitMessage -> {
            return serialize(rabbitMessage, Function.identity(), (v0) -> {
                return v0.getBody();
            });
        }).transform(flux -> {
            return sendSerialized(flux, DEFAULT_SEND_OPTIONS);
        });
    }

    private RabbitMessage<T> applyInterceptors(RabbitMessage<T> rabbitMessage) {
        Iterator<RabbitMessageSendInterceptor<T>> it = this.interceptors.iterator();
        while (it.hasNext()) {
            rabbitMessage = it.next().onSend(rabbitMessage);
        }
        return rabbitMessage;
    }

    private <W, C> CorrelableOutboundMessage<C> serialize(W w, Function<W, RabbitMessage<T>> function, Function<W, C> function2) {
        RabbitMessage<T> apply = function.apply(w);
        return new CorrelableOutboundMessage<>(apply.getExchange(), apply.getRoutingKey(), apply.getProperties(), this.bodySerializer.serialize(apply.getBody()), function2.apply(w));
    }

    private <M extends OutboundMessage> Flux<OutboundMessageResult<M>> sendSerialized(Flux<M> flux, SendOptions sendOptions) {
        return this.sender.sendWithTypedPublishConfirms(flux, sendOptions).doOnError(th -> {
            LOGGER.warn("An Error was encountered while trying to send to RabbitMQ. resubscribeOnError={}", Boolean.valueOf(this.resubscribeOnError), th);
        }).retry(th2 -> {
            return this.resubscribeOnError;
        });
    }

    private Acknowledgeable<OutboundMessageResult<CorrelableOutboundMessage<T>>> toAcknowledgeableResult(OutboundMessageResult<CorrelableOutboundMessage<Acknowledgeable<T>>> outboundMessageResult) {
        CorrelableOutboundMessage outboundMessage = outboundMessageResult.getOutboundMessage();
        return ((Acknowledgeable) outboundMessage.getCorrelationMetadata()).map(obj -> {
            return new OutboundMessageResult(toCorrelableOutboundMessage(outboundMessage, obj), outboundMessageResult.isAck(), outboundMessageResult.isReturned());
        });
    }

    private CorrelableOutboundMessage<T> toCorrelableOutboundMessage(OutboundMessage outboundMessage, T t) {
        return new CorrelableOutboundMessage<>(outboundMessage.getExchange(), outboundMessage.getRoutingKey(), outboundMessage.getProperties(), outboundMessage.getBody(), t);
    }

    private static void handleAcknowledgeableSendException(Sender.SendContext sendContext, Exception exc) {
        ((Acknowledgeable) Acknowledgeable.class.cast(((CorrelableOutboundMessage) CorrelableOutboundMessage.class.cast(sendContext.getMessage())).getCorrelationMetadata())).nacknowledge(exc);
    }
}
