package io.es4j.infrastructure.pgbroker.models;

import io.es4j.infrastructure.pgbroker.ConsumerTransactionProvider;
import io.es4j.infrastructure.pgbroker.exceptions.DuplicateMessage;
import io.es4j.infrastructure.pgbroker.exceptions.InvalidProcessorException;
import io.es4j.sql.models.BaseRecord;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/models/ConsumerRouter.class */
/**
 * Routes raw broker messages to the registered topic subscribers and queue consumers.
 *
 * <p>Every delivery is executed inside a transaction obtained from the
 * {@link ConsumerTransactionProvider}, keyed by the consumer's class name. Consumers that
 * report {@code blocking()} are dispatched through {@link Vertx#executeBlocking}; all others
 * are invoked directly. Failures that survive the consumer's retry policy are not propagated:
 * they are converted into {@link ConsumerFailureRecord}s and handed back to the caller.
 *
 * @param brokerConfiguration         broker configuration (not read by the routing logic here)
 * @param topicConsumers              candidate subscribers for {@link #fanOut(RawMessage)}
 * @param queueConsumers              candidate consumers for {@link #routeQueue(RawMessage)}
 * @param consumerTransactionProvider opens the transaction each consumer runs in
 * @param vertx                       used to off-load blocking consumers
 */
