package io.es4j.infrastructure.pgbroker.core;

import io.es4j.infrastructure.pgbroker.models.ConsumerFailureKey;
import io.es4j.infrastructure.pgbroker.models.ConsumerFailureQuery;
import io.es4j.infrastructure.pgbroker.models.ConsumerFailureRecord;
import io.es4j.infrastructure.pgbroker.models.ConsumerRouter;
import io.es4j.infrastructure.pgbroker.models.MessageRecord;
import io.es4j.infrastructure.pgbroker.models.MessageRecordKey;
import io.es4j.infrastructure.pgbroker.models.MessageRecordQuery;
import io.es4j.infrastructure.pgbroker.models.MessageState;
import io.es4j.infrastructure.pgbroker.models.RawMessage;
import io.es4j.sql.Repository;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiOnItem;
import io.smallrye.mutiny.groups.MultiRepetition;
import io.smallrye.mutiny.subscription.FixedDemandPacer;
import io.smallrye.mutiny.tuples.Tuple2;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/core/MessageRouter.class */
/**
 * Dispatches batches of {@link MessageRecord}s to the {@link ConsumerRouter} and then persists
 * the outcome: the resulting message states are written back through the message repository and
 * any consumer failures are inserted through the failure repository.
 *
 * <p>Two delivery modes are offered:
 * <ul>
 *   <li>{@link #routeTopicPartition(List)} groups messages by partition key and hands the
 *       messages of one key to {@code ConsumerRouter.fanOut} one at a time;</li>
 *   <li>{@link #routeQueues(List)} hands every message to {@code ConsumerRouter.routeQueue}
 *       independently, short-circuiting messages whose expiration instant has passed.</li>
 * </ul>
 */
