package io.atleon.rabbitmq;

import io.atleon.core.Acknowledgement;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import io.atleon.rabbitmq.NacknowledgerFactory;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ReceiverOptions;

/* loaded from: input_file:io/atleon/rabbitmq/AloRabbitMQReceiver.class */
public class AloRabbitMQReceiver<T> {
    public static final String CONFIG_PREFIX = "rabbitmq.receiver.";
    public static final String QOS_CONFIG = "rabbitmq.receiver.qos";
    public static final String BODY_DESERIALIZER_CONFIG = "rabbitmq.receiver.body.deserializer";

    @Deprecated
    public static final String NACK_STRATEGY_CONFIG = "rabbitmq.receiver.nack.strategy";
    public static final String NACKNOWLEDGER_TYPE_CONFIG = "rabbitmq.receiver.nacknowledger.type";
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";
    public static final String NACKNOWLEDGER_TYPE_REQUEUE = "requeue";
    public static final String NACKNOWLEDGER_TYPE_DISCARD = "discard";
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = "rabbitmq.receiver.error.emission.timeout";
    private static final Logger LOGGER = LoggerFactory.getLogger(AloRabbitMQReceiver.class);
    private final RabbitMQConfigSource configSource;

    @Deprecated
    /* loaded from: input_file:io/atleon/rabbitmq/AloRabbitMQReceiver$NackStrategy.class */
    public enum NackStrategy {
        EMIT,
        REQUEUE,
        DISCARD
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/rabbitmq/AloRabbitMQReceiver$ReceiveResources.class */
    public static final class ReceiveResources<T> {
        private final RabbitMQConfig config;
        private final BodyDeserializer<T> bodyDeserializer;
        private final NacknowledgerFactory<T> nacknowledgerFactory;

        public ReceiveResources(RabbitMQConfig rabbitMQConfig) {
            this.config = rabbitMQConfig;
            this.bodyDeserializer = (BodyDeserializer) rabbitMQConfig.loadConfiguredOrThrow(AloRabbitMQReceiver.BODY_DESERIALIZER_CONFIG, BodyDeserializer.class);
            this.nacknowledgerFactory = createNacknowledgerFactory(rabbitMQConfig);
        }

        public Flux<Alo<ReceivedRabbitMQMessage<T>>> receive(String str) {
            AloFactory<ReceivedRabbitMQMessage<T>> loadAloFactory = loadAloFactory(str);
            ErrorEmitter<Alo<ReceivedRabbitMQMessage<T>>> newErrorEmitter = newErrorEmitter();
            Flux map = Flux.using(this::newReceiver, receiver -> {
                return receiver.consumeManualAck(str, newConsumeOptions());
            }, (v0) -> {
                v0.close();
            }).map(acknowledgableDelivery -> {
                Objects.requireNonNull(newErrorEmitter);
                return deserialize(acknowledgableDelivery, loadAloFactory, newErrorEmitter::safelyEmit);
            });
            Objects.requireNonNull(newErrorEmitter);
            return map.transform((v1) -> {
                return r1.applyTo(v1);
            }).transform(flux -> {
                return applySignalListenerFactories(flux, str);
            });
        }

        private AloFactory<ReceivedRabbitMQMessage<T>> loadAloFactory(String str) {
            return AloFactoryConfig.loadDecorated(this.config.modifyAndGetProperties(map -> {
                map.put(AloReceivedRabbitMQMessageDecorator.QUEUE_CONFIG, str);
            }), AloReceivedRabbitMQMessageDecorator.class);
        }

        private ErrorEmitter<Alo<ReceivedRabbitMQMessage<T>>> newErrorEmitter() {
            return ErrorEmitter.create(this.config.loadDuration(AloRabbitMQReceiver.ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT));
        }

        private Receiver newReceiver() {
            return new Receiver(new ReceiverOptions().connectionFactory(this.config.buildConnectionFactory()));
        }

        private ConsumeOptions newConsumeOptions() {
            return new ConsumeOptions().qos(this.config.loadInt(AloRabbitMQReceiver.QOS_CONFIG).orElse(256).intValue());
        }

        private Flux<Alo<ReceivedRabbitMQMessage<T>>> applySignalListenerFactories(Flux<Alo<ReceivedRabbitMQMessage<T>>> flux, String str) {
            Iterator it = AloSignalListenerFactoryConfig.loadList(this.config.modifyAndGetProperties(map -> {
                map.put(AloReceivedRabbitMQMessageSignalListenerFactory.QUEUE_CONFIG, str);
            }), AloReceivedRabbitMQMessageSignalListenerFactory.class).iterator();
            while (it.hasNext()) {
                flux = flux.tap((AloSignalListenerFactory) it.next());
            }
            return flux;
        }