public record ConsumerRouter(
    BrokerConfiguration brokerConfiguration,
    List<TopicSubscriberWrapper> topicConsumers,
    List<QueueConsumerWrapper> queueConsumers,
    ConsumerTransactionProvider consumerTransactionProvider,
    Vertx vertx) {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerRouter.class);

    /**
     * Delivers a topic message to every matching subscriber, merging the deliveries.
     *
     * @param rawMessage the message to deliver
     * @return the message marked {@link MessageState#CONSUMED} paired with one failure record
     *         per subscriber that ultimately failed (empty when all succeeded)
     */
    public Uni<Tuple2<RawMessage, List<ConsumerFailureRecord>>> fanOut(RawMessage rawMessage) {
        // Deliveries are merged and blocking consumers are off-loaded via executeBlocking, so
        // failure records may be appended from more than one thread: collect them in a
        // synchronized list rather than a bare ArrayList.
        final List<ConsumerFailureRecord> failures = Collections.synchronizedList(new ArrayList<>());
        return Multi.createFrom().iterable(findConsumers(rawMessage))
            .onItem().transformToUniAndMerge(subscriber ->
                this.consumerTransactionProvider.transaction(
                    subscriber.consumer().getClass().getName(),
                    rawMessage,
                    (ignoredMessage, consumerTransaction) ->
                        subscriber.consumer().blocking().booleanValue()
                            ? this.vertx.executeBlocking(process(rawMessage, subscriber, consumerTransaction, failures))
                            : process(rawMessage, subscriber, consumerTransaction, failures)))
            .collect().asList()
            .map(ignored -> {
                // Hand callers a plain ArrayList snapshot, as before, not the synchronized view.
                final List<ConsumerFailureRecord> snapshot = new ArrayList<>(failures);
                return Tuple2.of(rawMessage.withState(MessageState.CONSUMED), snapshot);
            });
    }

    /**
     * Runs a single topic subscriber with its retry policy; a terminal failure is logged and
     * appended to {@code failures} instead of being propagated.
     *
     * @param rawMessage             message being delivered
     * @param topicSubscriberWrapper subscriber to invoke
     * @param consumerTransaction    transaction the subscriber runs in
     * @param failures               sink receiving a record when the subscriber ultimately fails
     * @return a Uni that completes (never fails) once the subscriber is done
     */
    private <T> Uni<Void> process(RawMessage rawMessage, TopicSubscriberWrapper<T> topicSubscriberWrapper, ConsumerTransaction consumerTransaction, List<ConsumerFailureRecord> failures) {
        return topicSubscriberWrapper.consume(rawMessage, consumerTransaction)
            // Only retry failures whose type is listed in the consumer's retryOn().
            .onFailure(failure -> topicSubscriberWrapper.consumer().retryOn().stream()
                .anyMatch(retryable -> retryable.isAssignableFrom(failure.getClass())))
            .retry().withBackOff(topicSubscriberWrapper.consumer().retryBackOff())
            .atMost(topicSubscriberWrapper.consumer().numberOfAttempts().intValue())
            .onFailure().invoke(failure -> {
                LOGGER.error("Topic subscriber {} failed to process message {}", topicSubscriberWrapper.consumer().getClass().getSimpleName(), rawMessage, failure);
            })
            .onItemOrFailure().transform((item, failure) -> {
                if (failure != null) {
                    failures.add(parseConsumerFailure(topicSubscriberWrapper.consumer().getClass().getName(), rawMessage.messageId(), failure));
                }
                return item;
            })
            .replaceWithVoid();
    }

    /**
     * Runs a single queue consumer with its retry policy and maps the outcome to a result tuple.
     *
     * <p>Success and {@link DuplicateMessage} both yield an empty failure slot; any other
     * terminal failure is logged and returned as a {@link ConsumerFailureRecord}. In every case
     * the message is marked {@link MessageState#CONSUMED}.
     *
     * @param rawMessage           message being delivered
     * @param queueConsumerWrapper consumer to invoke
     * @param consumerTransaction  transaction the consumer runs in
     * @return the consumed message paired with an optional failure record
     */
    private <T> Uni<Tuple2<RawMessage, Optional<ConsumerFailureRecord>>> process(RawMessage rawMessage, QueueConsumerWrapper<T> queueConsumerWrapper, ConsumerTransaction consumerTransaction) {
        return queueConsumerWrapper.consume(rawMessage, consumerTransaction)
            // Only retry failures whose type is listed in the consumer's retryOn().
            .onFailure(failure -> queueConsumerWrapper.consumer().retryOn().stream()
                .anyMatch(retryable -> retryable.isAssignableFrom(failure.getClass())))
            .retry().withBackOff(queueConsumerWrapper.consumer().retryBackOff())
            .atMost(queueConsumerWrapper.consumer().numberOfAttempts().intValue())
            .onItemOrFailure().transform((item, failure) -> {
                if (failure == null || (failure instanceof DuplicateMessage)) {
                    LOGGER.debug("{} consumed message -> {}", queueConsumerWrapper.consumer().getClass().getName(), rawMessage);
                    return Tuple2.of(rawMessage.withState(MessageState.CONSUMED), Optional.empty());
                }
                LOGGER.error("{} failed for message -> {} ", queueConsumerWrapper.consumer().getClass().getName(), rawMessage, failure);
                return Tuple2.of(rawMessage.withState(MessageState.CONSUMED), Optional.of(parseConsumerFailure(queueConsumerWrapper.consumer().getClass().getName(), rawMessage.messageId(), failure)));
            });
    }

    /**
     * Builds a failure record carrying the throwable's message, localized message and, when a
     * cause is present, the cause's message and localized message (otherwise {@code null}).
     *
     * @param consumerName name of the consumer that failed
     * @param messageId    id of the message that was being processed
     * @param failure      the terminal failure
     * @return a new failure record backed by {@link BaseRecord#newRecord()}
     */
    private ConsumerFailureRecord parseConsumerFailure(String consumerName, String messageId, Throwable failure) {
        final Throwable cause = failure.getCause();
        final JsonObject error = new JsonObject()
            .put("message", failure.getMessage())
            .put("localMessage", failure.getLocalizedMessage())
            .put("causeMessage", cause != null ? cause.getMessage() : null)
            .put("causeLocalMessage", cause != null ? cause.getLocalizedMessage() : null);
        return ConsumerFailureRecordBuilder.builder()
            .messageId(messageId)
            .consumer(consumerName)
            .error(error)
            .baseRecord(BaseRecord.newRecord())
            .build();
    }

    /**
     * Selects the topic subscribers whose {@code match} accepts the message.
     *
     * @param rawMessage message to match against
     * @return matching subscribers, possibly empty
     */
    private List<TopicSubscriberWrapper> findConsumers(RawMessage rawMessage) {
        return this.topicConsumers.stream()
            .filter(subscriber -> subscriber.match(rawMessage))
            .toList();
    }

    /**
     * Delivers a queue message to the first consumer that matches it.
     *
     * @param rawMessage the message to deliver
     * @return the consumed message paired with an optional failure record
     * @throws InvalidProcessorException if no registered queue consumer matches the message
     */
    public Uni<Tuple2<RawMessage, Optional<ConsumerFailureRecord>>> routeQueue(RawMessage rawMessage) {
        QueueConsumerWrapper queueConsumer = resolveQueueConsumer(rawMessage);
        return this.consumerTransactionProvider.transaction(
            queueConsumer.consumer().getClass().getName(),
            rawMessage,
            (ignoredMessage, consumerTransaction) ->
                queueConsumer.consumer().blocking().booleanValue()
                    ? this.vertx.executeBlocking(process(rawMessage, queueConsumer, consumerTransaction))
                    : process(rawMessage, queueConsumer, consumerTransaction));
    }

    /**
     * Finds the first queue consumer whose {@code isMatch} accepts the message.
     *
     * @param rawMessage message to match against
     * @return the first matching consumer
     * @throws InvalidProcessorException if none matches (the miss is logged at error level)
     */
    private <T> QueueConsumerWrapper<T> resolveQueueConsumer(RawMessage rawMessage) {
        return this.queueConsumers.stream()
            .filter(candidate -> candidate.isMatch(rawMessage))
            .findFirst()
            .orElseThrow(() -> {
                LOGGER.error("Unable to find processor for message -> {}", rawMessage);
                return new InvalidProcessorException();
            });
    }
}
