package org.reactivecommons.async.rabbit.listeners;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.FallbackStrategy;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/reactivecommons/async/rabbit/listeners/GenericMessageListener.class */
/**
 * Base class for listeners that consume a RabbitMQ queue with manual acknowledgement.
 *
 * <p>Each delivery is routed to a handler selected by {@link #getExecutorPath(AcknowledgableDelivery)}
 * and acknowledged when the handler completes. When the handler fails, one of three strategies is
 * applied, depending on the constructor arguments:
 * <ul>
 *   <li><b>DLQ retries</b> ({@code useDLQRetries == true}): the delivery is nack'ed without requeue
 *       until the {@code x-death} count reaches {@code maxRetries}, then it is discarded.</li>
 *   <li><b>Local retries</b> ({@code useDLQRetries == false} and {@code maxRetries != -1}): the
 *       handler is re-invoked in-process up to {@code maxRetries} times with {@code retryDelay}
 *       between attempts, then the delivery is discarded.</li>
 *   <li><b>Infinite fast retries</b> ({@code useDLQRetries == false} and {@code maxRetries == -1}):
 *       the delivery is nack'ed with requeue after {@code retryDelay}.</li>
 * </ul>
 * "Discarded" means the {@link DiscardNotifier} is notified and the delivery is then ack'ed.
 */
