package org.reactivecommons.async.kafka.listeners;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.FallbackStrategy;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.kafka.KafkaMessage;
import org.reactivecommons.async.kafka.communications.ReactiveMessageListener;
import org.reactivecommons.async.kafka.communications.topology.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

/* loaded from: input_file:org/reactivecommons/async/kafka/listeners/GenericMessageListener.class */
public abstract class GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(GenericMessageListener.class.getName());
    private final ReactiveMessageListener messageListener;
    private final List<String> topics;
    private final String groupId;
    private final boolean useDLQ;
    private final boolean createTopology;
    private final long maxRetries;
    private final Duration retryDelay;
    private final DiscardNotifier discardNotifier;
    private final String objectType;
    private final CustomReporter customReporter;
    private volatile Flux<ReceiverRecord<String, byte[]>> messageFlux;
    private final ConcurrentHashMap<String, Function<Message, Mono<Object>>> handlers = new ConcurrentHashMap<>();
    private final Scheduler scheduler = Schedulers.newParallel(getClass().getSimpleName(), 12);
    private final Scheduler errorReporterScheduler = Schedulers.newBoundedElastic(4, 256, "errorReporterScheduler");

    /* JADX INFO: Access modifiers changed from: protected */
    public GenericMessageListener(ReactiveMessageListener reactiveMessageListener, boolean z, boolean z2, long j, long j2, DiscardNotifier discardNotifier, String str, CustomReporter customReporter, String str2, List<String> list) {
        this.groupId = str2;
        this.topics = list;
        this.messageListener = reactiveMessageListener;
        this.createTopology = z2;
        this.maxRetries = j;
        this.retryDelay = Duration.ofMillis(j2);
        this.useDLQ = z;
        this.discardNotifier = discardNotifier;
        this.objectType = str;
        this.customReporter = customReporter;
    }

    private boolean hasRetries() {
        return this.maxRetries != -1;
    }

    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        return topologyCreator.createTopics(this.topics);
    }

    public void startListener(TopologyCreator topologyCreator) {
        log.log(Level.INFO, "Using max concurrency {0}, in receiver for topics: {1}", new Object[]{Integer.valueOf(this.messageListener.getMaxConcurrency()), this.topics});
        if (this.useDLQ) {
            log.log(Level.INFO, "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!", new Object[]{Long.valueOf(this.maxRetries)});
        } else {
            log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
        }
        if (this.createTopology) {
            this.messageFlux = setUpBindings(topologyCreator).thenMany(this.messageListener.listen(this.groupId, this.topics).doOnError(th -> {
                log.log(Level.SEVERE, "Error listening queue", th);
            }).transform(this::consumeFaultTolerant));
        } else {
            this.messageFlux = this.messageListener.listen(this.groupId, this.topics).doOnError(th2 -> {
                log.log(Level.SEVERE, "Error listening queue", th2);
            }).transform(this::consumeFaultTolerant);
        }
        onTerminate();
    }

    private void onTerminate() {
        this.messageFlux.doOnTerminate(this::onTerminate).subscribe(new LoggerSubscriber(getClass().getName()));
    }

    private Flux<ReceiverRecord<String, byte[]>> consumeFaultTolerant(Flux<ReceiverRecord<String, byte[]>> flux) {
        return flux.flatMap(receiverRecord -> {
            Instant now = Instant.now();
            return handle(receiverRecord, now).doOnSuccess(receiverRecord -> {
                receiverRecord.receiverOffset().acknowledge();
            }).onErrorResume(th -> {
                return requeueOrAck(receiverRecord, th, now);
            });
        }, this.messageListener.getMaxConcurrency());
    }

    protected Mono<ReceiverRecord<String, byte[]>> handle(ReceiverRecord<String, byte[]> receiverRecord, Instant instant) {
        try {
            String executorPath = getExecutorPath(receiverRecord);
            Function<Message, Mono<Object>> executor = getExecutor(executorPath);
            KafkaMessage fromDelivery = KafkaMessage.fromDelivery(receiverRecord, executorPath);
            Mono transform = Mono.defer(() -> {
                return (Mono) executor.apply(fromDelivery);
            }).transform(enrichPostProcess(fromDelivery));
            if (hasRetries()) {
                transform = transform.retryWhen(Retry.fixedDelay(this.maxRetries, this.retryDelay)).onErrorMap(th -> {
                    if (th.getMessage() == null || !th.getMessage().contains("Retries exhausted")) {
                        return th;
                    }
                    log.warning(th.getMessage());
                    return th.getCause();
                });
            }
            return transform.doOnSuccess(obj -> {
                logExecution(executorPath, instant, true);
            }).subscribeOn(this.scheduler).thenReturn(receiverRecord);
        } catch (Exception e) {
            log.log(Level.SEVERE, String.format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", receiverRecord.key()));
            return Mono.error(e);
        }
    }

    private Mono<ReceiverRecord<String, byte[]>> requeueOrAck(ReceiverRecord<String, byte[]> receiverRecord, Throwable th, Instant instant) {
        KafkaMessage fromDelivery = KafkaMessage.fromDelivery(receiverRecord);
        reportErrorMetric(receiverRecord, instant);
        sendErrorToCustomReporter(th, fromDelivery, hasRetries());
        if (hasRetries()) {
            logError(th, receiverRecord, FallbackStrategy.DEFINITIVE_DISCARD);
            return this.useDLQ ? this.discardNotifier.notifyDiscard(fromDelivery).doOnSuccess(r3 -> {
                receiverRecord.receiverOffset().acknowledge();
            }).thenReturn(receiverRecord) : Mono.just(receiverRecord);
        }
        logError(th, receiverRecord, FallbackStrategy.FAST_RETRY);
        return Mono.just(receiverRecord).delayElement(this.retryDelay);
    }

    private void logExecution(String str, Instant instant, boolean z) {
        try {
            long millis = Duration.between(instant, Instant.now()).toMillis();
            doLogExecution(str, millis);
            this.customReporter.reportMetric(this.objectType, str, Long.valueOf(millis), z);
        } catch (Exception e) {
            log.log(Level.WARNING, "Unable to send execution metrics!", (Throwable) e);
        }
    }

    private void reportErrorMetric(ReceiverRecord<String, byte[]> receiverRecord, Instant instant) {
        String str;
        try {
            str = getExecutorPath(receiverRecord);
        } catch (Exception e) {
            str = "unknown";
        }
        logExecution(str, instant, false);
    }

    private void doLogExecution(String str, long j) {
        log.log(Level.FINE, String.format("%s with path %s handled, took %d ms", this.objectType, str, Long.valueOf(j)));
    }

    protected void logError(Throwable th, ReceiverRecord<String, byte[]> receiverRecord, FallbackStrategy fallbackStrategy) {
        String str = (String) receiverRecord.key();
        try {
            try {
                log.log(Level.SEVERE, String.format("Error encounter while processing message %s: %s", str, th.toString()), th);
                log.warning(String.format("Message %s Headers: %s", str, receiverRecord.headers().toString()));
                log.warning(String.format("Message %s Body: %s", str, new String((byte[]) receiverRecord.value())));
                log.warning(String.format(fallbackStrategy.message, str));
            } catch (Exception e) {
                log.log(Level.SEVERE, "Error Login message Content!!", (Throwable) e);
                log.warning(String.format(fallbackStrategy.message, str));
            }
        } catch (Throwable th2) {
            log.warning(String.format(fallbackStrategy.message, str));
            throw th2;
        }
    }

    private Function<Message, Mono<Object>> getExecutor(String str) {
        Function<Message, Mono<Object>> function = this.handlers.get(str);
        return function != null ? function : computeRawMessageHandler(str);
    }

    private Function<Message, Mono<Object>> computeRawMessageHandler(String str) {
        return this.handlers.computeIfAbsent(str, str2 -> {
            return rawMessageHandler(str);
        });
    }

    protected abstract Function<Message, Mono<Object>> rawMessageHandler(String str);

    protected abstract String getExecutorPath(ReceiverRecord<String, byte[]> receiverRecord);

    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message message) {
        return Function.identity();
    }

    private void sendErrorToCustomReporter(Throwable th, Message message, boolean z) {
        try {
            this.customReporter.reportError(th, message, parseMessageForReporter(message), z).subscribeOn(this.errorReporterScheduler).doOnError(th2 -> {
                log.log(Level.WARNING, "Error sending error to external reporter", th2);
            }).subscribe();
        } catch (Throwable th3) {
            log.log(Level.WARNING, "Error in scheduler when sending error to external reporter", th3);
        }
    }

    protected abstract Object parseMessageForReporter(Message message);
}
