package io.es4j.infrastructure.pgbroker.models;

import io.es4j.infrastructure.pgbroker.ConsumerTransactionProvider;
import io.es4j.infrastructure.pgbroker.QueueConsumer;
import io.es4j.infrastructure.pgbroker.exceptions.ConsumerExeception;
import io.es4j.infrastructure.pgbroker.exceptions.DuplicateMessage;
import io.es4j.infrastructure.pgbroker.exceptions.InvalidProcessorException;
import io.es4j.infrastructure.pgbroker.exceptions.MessageParsingException;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/es4j/infrastructure/pgbroker/models/ConsumerManager.class */
public final class ConsumerManager extends Record {
    private final PgBrokerConfiguration pgBrokerConfiguration;
    private final List<ConsumerWrap> consumerWraps;
    private final ConsumerTransactionProvider consumerTransactionProvider;
    private final Vertx vertx;
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerManager.class);

    public ConsumerManager(PgBrokerConfiguration pgBrokerConfiguration, List<ConsumerWrap> list, ConsumerTransactionProvider consumerTransactionProvider, Vertx vertx) {
        this.pgBrokerConfiguration = pgBrokerConfiguration;
        this.consumerWraps = list;
        this.consumerTransactionProvider = consumerTransactionProvider;
        this.vertx = vertx;
    }

    public Uni<RawMessage> consumeMessage(RawMessage rawMessage) {
        try {
            QueueConsumer resolveProcessor = resolveProcessor(rawMessage);
            Message<?> parseMessage = parseMessage(resolveProcessor, rawMessage);
            return this.consumerTransactionProvider.transaction(resolveProcessor.getClass().getName(), parseMessage, (message, consumerTransaction) -> {
                try {
                    return resolveProcessor.blockingProcessor().booleanValue() ? this.vertx.executeBlocking(process(parseMessage, resolveProcessor, consumerTransaction).onFailure().transform(ConsumerExeception::new)) : process(parseMessage, resolveProcessor, consumerTransaction).onFailure().transform(ConsumerExeception::new);
                } catch (Exception e) {
                    throw new ConsumerExeception(e);
                }
            }).onItemOrFailure().transform((r9, th) -> {
                if (th == null) {
                    LOGGER.debug("Consumer {} has correctly processed the message {}  ", resolveProcessor.getClass().getName(), rawMessage);
                    return rawMessage.withState(MessageState.PROCESSED);
                }
                if (!(th instanceof DuplicateMessage)) {
                    return retryableFailure(this.pgBrokerConfiguration, rawMessage, th, resolveProcessor);
                }
                LOGGER.debug("Duplicated message will be ignored {}  ", rawMessage);
                return rawMessage.withState(MessageState.PROCESSED);
            });
        } catch (Exception e) {
            return Uni.createFrom().item(rawMessage.withState(MessageState.FATAL_FAILURE));
        }
    }

    private Uni<Void> process(Message<?> message, QueueConsumer queueConsumer, ConsumerTransaction consumerTransaction) {
        return queueConsumer.process(message.payload(), consumerTransaction);
    }

    private QueueConsumer resolveProcessor(RawMessage rawMessage) {
        return (QueueConsumer) this.consumerWraps.stream().filter(consumerWrap -> {
            return consumerWrap.doesMessageMatch(rawMessage);
        }).findFirst().map(consumerWrap2 -> {
            return consumerWrap2.resolveProcessor(rawMessage.tenant());
        }).orElseThrow(() -> {
            LOGGER.error("Unable to find processor for message -> " + rawMessage.id());
            return new InvalidProcessorException();
        });
    }

    private Message<?> parseMessage(QueueConsumer queueConsumer, RawMessage rawMessage) {
        try {
            return new Message<>(rawMessage.id(), rawMessage.tenant(), String.valueOf(rawMessage.partitionId()), rawMessage.scheduled(), rawMessage.expiration(), rawMessage.priority(), queueConsumer.parse(rawMessage.payload(), Class.forName(rawMessage.payloadClass())));
        } catch (ClassNotFoundException e) {
            LOGGER.error("Could not find class for message -> " + rawMessage.id());
            throw new MessageParsingException();
        }
    }

    private <T> RawMessage retryableFailure(PgBrokerConfiguration pgBrokerConfiguration, RawMessage rawMessage, Throwable th, QueueConsumer<T> queueConsumer) {
        MessageState messageState;
        if (queueConsumer.fatalExceptions().stream().anyMatch(cls -> {
            return cls.isAssignableFrom(th.getCause().getClass());
        })) {
            LOGGER.error("Fatal failure {}, message will be sent to dead letter queue {} ", new Object[]{queueConsumer.getClass().getName(), rawMessage, th.getCause()});
            messageState = MessageState.FATAL_FAILURE;
        } else if (rawMessage.retryCounter().intValue() + 1 > pgBrokerConfiguration.maxRetry().intValue()) {
            LOGGER.error("Retries exhausted for {} ", rawMessage, th.getCause());
            messageState = MessageState.RETRIES_EXHAUSTED;
        } else {
            LOGGER.error(String.format("Failure in task processor %s || Message %s will be queued for retry -> ", queueConsumer.getClass().getName(), rawMessage.id()), th.getCause());
            messageState = MessageState.RETRY;
        }
        return rawMessage.withFailure(messageState, th);
    }

    @Override // java.lang.Record
    public final String toString() {
        return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ConsumerManager.class), ConsumerManager.class, "pgBrokerConfiguration;consumerWraps;consumerTransactionProvider;vertx", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->pgBrokerConfiguration:Lio/es4j/infrastructure/pgbroker/models/PgBrokerConfiguration;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerWraps:Ljava/util/List;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerTransactionProvider:Lio/es4j/infrastructure/pgbroker/ConsumerTransactionProvider;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->vertx:Lio/vertx/mutiny/core/Vertx;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final int hashCode() {
        return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ConsumerManager.class), ConsumerManager.class, "pgBrokerConfiguration;consumerWraps;consumerTransactionProvider;vertx", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->pgBrokerConfiguration:Lio/es4j/infrastructure/pgbroker/models/PgBrokerConfiguration;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerWraps:Ljava/util/List;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerTransactionProvider:Lio/es4j/infrastructure/pgbroker/ConsumerTransactionProvider;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->vertx:Lio/vertx/mutiny/core/Vertx;").dynamicInvoker().invoke(this) /* invoke-custom */;
    }

    @Override // java.lang.Record
    public final boolean equals(Object obj) {
        return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ConsumerManager.class, Object.class), ConsumerManager.class, "pgBrokerConfiguration;consumerWraps;consumerTransactionProvider;vertx", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->pgBrokerConfiguration:Lio/es4j/infrastructure/pgbroker/models/PgBrokerConfiguration;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerWraps:Ljava/util/List;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->consumerTransactionProvider:Lio/es4j/infrastructure/pgbroker/ConsumerTransactionProvider;", "FIELD:Lio/es4j/infrastructure/pgbroker/models/ConsumerManager;->vertx:Lio/vertx/mutiny/core/Vertx;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
    }

    public PgBrokerConfiguration pgBrokerConfiguration() {
        return this.pgBrokerConfiguration;
    }

    public List<ConsumerWrap> consumerWraps() {
        return this.consumerWraps;
    }

    public ConsumerTransactionProvider consumerTransactionProvider() {
        return this.consumerTransactionProvider;
    }

    public Vertx vertx() {
        return this.vertx;
    }
}