public abstract class GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(GenericMessageListener.class.getName());

    /** Retry limit used when DLQ retries are enabled and no explicit limit ({@code -1}) was given. */
    public static final int DEFAULT_RETRIES_DLQ = 10;

    /** Value of {@code maxRetries} meaning "no explicit limit configured". */
    private static final long NO_RETRY_LIMIT = -1L;

    /** Message fragment used to recognise the wrapper error emitted once local retries run out. */
    private static final String RETRIES_EXHAUSTED_MARKER = "Retries exhausted";

    private final Receiver receiver;
    private final ReactiveMessageListener messageListener;
    protected final String queueName;
    private final boolean useDLQRetries;
    private final boolean createTopology;
    private final long maxRetries;
    private final Duration retryDelay;
    private final DiscardNotifier discardNotifier;
    private final String objectType;
    private final CustomReporter customReporter;
    private volatile Flux<AcknowledgableDelivery> messageFlux;
    /** Cache of handlers keyed by executor path; populated lazily through {@link #rawMessageHandler(String)}. */
    private final ConcurrentHashMap<String, Function<Message, Mono<Object>>> handlers = new ConcurrentHashMap<>();
    private final Scheduler scheduler = Schedulers.newParallel(getClass().getSimpleName(), 12);
    private final Scheduler errorReporterScheduler = Schedulers.newBoundedElastic(4, 256, "errorReporterScheduler");

    /**
     * Creates the listener; nothing is consumed until {@link #startListener()} is called.
     *
     * @param queueName               queue to consume from
     * @param reactiveMessageListener source of the receiver, topology creator, prefetch count and max concurrency
     * @param useDLQRetries           whether failed deliveries are retried through dead-lettering
     * @param createTopology          whether {@link #setUpBindings(TopologyCreator)} runs before consuming
     * @param maxRetries              retry limit; {@code -1} means no explicit limit
     *                                (resolved to {@link #DEFAULT_RETRIES_DLQ} when DLQ retries are enabled)
     * @param retryDelay              delay between retries, in milliseconds
     * @param discardNotifier         notified when a delivery is definitively discarded
     * @param objectType              label used in execution logs and reported metrics
     * @param customReporter          receives execution metrics and processing errors
     */
    public GenericMessageListener(String queueName, ReactiveMessageListener reactiveMessageListener,
                                  boolean useDLQRetries, boolean createTopology, long maxRetries, long retryDelay,
                                  DiscardNotifier discardNotifier, String objectType, CustomReporter customReporter) {
        this.receiver = reactiveMessageListener.getReceiver();
        this.queueName = queueName;
        this.messageListener = reactiveMessageListener;
        this.createTopology = createTopology;
        this.maxRetries = resolveRetries(useDLQRetries, maxRetries);
        this.retryDelay = Duration.ofMillis(retryDelay);
        this.useDLQRetries = useDLQRetries;
        this.discardNotifier = discardNotifier;
        this.objectType = objectType;
        this.customReporter = customReporter;
    }

    /** Applies {@link #DEFAULT_RETRIES_DLQ} when DLQ retries are on and no limit was configured. */
    private static long resolveRetries(boolean useDLQRetries, long maxRetries) {
        if (useDLQRetries && maxRetries == NO_RETRY_LIMIT) {
            return DEFAULT_RETRIES_DLQ;
        }
        return maxRetries;
    }

    /** Returns {@code true} when failures are retried in-process (no DLQ and an explicit limit). */
    private boolean hasLocalRetries() {
        return !this.useDLQRetries && this.maxRetries != NO_RETRY_LIMIT;
    }

    /**
     * Hook for subclasses to declare exchanges, queues and bindings before consumption starts.
     * Only invoked when the listener was created with {@code createTopology == true}.
     * The default implementation does nothing.
     *
     * @param topologyCreator creator obtained from the {@link ReactiveMessageListener}
     * @return a Mono that completes when the topology is ready
     */
    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        return Mono.empty();
    }

    /**
     * Logs the active retry strategy, builds the consuming pipeline and subscribes to it.
     */
    public void startListener() {
        log.log(Level.INFO, "Using max concurrency {0}, in queue: {1}",
                new Object[]{this.messageListener.getMaxConcurrency(), this.queueName});
        if (this.useDLQRetries) {
            log.log(Level.INFO, "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!",
                    new Object[]{Long.valueOf(this.maxRetries)});
        } else if (hasLocalRetries()) {
            // Previously this mode was announced as "infinite fast retries", which it is not.
            log.log(Level.INFO, "ATTENTION! Using local retries as Retry Strategy with {0} Max Retries configured!",
                    new Object[]{Long.valueOf(this.maxRetries)});
        } else {
            log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
        }

        ConsumeOptions consumeOptions = new ConsumeOptions();
        consumeOptions.qos(this.messageListener.getPrefetchCount().intValue());

        if (this.createTopology) {
            Flux<AcknowledgableDelivery> consumer = this.receiver
                    .consumeManualAck(this.queueName, consumeOptions)
                    .transform(this::consumeFaultTolerant);
            this.messageFlux = setUpBindings(this.messageListener.getTopologyCreator()).thenMany(consumer);
        } else {
            this.messageFlux = this.receiver
                    .consumeManualAck(this.queueName, consumeOptions)
                    .doOnError(error -> log.log(Level.SEVERE, "Error listening queue", error))
                    .transform(this::consumeFaultTolerant);
        }
        onTerminate();
    }

    /**
     * Processes deliveries concurrently (bounded by the listener's max concurrency): ack on success,
     * otherwise delegate to {@link #requeueOrAck(AcknowledgableDelivery, Throwable, Instant)}.
     */
    private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> deliveries) {
        return deliveries.flatMap(delivery -> {
            final Instant startedAt = Instant.now();
            return handle(delivery, startedAt)
                    .doOnSuccess(AcknowledgableDelivery::ack)
                    .onErrorResume(error -> requeueOrAck(delivery, error, startedAt));
        }, this.messageListener.getMaxConcurrency().intValue());
    }

    /**
     * Resolves the handler for the delivery and runs it on this listener's scheduler, applying
     * in-process retries when that strategy is active.
     *
     * @param acknowledgableDelivery delivery to process
     * @param instant                moment processing started; used to compute the reported duration
     * @return a Mono emitting the same delivery once the handler completed, or the handler's error
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery acknowledgableDelivery, Instant instant) {
        try {
            final String executorPath = getExecutorPath(acknowledgableDelivery);
            final Function<Message, Mono<Object>> executor = getExecutor(executorPath);
            final Message message = RabbitMessage.fromDelivery(acknowledgableDelivery);

            Mono<Object> flow = Mono.defer(() -> executor.apply(message))
                    .transform(enrichPostProcess(message));

            if (hasLocalRetries()) {
                flow = flow.retryWhen(Retry.fixedDelay(this.maxRetries, this.retryDelay))
                        .onErrorMap(error -> {
                            String description = error.getMessage();
                            if (description == null || !description.contains(RETRIES_EXHAUSTED_MARKER)) {
                                return error;
                            }
                            log.warning(description);
                            // Unwrap to the underlying failure; never hand a null back to the operator.
                            Throwable cause = error.getCause();
                            return cause != null ? cause : error;
                        });
            }

            return flow
                    .doOnSuccess(result -> logExecution(executorPath, instant, true))
                    .subscribeOn(this.scheduler)
                    .thenReturn(acknowledgableDelivery);
        } catch (Exception e) {
            log.log(Level.SEVERE,
                    String.format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ",
                            acknowledgableDelivery.getProperties().getMessageId()),
                    e);
            return Mono.error(e);
        }
    }

    /**
     * Subscribes to {@link #messageFlux}. When that subscription terminates, this method is invoked
     * again, which creates a new subscription.
     */
    private void onTerminate() {
        this.messageFlux
                .doOnTerminate(this::onTerminate)
                .subscribe(new LoggerSubscriber(getClass().getName()));
    }

    /** Logs and reports how long a handler took; reporting failures are logged and never propagated. */
    private void logExecution(String executorPath, Instant startedAt, boolean success) {
        try {
            long elapsedMillis = Duration.between(startedAt, Instant.now()).toMillis();
            doLogExecution(executorPath, elapsedMillis);
            this.customReporter.reportMetric(this.objectType, executorPath, Long.valueOf(elapsedMillis), success);
        } catch (Exception e) {
            log.log(Level.WARNING, "Unable to send execution metrics!", (Throwable) e);
        }
    }

    /** Reports a failed execution, using {@code "unknown"} when the executor path cannot be resolved. */
    private void reportErrorMetric(AcknowledgableDelivery delivery, Instant startedAt) {
        String executorPath;
        try {
            executorPath = getExecutorPath(delivery);
        } catch (Exception e) {
            executorPath = "unknown";
        }
        logExecution(executorPath, startedAt, false);
    }

    private void doLogExecution(String executorPath, long elapsedMillis) {
        // Skip the formatting work entirely when FINE is disabled.
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, String.format("%s with path %s handled, took %d ms",
                    this.objectType, executorPath, Long.valueOf(elapsedMillis)));
        }
    }

    /**
     * Logs the failure, the message headers and body, and finally the fallback strategy's message.
     * The strategy line is always logged, even if dumping the message content fails.
     *
     * @param th                     error raised while processing
     * @param acknowledgableDelivery delivery that failed
     * @param fallbackStrategy       strategy about to be applied; its {@code message} is used as a
     *                               format string receiving the message id
     */
    protected void logError(Throwable th, AcknowledgableDelivery acknowledgableDelivery, FallbackStrategy fallbackStrategy) {
        final String messageId = acknowledgableDelivery.getProperties().getMessageId();
        try {
            log.log(Level.SEVERE,
                    String.format("Error encounter while processing message %s: %s", messageId, th.toString()), th);
            // %s renders null headers as "null" instead of failing and skipping the body line.
            log.warning(String.format("Message %s Headers: %s", messageId,
                    acknowledgableDelivery.getProperties().getHeaders()));
            log.warning(String.format("Message %s Body: %s", messageId,
                    new String(acknowledgableDelivery.getBody(), StandardCharsets.UTF_8)));
        } catch (Exception e) {
            log.log(Level.SEVERE, "Error Login message Content!!", (Throwable) e);
        } finally {
            log.warning(String.format(fallbackStrategy.message, messageId));
        }
    }

    /** Returns the cached handler for the path, creating it via {@link #rawMessageHandler(String)} on first use. */
    private Function<Message, Mono<Object>> getExecutor(String executorPath) {
        return this.handlers.computeIfAbsent(executorPath, this::rawMessageHandler);
    }

    /**
     * Builds the handler for the given executor path. The result is cached per path.
     *
     * @param executorPath value previously returned by {@link #getExecutorPath(AcknowledgableDelivery)}
     * @return function that processes a message and signals completion or failure
     */
    protected abstract Function<Message, Mono<Object>> rawMessageHandler(String executorPath);

    /**
     * Extracts from the delivery the key used to select its handler.
     *
     * @param acknowledgableDelivery incoming delivery
     * @return the executor path for this delivery
     */
    protected abstract String getExecutorPath(AcknowledgableDelivery acknowledgableDelivery);

    /**
     * Hook to decorate the handler's result for a given message. The default is the identity.
     *
     * @param message message being processed
     * @return transformation applied to the handler's Mono
     */
    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message message) {
        return Function.identity();
    }

    /**
     * Applies the failure strategy for a delivery whose handler ended in error.
     *
     * <p>The delivery is discarded when local retries were in use (they are already exhausted when
     * the error reaches this point) or when DLQ retries reached {@code maxRetries}. Otherwise it is
     * nack'ed without requeue (DLQ strategy) or nack'ed with requeue after {@code retryDelay}
     * (infinite fast retries).
     */
    private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery delivery, Throwable error, Instant startedAt) {
        final long retryNumber = getRetryNumber(delivery);
        final Message message = RabbitMessage.fromDelivery(delivery);
        final boolean redelivered = delivery.getEnvelope().isRedeliver();

        reportErrorMetric(delivery, startedAt);
        sendErrorToCustomReporter(error, message, redelivered || retryNumber > 0);

        // The retry-count limit only applies to the DLQ strategy. Without this guard, an unlimited
        // configuration (maxRetries == -1) always satisfied "retryNumber >= maxRetries" and the
        // infinite-fast-retry branch below could never be reached.
        final boolean dlqRetriesExhausted = this.useDLQRetries && retryNumber >= this.maxRetries;

        if (hasLocalRetries() || dlqRetriesExhausted) {
            logError(error, delivery, FallbackStrategy.DEFINITIVE_DISCARD);
            return this.discardNotifier.notifyDiscard(message)
                    .doOnSuccess(ignored -> delivery.ack())
                    .thenReturn(delivery);
        }

        if (this.useDLQRetries) {
            logError(error, delivery, FallbackStrategy.RETRY_DLQ);
            delivery.nack(false);
            return Mono.just(delivery);
        }

        logError(error, delivery, FallbackStrategy.FAST_RETRY);
        return Mono.just(delivery)
                .delayElement(this.retryDelay)
                .doOnNext(delayed -> delayed.nack(true));
    }

    /**
     * Forwards the error to the {@link CustomReporter} on a dedicated scheduler, fire-and-forget.
     * Failures of the reporter are logged and never affect message processing.
     */
    private void sendErrorToCustomReporter(Throwable error, Message message, boolean redelivered) {
        try {
            this.customReporter.reportError(error, message, parseMessageForReporter(message), redelivered)
                    .subscribeOn(this.errorReporterScheduler)
                    .doOnError(reportFailure ->
                            log.log(Level.WARNING, "Error sending error to external reporter", reportFailure))
                    // Already logged above; complete quietly so the bare subscribe() has no unhandled error.
                    .onErrorResume(reportFailure -> Mono.empty())
                    .subscribe();
        } catch (Throwable unexpected) {
            log.log(Level.WARNING, "Error in scheduler when sending error to external reporter", unexpected);
        }
    }

    /**
     * Reads the {@code count} entry of the first element of the {@code x-death} header.
     *
     * @return the count, or {@code 0} when properties, headers, the header or the entry are missing
     *         or do not have the expected list/map/number structure
     */
    private long getRetryNumber(AcknowledgableDelivery delivery) {
        if (delivery.getProperties() == null || delivery.getProperties().getHeaders() == null) {
            return 0L;
        }
        Object deaths = delivery.getProperties().getHeaders().get("x-death");
        if (!(deaths instanceof List) || ((List<?>) deaths).isEmpty()) {
            return 0L;
        }
        Object latestDeath = ((List<?>) deaths).get(0);
        if (!(latestDeath instanceof Map)) {
            return 0L;
        }
        Object count = ((Map<?, ?>) latestDeath).get("count");
        return count instanceof Number ? ((Number) count).longValue() : 0L;
    }

    /**
     * Converts the raw message into the object handed to {@link CustomReporter} when reporting an error.
     *
     * @param message raw message whose processing failed
     * @return representation of the message passed to the reporter
     */
    protected abstract Object parseMessageForReporter(Message message);
}