public class MessageRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouter.class);
    private final Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> messageBroker;
    private final Repository<ConsumerFailureKey, ConsumerFailureRecord, ConsumerFailureQuery> consumerFailures;
    private final ConsumerRouter consumerRouter;

    /**
     * @param consumerRouter router that delivers messages to consumers and exposes the broker configuration
     * @param repository     repository used to write back the state of routed messages
     * @param repository2    repository used to store consumer failures
     */
    public MessageRouter(ConsumerRouter consumerRouter, Repository<MessageRecordKey, MessageRecord, MessageRecordQuery> repository, Repository<ConsumerFailureKey, ConsumerFailureRecord, ConsumerFailureQuery> repository2) {
        this.consumerRouter = consumerRouter;
        this.messageBroker = repository;
        this.consumerFailures = repository2;
    }

    /**
     * Routes topic messages. Records are grouped by partition key; groups are processed as a
     * merged stream while the messages inside one group are fanned out one after another.
     * Once every group has finished, message updates and failures are persisted together.
     *
     * @param list records polled for a topic partition
     * @return a {@link Uni} completing when routing and persistence have finished
     */
    public Uni<Void> routeTopicPartition(List<MessageRecord> list) {
        return startPacedStream(splitOnPartitionKey(list).entrySet())
            .onItem().transformToUniAndMerge(entry -> {
                LOGGER.debug("Processing partition stream {} -> {}", entry.getKey(), entry.getValue());
                return processTopicPartitionStream(entry.getValue());
            })
            .collect().asList()
            .flatMap(partitionResults -> {
                // One result list per partition key; flatten before splitting messages from failures.
                List<Tuple2<RawMessage, List<ConsumerFailureRecord>>> outcomes = partitionResults.stream()
                    .flatMap(List::stream)
                    .toList();
                List<RawMessage> consumed = outcomes.stream()
                    .map(Tuple2::getItem1)
                    .toList();
                List<ConsumerFailureRecord> failures = outcomes.stream()
                    .map(Tuple2::getItem2)
                    .flatMap(List::stream)
                    .toList();
                return persistOutcome(consumed, failures);
            });
    }

    /**
     * Routes queue messages. Every record is handled independently (merged, not ordered);
     * afterwards message updates and failures are persisted together.
     *
     * @param list records polled from queues
     * @return a {@link Uni} completing when routing and persistence have finished
     */
    public Uni<Void> routeQueues(List<MessageRecord> list) {
        return startPacedStream(list)
            .onItem().transformToUniAndMerge(this::routeQueueMessage)
            .collect().asList()
            .flatMap(outcomes -> {
                List<RawMessage> consumed = outcomes.stream()
                    .map(Tuple2::getItem1)
                    .toList();
                List<ConsumerFailureRecord> failures = outcomes.stream()
                    .map(Tuple2::getItem2)
                    .flatMap(Optional::stream)
                    .toList();
                return persistOutcome(consumed, failures);
            });
    }

    /**
     * Routes a single queue record. A record whose expiration is set and already in the past is
     * not delivered; it is returned with state {@link MessageState#EXPIRED} and no failure.
     */
    private Uni<Tuple2<RawMessage, Optional<ConsumerFailureRecord>>> routeQueueMessage(MessageRecord messageRecord) {
        boolean expired = Objects.nonNull(messageRecord.expiration()) && Instant.now().isAfter(messageRecord.expiration());
        if (expired) {
            return Uni.createFrom().item(Tuple2.of(parseMessage(messageRecord).withState(MessageState.EXPIRED), Optional.<ConsumerFailureRecord>empty()));
        }
        return this.consumerRouter.routeQueue(parseMessage(messageRecord));
    }

    /**
     * Converts the routed messages back to records and runs the message update and the failure
     * insert as a fail-fast join.
     */
    private Uni<Void> persistOutcome(List<RawMessage> consumed, List<ConsumerFailureRecord> failures) {
        List<MessageRecord> records = consumed.stream()
            .map(MessageRecord::from)
            .toList();
        return Uni.join().all(updateConsumedMessages(records), persistFailures(failures))
            .andFailFast()
            .replaceWithVoid();
    }

    /**
     * Creates a stream over {@code items}. When the broker configuration defines a consumer
     * concurrency, upstream demand is paced with a {@link FixedDemandPacer} built from that
     * concurrency and the configured consumer throttle; otherwise the stream is unpaced.
     */
    private <T> Multi<T> startPacedStream(Iterable<T> items) {
        Multi<T> stream = Multi.createFrom().iterable(items);
        var configuration = this.consumerRouter.brokerConfiguration();
        if (configuration.consumerConcurrency() == null) {
            return stream;
        }
        return stream.paceDemand().using(new FixedDemandPacer(configuration.consumerConcurrency().intValue(), configuration.consumerThrottle()));
    }

    /**
     * Fans out the messages of one partition key strictly one after another (concatenation) and
     * collects the per-message results.
     *
     * <p>The repeating supplier pops one message per request; {@code whilst} is evaluated after
     * the popped item has been emitted, so the stream ends right after the last message. The
     * list must not be empty, otherwise the first {@code pop} fails; groups produced by
     * {@code Collectors.groupingBy} always hold at least one element.
     */
    private Uni<List<Tuple2<RawMessage, List<ConsumerFailureRecord>>>> processTopicPartitionStream(List<RawMessage> list) {
        Stack<RawMessage> pending = fillStack(list);
        LOGGER.debug("Processing partition messages {}", pending);
        return Multi.createBy().repeating().supplier(pending::pop)
            .whilst(ignored -> !pending.isEmpty())
            .onItem().transformToUniAndConcatenate(this.consumerRouter::fanOut)
            .collect().asList();
    }

    /**
     * Pushes the messages onto a stack in ascending {@code messageSequence} order.
     *
     * <p>NOTE(review): because the caller pops from the top, messages are consumed from the
     * highest sequence down to the lowest. If in-order (ascending) processing is intended,
     * this should be a FIFO queue instead - confirm before changing.
     */
    private static Stack<RawMessage> fillStack(List<RawMessage> list) {
        Stack<RawMessage> stack = new Stack<>();
        list.stream()
            .sorted(Comparator.comparing(RawMessage::messageSequence))
            .forEach(stack::push);
        return stack;
    }

    /** Parses every record into a {@link RawMessage} and groups the results by partition key. */
    private Map<String, List<RawMessage>> splitOnPartitionKey(List<MessageRecord> list) {
        return list.stream()
            .map(this::parseMessage)
            .collect(Collectors.groupingBy(RawMessage::partitionKey));
    }

    /** Copies the fields of a stored record (tenant taken from its base record) into a {@link RawMessage}. */
    private RawMessage parseMessage(MessageRecord messageRecord) {
        return new RawMessage(messageRecord.messageId(), messageRecord.scheduled(), messageRecord.expiration(), messageRecord.priority(), messageRecord.messageState(), messageRecord.messageAddress(), messageRecord.payload(), messageRecord.baseRecord().tenant(), messageRecord.messageSequence(), messageRecord.partitionId(), messageRecord.partitionKey(), messageRecord.schemaVersion());
    }

    /** Batch-updates the given records by key; an empty list completes immediately without touching the repository. */
    private Uni<Void> updateConsumedMessages(List<MessageRecord> list) {
        if (list.isEmpty()) {
            return Uni.createFrom().voidItem();
        }
        LOGGER.debug("Updating messages {}", list);
        return this.messageBroker.updateByKeyBatch(list);
    }

    /** Batch-inserts the given failures; an empty list completes immediately without touching the repository. */
    private Uni<Void> persistFailures(List<ConsumerFailureRecord> list) {
        if (list.isEmpty()) {
            return Uni.createFrom().voidItem();
        }
        LOGGER.debug("Persisting failures {}", list);
        return this.consumerFailures.insertBatch(list);
    }
}