        private Alo<ReceivedRabbitMQMessage<T>> deserialize(AcknowledgableDelivery acknowledgableDelivery, AloFactory<ReceivedRabbitMQMessage<T>> aloFactory, Consumer<Throwable> consumer) {
            ReceivedRabbitMQMessage<T> create = ReceivedRabbitMQMessage.create(acknowledgableDelivery.getEnvelope().getExchange(), acknowledgableDelivery.getEnvelope().getRoutingKey(), acknowledgableDelivery.getProperties(), this.bodyDeserializer.deserialize(SerializedBody.ofBytes(acknowledgableDelivery.getBody())), acknowledgableDelivery.getEnvelope().isRedeliver());
            Acknowledgement create2 = Acknowledgement.create(() -> {
                ack(acknowledgableDelivery, consumer);
            }, this.nacknowledgerFactory.create(create, z -> {
                nack(acknowledgableDelivery, z, consumer);
            }, consumer));
            Objects.requireNonNull(create2);
            Runnable runnable = create2::positive;
            Objects.requireNonNull(create2);
            return aloFactory.create(create, runnable, create2::negative);
        }

        private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(RabbitMQConfig rabbitMQConfig) {
            Optional loadNacknowledgerFactory = loadNacknowledgerFactory(rabbitMQConfig, AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
            if (loadNacknowledgerFactory.isPresent()) {
                return (NacknowledgerFactory) loadNacknowledgerFactory.get();
            }
            Optional loadEnum = rabbitMQConfig.loadEnum(AloRabbitMQReceiver.NACK_STRATEGY_CONFIG, NackStrategy.class);
            if (!loadEnum.isPresent()) {
                return new NacknowledgerFactory.Emit();
            }
            AloRabbitMQReceiver.LOGGER.warn("The configuration rabbitmq.receiver.nack.strategy is deprecated. Use rabbitmq.receiver.nacknowledger.type");
            return (NacknowledgerFactory) loadEnum.map((v0) -> {
                return v0.name();
            }).flatMap(ReceiveResources::newPredefinedNacknowledgerFactory).orElseThrow(() -> {
                return new IllegalStateException("Failed to convert NackStrategy to NacknowledgerFactory");
            });
        }

        private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(RabbitMQConfig rabbitMQConfig, String str, Class<N> cls) {
            return rabbitMQConfig.loadConfiguredWithPredefinedTypes(str, cls, ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        private static <T> Optional<NacknowledgerFactory<T>> newPredefinedNacknowledgerFactory(String str) {
            return str.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_EMIT) ? Optional.of(new NacknowledgerFactory.Emit()) : str.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_REQUEUE) ? Optional.of(new NacknowledgerFactory.Nack(AloRabbitMQReceiver.LOGGER, true)) : str.equalsIgnoreCase(AloRabbitMQReceiver.NACKNOWLEDGER_TYPE_DISCARD) ? Optional.of(new NacknowledgerFactory.Nack(AloRabbitMQReceiver.LOGGER, false)) : Optional.empty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void ack(AcknowledgableDelivery acknowledgableDelivery, Consumer<? super Throwable> consumer) {
            try {
                acknowledgableDelivery.ack(false);
            } catch (Throwable th) {
                AloRabbitMQReceiver.LOGGER.error("Failed to ack", th);
                consumer.accept(th);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static void nack(AcknowledgableDelivery acknowledgableDelivery, boolean z, Consumer<? super Throwable> consumer) {
            try {
                acknowledgableDelivery.nack(false, z);
            } catch (Throwable th) {
                AloRabbitMQReceiver.LOGGER.error("Failed to nack", th);
                consumer.accept(th);
            }
        }
    }

    private AloRabbitMQReceiver(RabbitMQConfigSource rabbitMQConfigSource) {
        this.configSource = rabbitMQConfigSource;
    }

    public static <T> AloRabbitMQReceiver<T> from(RabbitMQConfigSource rabbitMQConfigSource) {
        return create(rabbitMQConfigSource);
    }

    public static <T> AloRabbitMQReceiver<T> create(RabbitMQConfigSource rabbitMQConfigSource) {
        return new AloRabbitMQReceiver<>(rabbitMQConfigSource);
    }

    public AloFlux<T> receiveAloBodies(String str) {
        return receiveAloMessages(str).mapNotNull((v0) -> {
            return v0.body();
        });
    }

    public AloFlux<ReceivedRabbitMQMessage<T>> receiveAloMessages(String str) {
        return (AloFlux) ((Mono) this.configSource.create()).map(ReceiveResources::new).flatMapMany(receiveResources -> {
            return receiveResources.receive(str);
        }).as((v0) -> {
            return AloFlux.wrap(v0);
        });
    }
}
