package io.atleon.aws.sqs;

import io.atleon.aws.sqs.NacknowledgerFactory;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFlux;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/atleon/aws/sqs/AloSqsReceiver.class */
/**
 * Receives messages from SQS through {@link SqsReceiver} and exposes them as an {@link AloFlux}
 * whose elements are created by the {@link AloFactory} loaded from configuration.
 *
 * <p>All settings are read from the {@link SqsConfig} produced by the {@link SqsConfigSource}
 * given to {@link #from(SqsConfigSource)}; the keys understood by this class are declared as
 * constants below.
 *
 * @param <T> type that message bodies are deserialized into
 */
public class AloSqsReceiver<T> {

    /** Prefix shared by every configuration key declared in this class. */
    public static final String CONFIG_PREFIX = "sqs.receiver.";

    /** Key of the {@link BodyDeserializer} to load. Loading throws when it is not configured. */
    public static final String BODY_DESERIALIZER_CONFIG = CONFIG_PREFIX + "body.deserializer";

    /**
     * Key selecting the {@link NacknowledgerFactory}. The predefined names
     * {@link #NACKNOWLEDGER_TYPE_EMIT} and {@link #NACKNOWLEDGER_TYPE_VISIBILITY_RESET} are matched
     * case-insensitively. When loading yields nothing, an {@code Emit} factory is used.
     */
    public static final String NACKNOWLEDGER_TYPE_CONFIG = CONFIG_PREFIX + "nacknowledger.type";

    /** Predefined nacknowledger type name that maps to {@code NacknowledgerFactory.Emit}. */
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";

    /** Predefined nacknowledger type name that maps to {@code NacknowledgerFactory.VisibilityReset}. */
    public static final String NACKNOWLEDGER_TYPE_VISIBILITY_RESET = "visibility_reset";

    /** Key for the receiver option {@code maxMessagesPerReception}. Defaults to 10. */
    public static final String MAX_MESSAGES_PER_RECEPTION_CONFIG = CONFIG_PREFIX + "max.messages.per.reception";

    /** Key for the set of message attribute names to request. Defaults to an empty set. */
    public static final String MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG = CONFIG_PREFIX + "message.attributes.to.request";

    /** Key for the set of message system attribute names to request. Defaults to an empty set. */
    public static final String MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG =
        CONFIG_PREFIX + "message.system.attributes.to.request";

    /** Key for the receiver option {@code waitTimeSecondsPerReception}. Defaults to 5. */
    public static final String WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG =
        CONFIG_PREFIX + "wait.time.seconds.per.reception";

    /** Key for the receiver option {@code visibilityTimeoutSeconds}. Defaults to 30. */
    public static final String VISIBILITY_TIMEOUT_SECONDS_CONFIG = CONFIG_PREFIX + "visibility.timeout";

    /**
     * Key for the receiver option {@code maxInFlightPerSubscription}. Defaults to
     * {@code SqsReceiverOptions.DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION}.
     */
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG =
        CONFIG_PREFIX + "max.in.flight.per.subscription";

    /** Key for the receiver option {@code deleteBatchSize}. Defaults to 10. */
    public static final String DELETE_BATCH_SIZE_CONFIG = CONFIG_PREFIX + "delete.batch.size";

    /**
     * Key for the receiver option {@code deleteInterval}, loaded as a duration. Defaults to
     * {@code SqsReceiverOptions.DEFAULT_DELETE_INTERVAL}.
     */
    public static final String DELETE_BATCH_INTERVAL_CONFIG = CONFIG_PREFIX + "delete.batch.interval";

    /**
     * Key for the receiver option {@code closeTimeout}, loaded as a duration. Defaults to
     * {@code SqsReceiverOptions.DEFAULT_CLOSE_TIMEOUT}.
     */
    public static final String CLOSE_TIMEOUT_CONFIG = CONFIG_PREFIX + "close.timeout";

    // Fallbacks applied when the corresponding key is absent from the configuration.
    private static final int DEFAULT_MAX_MESSAGES_PER_RECEPTION = 10;
    private static final int DEFAULT_WAIT_TIME_SECONDS_PER_RECEPTION = 5;
    private static final int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 30;
    private static final int DEFAULT_DELETE_BATCH_SIZE = 10;

