package com.agorapulse.worker.sqs.v2;

import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService;
import com.agorapulse.worker.convention.QueueListener;
import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queue.QueueMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:com/agorapulse/worker/sqs/v2/SqsQueues.class */
public class SqsQueues implements JobQueues {
    private static final int MAX_MESSAGES_TO_POLL = 10;
    private static final int MIN_PARALLELISM = 4;
    private static final int MAX_WAITING_TIME = 20;
    private final SimpleQueueService simpleQueueService;
    private final ObjectMapper objectMapper;

    public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMapper) {
        this.simpleQueueService = simpleQueueService;
        this.objectMapper = objectMapper;
    }

    public <T> Publisher<QueueMessage<T>> readMessages(String str, int i, Duration duration, Argument<T> argument) {
        Instant now = Instant.now();
        return Flux.generate(() -> {
            return Integer.valueOf(i);
        }, (num, synchronousSink) -> {
            try {
                List receiveMessages = this.simpleQueueService.receiveMessages(str, Math.min(MAX_MESSAGES_TO_POLL, num.intValue()), 0, Math.min(MAX_WAITING_TIME, Math.toIntExact(duration.getSeconds())));
                if (receiveMessages.isEmpty() && duration.getSeconds() <= 20 && !QueueListener.Utils.isInfinitePoll(i, duration)) {
                    synchronousSink.complete();
                    return 0;
                }
                synchronousSink.next(receiveMessages);
                if (QueueListener.Utils.isInfinitePoll(i, duration)) {
                    return Integer.valueOf(i);
                }
                int intValue = num.intValue() - receiveMessages.size();
                if (intValue <= 0 || now.plus((TemporalAmount) duration).isBefore(Instant.now())) {
                    synchronousSink.complete();
                }
                return Integer.valueOf(intValue);
            } catch (Throwable th) {
                synchronousSink.error(th);
                return 0;
            }
        }).flatMap((v0) -> {
            return Flux.fromIterable(v0);
        }).flatMap(message -> {
            return readMessageInternal(str, argument, message.body(), true).map(obj -> {
                return QueueMessage.requeueIfDeleted(message.messageId(), obj, () -> {
                    this.simpleQueueService.deleteMessage(str, message.receiptHandle());
                }, () -> {
                    this.simpleQueueService.sendMessage(str, message.body());
                });
            });
        }).parallel(Math.max(Schedulers.DEFAULT_POOL_SIZE, MIN_PARALLELISM));
    }

    public void sendMessage(String str, Object obj) {
        sendRawMessage(str, convertMessageToJson(obj));
    }

    public void sendRawMessage(String str, Object obj) {
        try {
            this.simpleQueueService.sendMessage(str, obj.toString());
        } catch (SqsException e) {
            if (e.getMessage() == null || !e.getMessage().contains("Concurrent access: Queue already exists")) {
                throw e;
            }
            sendMessage(str, obj);
        }
    }

    public void sendMessages(String str, Publisher<?> publisher) {
        sendRawMessages(str, Flux.from(publisher).map(this::convertMessageToJson));
    }

    public void sendRawMessages(String str, Publisher<?> publisher) {
        Flux.from(this.simpleQueueService.sendMessages(str, Flux.from(publisher).map(String::valueOf))).subscribe();
    }

    private <T> Mono<T> readMessageInternal(String str, Argument<T> argument, String str2, boolean z) {
        try {
            return Mono.just(this.objectMapper.readValue(str2, JacksonConfiguration.constructType(argument, this.objectMapper.getTypeFactory())));
        } catch (JsonProcessingException e) {
            if (z) {
                if (String.class.isAssignableFrom(argument.getType())) {
                    return Mono.just(argument.getType().cast(str2));
                }
                if (Collection.class.isAssignableFrom(argument.getType())) {
                    return (argument.getTypeParameters().length <= 0 || !CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) ? readMessageInternal(str, argument, "[" + str2 + "]", false) : readMessageInternal(str, argument, "[" + ((String) Arrays.stream(str2.split(",\\s*")).map(str3 -> {
                        return "\"" + str3 + "\"";
                    }).collect(Collectors.joining(","))) + "]", false);
                }
            }
            return Mono.error(new IllegalArgumentException("Cannot convert to " + String.valueOf(argument) + "from message\n" + str2, e));
        }
    }

    private String convertMessageToJson(Object obj) {
        try {
            return this.objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Cannot marshal object " + String.valueOf(obj) + " to JSON", e);
        }
    }
}
