package io.es4j.infrastructure.pgbroker.messagebroker;

import io.es4j.infrastructure.pgbroker.exceptions.InterruptMessageStream;
import io.es4j.infrastructure.pgbroker.models.ConsumerManager;
import io.es4j.infrastructure.pgbroker.models.DeadLetterKey;
import io.es4j.infrastructure.pgbroker.models.DeadLetterRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordID;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQueryBuilder;
import io.es4j.infrastructure.pgbroker.models.MessageState;
import io.es4j.infrastructure.pgbroker.models.RawMessage;
import io.es4j.sql.Repository;
import io.es4j.sql.exceptions.NotFound;
import io.es4j.sql.models.BaseRecord;
import io.es4j.sql.models.QueryOptions;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnItem;
import io.smallrye.mutiny.groups.MultiRepetition;
import io.smallrye.mutiny.subscription.FixedDemandPacer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/messagebroker/MessageProcessor.class */
public class MessageProcessor {
    /** States whose messages are copied into the dead-letter queue when results are acknowledged. */
    private static final EnumSet<MessageState> DEAD_LETTER_QUEUE_STATES = EnumSet.of(
        MessageState.PARKED,
        MessageState.FATAL_FAILURE,
        MessageState.RETRIES_EXHAUSTED,
        MessageState.EXPIRED
    );

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);

    /** Live queue: finished messages are deleted from it and messages in RETRY state are updated in it. */
    private final Repository<MessageRecordID, MessageRecord, MessageRecordQuery> messageQueue;

    /** Dead-letter store: queried per partition key before processing and receives dead-lettered messages. */
    private final Repository<DeadLetterKey, DeadLetterRecord, MessageRecordQuery> deadLetterQueue;

    /** Consumes individual messages and exposes the broker configuration used for pacing. */
    private final ConsumerManager consumerManager;

    /**
     * Creates a processor bound to a consumer manager and the two repositories it persists to.
     *
     * @param consumerManager consumer used to handle each message and to read the broker configuration
     * @param repository      repository used as the message queue
     * @param repository2     repository used as the dead-letter queue
     */
    public MessageProcessor(ConsumerManager consumerManager, Repository<MessageRecordID, MessageRecord, MessageRecordQuery> repository, Repository<DeadLetterKey, DeadLetterRecord, MessageRecordQuery> repository2) {
        this.messageQueue = repository;
        this.deadLetterQueue = repository2;
        this.consumerManager = consumerManager;
    }

    /**
     * Processes partitioned messages, one stream per partition key, and persists the outcome.
     *
     * <p>Records are grouped by partition key. For every key the dead-letter queue is consulted
     * first: when it already holds an entry for that key the whole group is parked, otherwise the
     * group is handed to {@code processPartitionStream}. The results of all groups are flattened,
     * converted back to {@link MessageRecord}s and passed to {@code persistResults}.
     *
     * @param list records to process; each must carry a partition id other than {@code "none"}
     *             and a non-null partition key
     * @return a {@link Uni} completing once the results have been persisted
     * @throws IllegalArgumentException synchronously, when any record lacks a partition id or a
     *                                  partition key
     */
    public Uni<Void> processPartitions(List<MessageRecord> list) {
        // Two separate passes on purpose: a missing partition id anywhere in the batch is
        // reported in preference to a missing partition key.
        if (list.stream().anyMatch(record -> record.partitionId() == null || "none".equals(record.partitionId()))) {
            throw new IllegalArgumentException("Message missing partitionId");
        }
        if (list.stream().anyMatch(record -> record.partitionKey() == null)) {
            throw new IllegalArgumentException("Message missing partitionKey");
        }
        return startPacedStream(splitOnPartitionKey(list).entrySet())
            .onItem().transformToUniAndMerge(this::processOrParkPartition)
            .collect().asList()
            .map(partitionResults -> partitionResults.stream().flatMap(List::stream).toList())
            .flatMap(results -> persistResults(results.stream().map(MessageRecord::from).toList()));
    }

    /**
     * Decides what happens to the messages of a single partition key: they are processed when the
     * dead-letter queue has no entry for the key, and parked unprocessed otherwise.
     */
    private Uni<List<RawMessage>> processOrParkPartition(Map.Entry<String, List<RawMessage>> entry) {
        final List<RawMessage> messages = entry.getValue();
        return searchPartitionKeyInDeadletterQueue(entry.getKey()).flatMap(deadLetters -> {
            if (deadLetters.isEmpty()) {
                LOGGER.debug("Processing messages {}", messages);
                return processPartitionStream(messages);
            }
            LOGGER.debug("Parking messages {}", messages);
            final List<RawMessage> parked = messages.stream().map(message -> message.park()).toList();
            return Uni.createFrom().item(parked);
        });
    }

    /**
     * Processes messages that do not belong to a partition and persists the outcome.
     *
     * <p>Each record is converted to a {@link RawMessage}, handed to the consumer manager through
     * the paced stream, and the collected results are passed to {@code persistResults}.
     *
     * @param list records to process; every record must have the partition id {@code "none"}
     * @return a {@link Uni} completing once the results have been persisted
     * @throws IllegalArgumentException synchronously, when any record has a partition id other
     *                                  than {@code "none"} (a null partition id included)
     */
    public Uni<Void> processConcurrent(List<MessageRecord> list) {
        for (MessageRecord candidate : list) {
            if (!"none".equals(candidate.partitionId())) {
                throw new IllegalArgumentException("Message with partition");
            }
        }
        return startPacedStream(list)
            .onItem().transformToUniAndMerge(record -> this.consumerManager.consumeMessage(parseRecord(record)))
            .collect().asList()
            .flatMap(consumed -> persistResults(consumed.stream().map(MessageRecord::from).toList()));
    }

    /**
     * Looks up dead-letter records for a partition key.
     *
     * @param str the partition key to search for
     * @return the matching records; a {@link NotFound} failure from the repository is turned into
     *         an empty list, any other failure is propagated
     */
    private Uni<List<DeadLetterRecord>> searchPartitionKeyInDeadletterQueue(String str) {
        final MessageRecordQuery query = deadLetterQuery(str);
        final List<DeadLetterRecord> noMatches = Collections.emptyList();
        return this.deadLetterQueue.query(query)
            .onFailure(NotFound.class)
            .recoverWithItem(noMatches);
    }

    /**
     * Streams the given records, pacing demand when a concurrency is configured.
     *
     * @param iterable records to emit
     * @return a {@link Multi} over the records
     */
    private Multi<MessageRecord> startPacedStream(Iterable<MessageRecord> iterable) {
        return paced(iterable);
    }

    /**
     * Streams the given partition groups, pacing demand when a concurrency is configured.
     *
     * @param set partition-key groups to emit
     * @return a {@link Multi} over the groups
     */
    private Multi<Map.Entry<String, List<RawMessage>>> startPacedStream(Set<Map.Entry<String, List<RawMessage>>> set) {
        return paced(set);
    }

    /**
     * Shared implementation of both overloads: emits the items as-is when no concurrency is
     * configured, otherwise applies a {@link FixedDemandPacer} built from the configured
     * concurrency and throttle.
     */
    private <T> Multi<T> paced(Iterable<T> items) {
        // Read the configuration once so the null check and the pacer use the same snapshot.
        final var configuration = this.consumerManager.pgBrokerConfiguration();
        final Multi<T> stream = Multi.createFrom().iterable(items);
        if (configuration.concurrency() == null) {
            return stream;
        }
        return stream.paceDemand().using(new FixedDemandPacer(configuration.concurrency().intValue(), configuration.throttle()));
    }

    /**
     * Consumes the messages of one partition sequentially, stopping at the first message that is
     * not {@code PROCESSED}.
     *
     * <p>Messages are popped one by one from the stack built by {@code fillStack} and handed to
     * the consumer manager; {@code handleProcessResult} records each outcome and, on the first
     * non-processed outcome, parks whatever is still on the stack and aborts the stream by
     * throwing {@link InterruptMessageStream}. Whether the stream completes or fails, the
     * results gathered so far are returned, so the returned {@link Uni} never fails because of
     * the stream itself.
     *
     * @param list messages belonging to a single partition key
     * @return the outcomes recorded for the partition (consumed and parked messages)
     */
    private Uni<List<RawMessage>> processPartitionStream(List<RawMessage> list) {
        final Stack<RawMessage> pending = fillStack(list);
        LOGGER.debug("Processing partition messages {}", pending);
        final ArrayList<RawMessage> results = new ArrayList<>(list.size());
        if (pending.isEmpty()) {
            // Nothing to pop; short-circuit instead of relying on a swallowed EmptyStackException.
            return Uni.createFrom().item(results);
        }
        // NOTE(review): relies on Mutiny's `whilst` emitting the item it has just checked, so the
        // last popped message (which empties the stack) is still consumed — confirm against the
        // Mutiny version in use.
        return Multi.createBy().repeating().supplier(pending::pop)
            .whilst(ignored -> !pending.isEmpty())
            .onItem().transformToUniAndConcatenate(message ->
                this.consumerManager.consumeMessage(message)
                    .map(outcome -> handleProcessResult(pending, results, outcome)))
            .collect().asList()
            .onItemOrFailure().transform((ignored, failure) -> {
                // InterruptMessageStream is the expected early-exit signal; anything else used to
                // be discarded silently, so surface it while still returning partial results.
                if (failure != null && !(failure instanceof InterruptMessageStream)) {
                    LOGGER.warn("Partition stream failed unexpectedly, returning partial results", failure);
                }
                return results;
            });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the outcome of one consumed message and aborts the partition stream when that
     * outcome is not {@code PROCESSED}.
     *
     * @param stack      messages of the partition that have not been consumed yet
     * @param arrayList  accumulator for outcomes; the given message is always appended, and on
     *                   failure a parked copy of every message left on {@code stack} is appended
     * @param rawMessage the outcome returned by the consumer
     * @return {@code Void.TYPE} when the message was processed
     * @throws InterruptMessageStream when the message state is anything other than
     *                                {@code PROCESSED}
     */
    public static Class<Void> handleProcessResult(Stack<RawMessage> stack, ArrayList<RawMessage> arrayList, RawMessage rawMessage) {
        arrayList.add(rawMessage);
        if (rawMessage.messageState() != MessageState.PROCESSED) {
            LOGGER.debug("Message failed parking the rest of the stream {}", stack);
            for (RawMessage remaining : stack) {
                arrayList.add(remaining.park());
            }
            throw new InterruptMessageStream();
        }
        return Void.TYPE;
    }

    /**
     * Builds the stack a partition is consumed from, arranged so that {@code pop()} returns the
     * messages in ascending {@code messageSequence} order.
     *
     * <p>The previous implementation pushed the messages in ascending order, which made
     * {@code pop()} return the highest sequence first; a failure then parked the lower-sequence
     * messages still on the stack as "the rest of the stream". Pushing in descending order puts
     * the lowest sequence on top, so consumption follows the sequence and only later messages
     * are parked after a failure.
     * NOTE(review): assumes a lower messageSequence means an earlier message — confirm.
     *
     * @param list messages of a single partition, in any order
     * @return a stack whose top element is the message with the lowest sequence
     */
    private static Stack<RawMessage> fillStack(List<RawMessage> list) {
        final List<RawMessage> ascending = list.stream()
            .sorted(Comparator.comparing(message -> message.messageSequence()))
            .toList();
        final Stack<RawMessage> stack = new Stack<>();
        // Push from the highest sequence down so the lowest ends up on top of the stack.
        for (int index = ascending.size() - 1; index >= 0; index--) {
            stack.push(ascending.get(index));
        }
        return stack;
    }

    /**
     * Builds the query used to look up dead-letter records by partition key.
     *
     * @param str the partition key to match
     * @return a query on that partition key with the simple query options
     */
    private static MessageRecordQuery deadLetterQuery(String str) {
        final var options = QueryOptions.simple();
        return MessageRecordQueryBuilder.builder()
            .partitionKey(str)
            .options(options)
            .build();
    }

    /**
     * Converts the records to {@link RawMessage}s and groups them by partition key.
     *
     * @param list records to group; each partition key must be non-null, since
     *             {@link Collectors#groupingBy} rejects null keys
     * @return the messages keyed by their partition key
     */
    private Map<String, List<RawMessage>> splitOnPartitionKey(List<MessageRecord> list) {
        return list.stream()
            .map(this::parseRecord)
            .collect(Collectors.groupingBy(message -> message.partitionKey()));
    }

    /**
     * Copies a queue record into the {@link RawMessage} form handed to consumers.
     *
     * @param messageRecord the stored record
     * @return a raw message carrying the record's fields, with the tenant taken from its base record
     */
    private RawMessage parseRecord(MessageRecord messageRecord) {
        return new RawMessage(
            messageRecord.id(),
            messageRecord.scheduled(),
            messageRecord.expiration(),
            messageRecord.priority(),
            messageRecord.retryCounter(),
            messageRecord.messageState(),
            messageRecord.payloadClass(),
            messageRecord.payload(),
            messageRecord.failedProcessors(),
            messageRecord.baseRecord().tenant(),
            messageRecord.messageSequence(),
            messageRecord.partitionId(),
            messageRecord.partitionKey()
        );
    }

    /**
     * Persists processing outcomes by running the acknowledge and re-queue steps together.
     *
     * @param list records reflecting the outcome of processing
     * @return a {@link Uni} completing when both steps have completed, failing as soon as either
     *         of them fails
     */
    private Uni<Void> persistResults(List<MessageRecord> list) {
        LOGGER.debug("Persisting results for {}", list);
        // Generic varargs call instead of a raw Uni[] array, which required an unchecked conversion.
        return Uni.join().all(ack(list), nack(list)).andFailFast().replaceWithVoid();
    }

    /**
     * Writes back the records that are in {@code RETRY} state so they stay in the queue.
     *
     * @param list processing outcomes
     * @return a {@link Uni} completing once the retry records have been updated, or immediately
     *         when there are none
     */
    private Uni<Void> nack(List<MessageRecord> list) {
        final List<MessageRecord> retries = list.stream()
            .filter(record -> MessageState.RETRY == record.messageState())
            .toList();
        if (!retries.isEmpty()) {
            LOGGER.debug("re-queuing unhandled messages {}", retries.stream().map(MessageRecord::id).toList());
            return this.messageQueue.updateByKeyBatch(retries);
        }
        return Uni.createFrom().voidItem();
    }

    /**
     * Removes finished messages from the queue and copies dead-lettered ones to the dead-letter
     * queue.
     *
     * <p>A message counts as finished when it is {@code PROCESSED} or in one of the
     * {@code DEAD_LETTER_QUEUE_STATES}. Finished messages are grouped by tenant and deleted with
     * one query per tenant; the subset in a dead-letter state is additionally inserted into the
     * dead-letter queue. Both operations are joined and their failures collected.
     *
     * @param list processing outcomes
     * @return a {@link Uni} completing when the deletes and the insert have completed, or
     *         immediately when there is nothing to do
     */
    private Uni<Void> ack(List<MessageRecord> list) {
        final Map<String, List<MessageRecord>> finishedByTenant = list.stream()
            .filter(record -> record.messageState() == MessageState.PROCESSED
                || DEAD_LETTER_QUEUE_STATES.contains(record.messageState()))
            .collect(Collectors.groupingBy(record -> record.baseRecord().tenant()));
        final List<MessageRecordQuery> dropQueries = finishedByTenant.entrySet().stream()
            .map(this::messageDropQuery)
            .toList();
        final List<DeadLetterRecord> deadLetters = list.stream()
            .filter(record -> DEAD_LETTER_QUEUE_STATES.contains(record.messageState()))
            .map(MessageProcessor::parseDeadLetter)
            .toList();
        if (dropQueries.isEmpty() && deadLetters.isEmpty()) {
            return Uni.createFrom().voidItem();
        }

        // Each side defaults to an already-completed Uni so the join below always has two inputs.
        Uni<Void> dropFromQueue = Uni.createFrom().voidItem();
        if (!dropQueries.isEmpty()) {
            LOGGER.debug("Dropping messages {}", finishedByTenant);
            dropFromQueue = Multi.createFrom().iterable(dropQueries)
                .onItem().transformToUniAndMerge(this.messageQueue::deleteQuery)
                .collect().asList()
                .replaceWithVoid();
        }
        Uni<Void> moveToDeadLetter = Uni.createFrom().voidItem();
        if (!deadLetters.isEmpty()) {
            LOGGER.debug("Moving messages to dead-letter {}", deadLetters);
            moveToDeadLetter = this.deadLetterQueue.insertBatch(deadLetters).replaceWithVoid();
        }
        return Uni.join().all(dropFromQueue, moveToDeadLetter).andCollectFailures().replaceWithVoid();
    }

    /**
     * Copies a queue record into the form stored in the dead-letter queue.
     *
     * @param messageRecord the record to dead-letter
     * @return a dead-letter record carrying the record's fields and a new base record for the
     *         same tenant
     */
    private static DeadLetterRecord parseDeadLetter(MessageRecord messageRecord) {
        return new DeadLetterRecord(
            messageRecord.id(),
            messageRecord.scheduled(),
            messageRecord.expiration(),
            messageRecord.priority(),
            messageRecord.retryCounter(),
            messageRecord.messageState(),
            messageRecord.payloadClass(),
            messageRecord.payload(),
            messageRecord.failedProcessors(),
            messageRecord.verticleId(),
            messageRecord.partitionId(),
            messageRecord.partitionKey(),
            BaseRecord.newRecord(messageRecord.baseRecord().tenant())
        );
    }

    /**
     * Builds the delete query for one tenant's finished messages.
     *
     * @param entry tenant (key) and the records of that tenant to remove (value)
     * @return a query matching the ids of those records, with query options created for the tenant
     */
    private MessageRecordQuery messageDropQuery(Map.Entry<String, List<MessageRecord>> entry) {
        final var ids = entry.getValue().stream().map(MessageRecord::id).toList();
        return MessageRecordQueryBuilder.builder()
            .ids(ids)
            .options(QueryOptions.simple(entry.getKey()))
            .build();
    }
}