    private static final Logger LOGGER = LoggerFactory.getLogger(AloSqsReceiver.class);

    private final SqsConfigSource configSource;

    private AloSqsReceiver(SqsConfigSource sqsConfigSource) {
        this.configSource = sqsConfigSource;
    }

    /**
     * Creates a receiver that reads its configuration from the given source.
     *
     * @param sqsConfigSource source from which an {@link SqsConfig} is created for each reception
     * @param <T> type that message bodies are deserialized into
     * @return a new receiver
     */
    public static <T> AloSqsReceiver<T> from(SqsConfigSource sqsConfigSource) {
        return new AloSqsReceiver<>(sqsConfigSource);
    }

    /**
     * Receives messages and maps each one to its deserialized body.
     *
     * @param str identifier of the queue, passed unchanged to {@code SqsReceiver#receiveManual}
     *            (presumably the queue URL - confirm against {@code SqsReceiver})
     * @return an {@link AloFlux} of message bodies
     */
    public AloFlux<T> receiveAloBodies(String str) {
        return receiveAloMessages(str).map(ReceivedSqsMessage::body);
    }

    /**
     * Receives messages, creating the configuration from the config source first.
     *
     * @param str identifier of the queue, passed unchanged to {@code SqsReceiver#receiveManual}
     *            (presumably the queue URL - confirm against {@code SqsReceiver})
     * @return an {@link AloFlux} of received messages
     */
    public AloFlux<ReceivedSqsMessage<T>> receiveAloMessages(String str) {
        // A parameterized cast keeps the lambda parameter below typed as SqsConfig; a raw Mono
        // would erase it to Object. NOTE(review): drop the cast if create() is already declared
        // to return Mono<SqsConfig>.
        @SuppressWarnings("unchecked")
        Mono<SqsConfig> config = (Mono<SqsConfig>) this.configSource.create();
        return config.flatMapMany(sqsConfig -> receiveMessages(sqsConfig, str)).as(AloFlux::wrap);
    }

    /**
     * Loads every collaborator from {@code config}, then builds the stream of received messages.
     * An empty sink is merged into the stream; its error emitter is handed to the nacknowledger
     * factory so that an error can be signalled into the merged stream.
     */
    private Flux<Alo<ReceivedSqsMessage<T>>> receiveMessages(SqsConfig config, String queue) {
        SqsReceiverOptions options = newReceiverOptions(config);
        AloFactory<ReceivedSqsMessage<T>> aloFactory = config.loadAloFactory();
        // The deserializer is loaded via a raw class token, so its type argument cannot be checked.
        @SuppressWarnings("unchecked")
        BodyDeserializer<T> bodyDeserializer =
            (BodyDeserializer<T>) config.loadConfiguredOrThrow(BODY_DESERIALIZER_CONFIG, BodyDeserializer.class);
        NacknowledgerFactory<T> nacknowledgerFactory = createNacknowledgerFactory(config);
        Sinks.Empty<SqsReceiverMessage> errorSink = Sinks.empty();
        Consumer<Throwable> errorEmitter = errorSink::tryEmitError;
        return SqsReceiver.create(options)
            .receiveManual(queue)
            .mergeWith(errorSink.asMono())
            .map(message -> toAlo(aloFactory, message, bodyDeserializer, nacknowledgerFactory, errorEmitter));
    }

