package com.expediagroup.rhapsody.rabbitmq.factory;

import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgementConfig;
import com.expediagroup.rhapsody.core.transformer.AutoAcknowledgingTransformer;
import com.expediagroup.rhapsody.rabbitmq.message.AckableRabbitMessage;
import com.expediagroup.rhapsody.rabbitmq.message.RabbitMessage;
import com.expediagroup.rhapsody.rabbitmq.message.SafeAckerNacker;
import com.expediagroup.rhapsody.rabbitmq.serde.BodyDeserializer;
import com.expediagroup.rhapsody.util.ConfigLoading;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

/* loaded from: input_file:com/expediagroup/rhapsody/rabbitmq/factory/RabbitMQFluxFactory.class */
public class RabbitMQFluxFactory<T> {
    public static final String QOS_CONFIG = "qos";
    private final Map<String, Object> properties;
    private final NacknowledgerFactory nacknowledgerFactory;
    private final ConnectionFactory connectionFactory;
    private final BodyDeserializer<T> bodyDeserializer;
    private final AckableRabbitMessageFactory<T> messageFactory;

    public RabbitMQFluxFactory(RabbitConfigFactory rabbitConfigFactory) {
        this(rabbitConfigFactory, new DiscardSingleNacknowledgerFactory());
    }

    public RabbitMQFluxFactory(RabbitConfigFactory rabbitConfigFactory, NacknowledgerFactory nacknowledgerFactory) {
        this.properties = (Map) rabbitConfigFactory.create();
        this.nacknowledgerFactory = nacknowledgerFactory;
        this.connectionFactory = RabbitConfigFactory.createConnectionFactory(this.properties);
        this.bodyDeserializer = BodyDeserializer.create(this.properties);
        this.messageFactory = AckableRabbitMessageFactory.create(this.properties);
    }

    public Flux<RabbitMessage<T>> consumeAuto(String str, AutoAcknowledgementConfig autoAcknowledgementConfig) {
        return consume(str).transformDeferred(new AutoAcknowledgingTransformer(autoAcknowledgementConfig, flux -> {
            return flux.takeLast(1);
        }, (v0) -> {
            v0.multipleAck();
        })).map((v0) -> {
            return v0.get();
        });
    }

    public Flux<AckableRabbitMessage<T>> consume(String str) {
        return Flux.defer(() -> {
            return consumeManualAck(str);
        });
    }

    private Flux<AckableRabbitMessage<T>> consumeManualAck(String str) {
        Sinks.Empty empty = Sinks.empty();
        return new Receiver(createReceiverOptions()).consumeManualAck(str, createConsumeOptions()).map(acknowledgableDelivery -> {
            Objects.requireNonNull(empty);
            return deserialize(acknowledgableDelivery, empty::tryEmitError);
        }).mergeWith(empty.asMono());
    }

    private ReceiverOptions createReceiverOptions() {
        ReceiverOptions receiverOptions = new ReceiverOptions();
        receiverOptions.connectionFactory(this.connectionFactory);
        return receiverOptions;
    }

    private ConsumeOptions createConsumeOptions() {
        ConsumeOptions consumeOptions = new ConsumeOptions();
        consumeOptions.qos(((Integer) ConfigLoading.load(this.properties, QOS_CONFIG, Integer::valueOf, Integer.valueOf(consumeOptions.getQos()))).intValue());
        return consumeOptions;
    }

    private AckableRabbitMessage<T> deserialize(AcknowledgableDelivery acknowledgableDelivery, Consumer<? super Throwable> consumer) {
        RabbitMessage rabbitMessage = new RabbitMessage(acknowledgableDelivery.getEnvelope().getExchange(), acknowledgableDelivery.getEnvelope().getRoutingKey(), acknowledgableDelivery.getProperties(), this.bodyDeserializer.deserialize(acknowledgableDelivery.getBody()));
        SafeAckerNacker safeAckerNacker = new SafeAckerNacker(ackType -> {
            acknowledgableDelivery.ack(ackType.isMultiple());
        }, nackType -> {
            acknowledgableDelivery.nack(nackType.isMultiple(), nackType.isRequeue());
        }, consumer);
        return this.messageFactory.create(rabbitMessage, safeAckerNacker, this.nacknowledgerFactory.create(acknowledgableDelivery, safeAckerNacker, consumer));
    }
}
