package io.atleon.aws.sqs;

import io.atleon.core.Batcher;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;

/* loaded from: input_file:io/atleon/aws/sqs/SqsSender.class */
public final class SqsSender implements Closeable {
    private static final Retry DEFAULT_RETRY = Retry.backoff(3, Duration.ofMillis(10));
    private final Mono<SqsAsyncClient> futureClient;
    private final Batcher batcher;
    private final int maxRequestsInFlight;
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    /* loaded from: input_file:io/atleon/aws/sqs/SqsSender$MessageSendFailedException.class */
    public static final class MessageSendFailedException extends RuntimeException {
        private final String code;
        private final String message;

        public MessageSendFailedException(String str, String str2) {
            super(String.format("Sending message failed with code=%s: %s", str, str2));
            this.code = str;
            this.message = str2;
        }

        public String code() {
            return this.code;
        }

        public String message() {
            return this.message;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/aws/sqs/SqsSender$SqsSendEntry.class */
    public static final class SqsSendEntry<C> {
        private final SendMessageBatchRequestEntry requestEntry;
        private final C correlationMetadata;

        private SqsSendEntry(SendMessageBatchRequestEntry sendMessageBatchRequestEntry, C c) {
            this.requestEntry = sendMessageBatchRequestEntry;
            this.correlationMetadata = c;
        }

        public static <C> List<SqsSendEntry<C>> singletonList(SqsSenderMessage<C> sqsSenderMessage) {
            return Collections.singletonList(create(sqsSenderMessage));
        }

        public static <C> SqsSendEntry<C> create(SqsSenderMessage<C> sqsSenderMessage) {
            return new SqsSendEntry<>((SendMessageBatchRequestEntry) SendMessageBatchRequestEntry.builder().id(sqsSenderMessage.requestId()).messageDeduplicationId(sqsSenderMessage.messageDeduplicationId().orElse(null)).messageGroupId(sqsSenderMessage.messageGroupId().orElse(null)).messageAttributes(sqsSenderMessage.messageAttributes()).messageSystemAttributesWithStrings(sqsSenderMessage.messageSystemAttributes()).messageBody(sqsSenderMessage.body()).delaySeconds(sqsSenderMessage.senderDelaySeconds().orElse(null)).build(), sqsSenderMessage.correlationMetadata());
        }

        public String requestId() {
            return this.requestEntry.id();
        }

        public SendMessageBatchRequestEntry requestEntry() {
            return this.requestEntry;
        }

        public C correlationMetadata() {
            return this.correlationMetadata;
        }
    }

    private SqsSender(SqsSenderOptions sqsSenderOptions) {
        Objects.requireNonNull(sqsSenderOptions);
        this.futureClient = Mono.fromSupplier(sqsSenderOptions::createClient).cacheInvalidateWhen(sqsAsyncClient -> {
            return this.closeSink.asFlux().next().then();
        }, (v0) -> {
            v0.close();
        });
        this.batcher = Batcher.create(sqsSenderOptions.batchSize(), sqsSenderOptions.batchDuration(), sqsSenderOptions.batchPrefetch());
        this.maxRequestsInFlight = sqsSenderOptions.maxRequestsInFlight();
    }

    public static SqsSender create(SqsSenderOptions sqsSenderOptions) {
        return new SqsSender(sqsSenderOptions);
    }

    public <C> Mono<SqsSenderResult<C>> send(SqsSenderMessage<C> sqsSenderMessage, String str) {
        return this.futureClient.flatMapMany(sqsAsyncClient -> {
            return send(sqsAsyncClient, SqsSendEntry.singletonList(sqsSenderMessage), str);
        }).next();
    }

    public <C> Flux<SqsSenderResult<C>> send(Publisher<SqsSenderMessage<C>> publisher, String str) {
        return this.futureClient.flatMapMany(sqsAsyncClient -> {
            return send(sqsAsyncClient, Flux.from(publisher), str);
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closeSink.tryEmitNext(Long.valueOf(System.currentTimeMillis()));
    }

    private <C> Flux<SqsSenderResult<C>> send(SqsAsyncClient sqsAsyncClient, Flux<SqsSenderMessage<C>> flux, String str) {
        return this.batcher.applyMapping(flux.map(SqsSendEntry::create), list -> {
            return send(sqsAsyncClient, list, str);
        }, this.maxRequestsInFlight);
    }

    private <C> Flux<SqsSenderResult<C>> send(SqsAsyncClient sqsAsyncClient, List<SqsSendEntry<C>> list, String str) {
        SendMessageBatchRequest sendMessageBatchRequest = (SendMessageBatchRequest) SendMessageBatchRequest.builder().queueUrl(str).entries((Collection) list.stream().map((v0) -> {
            return v0.requestEntry();
        }).collect(Collectors.toList())).build();
        Map map = (Map) list.stream().filter(sqsSendEntry -> {
            return sqsSendEntry.correlationMetadata() != null;
        }).collect(Collectors.toMap((v0) -> {
            return v0.requestId();
        }, (v0) -> {
            return v0.correlationMetadata();
        }));
        return Mono.fromFuture(() -> {
            return sqsAsyncClient.sendMessageBatch(sendMessageBatchRequest);
        }).retryWhen(DEFAULT_RETRY).flatMapIterable(sendMessageBatchResponse -> {
            return createResults(sendMessageBatchResponse, map);
        });
    }

    private <C> List<SqsSenderResult<C>> createResults(SendMessageBatchResponse sendMessageBatchResponse, Map<String, C> map) {
        return (List) Stream.concat(sendMessageBatchResponse.failed().stream().map(batchResultErrorEntry -> {
            return createFailureResult(batchResultErrorEntry, map.get(batchResultErrorEntry.id()));
        }), sendMessageBatchResponse.successful().stream().map(sendMessageBatchResultEntry -> {
            return createSuccessResult(sendMessageBatchResultEntry, map.get(sendMessageBatchResultEntry.id()));
        })).collect(Collectors.toList());
    }

    private <C> SqsSenderResult<C> createFailureResult(BatchResultErrorEntry batchResultErrorEntry, C c) {
        return SqsSenderResult.failure(batchResultErrorEntry.id(), new MessageSendFailedException(batchResultErrorEntry.code(), batchResultErrorEntry.message()), c);
    }

    private <C> SqsSenderResult<C> createSuccessResult(SendMessageBatchResultEntry sendMessageBatchResultEntry, C c) {
        return SqsSenderResult.success(sendMessageBatchResultEntry.id(), sendMessageBatchResultEntry.messageId(), sendMessageBatchResultEntry.sequenceNumber(), c);
    }
}