    /** Builds receiver options from {@code sqsConfig}, applying a default for each absent key. */
    private static SqsReceiverOptions newReceiverOptions(SqsConfig sqsConfig) {
        Objects.requireNonNull(sqsConfig);
        return SqsReceiverOptions.newBuilder(sqsConfig::buildClient)
            .maxMessagesPerReception(
                sqsConfig.loadInt(MAX_MESSAGES_PER_RECEPTION_CONFIG).orElse(DEFAULT_MAX_MESSAGES_PER_RECEPTION))
            .messageAttributesToRequest(sqsConfig.loadSetOfStringOrEmpty(MESSAGE_ATTRIBUTES_TO_REQUEST_CONFIG))
            .messageSystemAttributesToRequest(
                sqsConfig.loadSetOfStringOrEmpty(MESSAGE_SYSTEM_ATTRIBUTES_TO_REQUEST_CONFIG))
            .waitTimeSecondsPerReception(
                sqsConfig.loadInt(WAIT_TIME_SECONDS_PER_RECEPTION_CONFIG)
                    .orElse(DEFAULT_WAIT_TIME_SECONDS_PER_RECEPTION))
            .visibilityTimeoutSeconds(
                sqsConfig.loadInt(VISIBILITY_TIMEOUT_SECONDS_CONFIG).orElse(DEFAULT_VISIBILITY_TIMEOUT_SECONDS))
            .maxInFlightPerSubscription(
                sqsConfig.loadInt(MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG)
                    .orElse(SqsReceiverOptions.DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION))
            .deleteBatchSize(sqsConfig.loadInt(DELETE_BATCH_SIZE_CONFIG).orElse(DEFAULT_DELETE_BATCH_SIZE))
            .deleteInterval(
                sqsConfig.loadDuration(DELETE_BATCH_INTERVAL_CONFIG).orElse(SqsReceiverOptions.DEFAULT_DELETE_INTERVAL))
            .closeTimeout(sqsConfig.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(SqsReceiverOptions.DEFAULT_CLOSE_TIMEOUT))
            .build();
    }

    /** Loads the configured nacknowledger factory, falling back to a new {@code Emit} factory. */
    private static <T> NacknowledgerFactory<T> createNacknowledgerFactory(SqsConfig sqsConfig) {
        Optional<NacknowledgerFactory<T>> configured =
            loadNacknowledgerFactory(sqsConfig, NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class);
        return configured.orElseGet(NacknowledgerFactory.Emit::new);
    }

    /**
     * Delegates to {@code SqsConfig#loadConfiguredWithPredefinedTypes}, supplying this class's
     * predefined type names as the instantiator.
     */
    private static <T, N extends NacknowledgerFactory<T>> Optional<NacknowledgerFactory<T>> loadNacknowledgerFactory(
        SqsConfig sqsConfig,
        String key,
        Class<N> type
    ) {
        return sqsConfig.loadConfiguredWithPredefinedTypes(
            key, type, AloSqsReceiver::instantiatePredefinedNacknowledgerFactory);
    }

    /**
     * Maps a predefined type name (case-insensitive) to a new factory instance.
     *
     * @return the factory for a recognized name, otherwise an empty {@link Optional}
     */
    private static <T> Optional<NacknowledgerFactory<T>> instantiatePredefinedNacknowledgerFactory(String typeName) {
        if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_EMIT)) {
            return Optional.of(new NacknowledgerFactory.Emit<>());
        }
        if (typeName.equalsIgnoreCase(NACKNOWLEDGER_TYPE_VISIBILITY_RESET)) {
            return Optional.of(new NacknowledgerFactory.VisibilityReset<>(LOGGER));
        }
        return Optional.empty();
    }

    /**
     * Deserializes a received message and wraps it in an {@link Alo}. The message's deleter is
     * passed as the second argument of {@code AloFactory#create}; the third argument is produced
     * by the nacknowledger factory.
     */
    private static <T> Alo<ReceivedSqsMessage<T>> toAlo(
        AloFactory<ReceivedSqsMessage<T>> aloFactory,
        SqsReceiverMessage receiverMessage,
        BodyDeserializer<T> bodyDeserializer,
        NacknowledgerFactory<T> nacknowledgerFactory,
        Consumer<Throwable> errorEmitter
    ) {
        DeserializedSqsMessage<T> deserialized = DeserializedSqsMessage.deserialize(receiverMessage, bodyDeserializer);
        return aloFactory.create(
            deserialized,
            receiverMessage.deleter(),
            nacknowledgerFactory.create(
                deserialized,
                receiverMessage.deleter(),
                receiverMessage.visibilityChanger(),
                errorEmitter));
    }
}
