/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.impl.listeners;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.impl.DiscardNotifier;
import org.reactivecommons.async.impl.FallbackStrategy;
import org.reactivecommons.async.impl.RabbitMessage;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;

public abstract class GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(GenericMessageListener.class.getName());
    private final ConcurrentHashMap<String, Function<Message, Mono<Object>>> handlers = new ConcurrentHashMap();
    private final Receiver receiver;
    private final ReactiveMessageListener messageListener;
    final String queueName;
    private Scheduler scheduler = Schedulers.newParallel((String)this.getClass().getSimpleName(), (int)12);
    private final boolean useDLQRetries;
    private final long maxRetries;
    private final DiscardNotifier discardNotifier;
    private final String objectType;

    public GenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries, long maxRetries, DiscardNotifier discardNotifier, String objectType) {
        this.receiver = listener.getReceiver();
        this.queueName = queueName;
        this.messageListener = listener;
        this.maxRetries = maxRetries;
        this.useDLQRetries = useDLQRetries;
        this.discardNotifier = discardNotifier;
        this.objectType = objectType;
    }

    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        return Mono.empty();
    }

    public void startListener() {
        log.log(Level.INFO, "Using max concurrency {0}, in queue: {1}", new Object[]{this.messageListener.getMaxConcurrency(), this.queueName});
        if (this.useDLQRetries) {
            log.log(Level.INFO, "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!", new Object[]{this.maxRetries});
        } else {
            log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
        }
        ConsumeOptions consumeOptions = new ConsumeOptions();
        consumeOptions.qos(this.messageListener.getPrefetchCount().intValue());
        this.setUpBindings(this.messageListener.getTopologyCreator()).thenMany((Publisher)this.receiver.consumeManualAck(this.queueName, consumeOptions).transform(this::consumeFaultTolerant).transform(this::outerFailureProtection)).subscribe();
    }

    private Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj) {
        String executorPath = this.getExecutorPath(msj);
        Function<Message, Mono<Object>> handler = this.getExecutor(executorPath);
        RabbitMessage message = RabbitMessage.fromDelivery((Delivery)msj);
        return Mono.defer(() -> (Mono)handler.apply(message)).transform(this.enrichPostProcess(message)).transform(this.logExecution(executorPath)).subscribeOn(this.scheduler).thenReturn((Object)msj);
    }

    private Function<Mono<Object>, Mono<Object>> logExecution(String executorPath) {
        return objectMono -> {
            Instant beforeExecutionTime = Instant.now();
            return objectMono.doOnTerminate(() -> {
                Instant afterExecutionTime = Instant.now();
                long timeElapsed = Duration.between(beforeExecutionTime, afterExecutionTime).toMillis();
                log.log(Level.FINE, String.format("%s with path %s handled, took %d ms", this.objectType, executorPath, timeElapsed));
            });
        };
    }

    private <T> Flux<T> outerFailureProtection(Flux<T> messageFlux) {
        return messageFlux.onErrorContinue(t -> true, (throwable, elem) -> {
            if (elem instanceof AcknowledgableDelivery) {
                try {
                    String messageID = ((AcknowledgableDelivery)elem).getProperties().getMessageId();
                    log.log(Level.SEVERE, String.format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", messageID));
                    this.requeueOrAck((AcknowledgableDelivery)elem, (Throwable)throwable).subscribe();
                }
                catch (Exception e) {
                    log.log(Level.SEVERE, "Error returning message in failure!", e);
                }
            } else {
                throw new RuntimeException((Throwable)throwable);
            }
        });
    }

    private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> messageFlux) {
        return messageFlux.flatMap(msj -> this.handle((AcknowledgableDelivery)msj).doOnSuccess(AcknowledgableDelivery::ack).onErrorResume(err -> this.requeueOrAck((AcknowledgableDelivery)msj, (Throwable)err)), this.messageListener.getMaxConcurrency().intValue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void logError(Throwable err, AcknowledgableDelivery msj, FallbackStrategy strategy) {
        String messageID = msj.getProperties().getMessageId();
        try {
            log.log(Level.SEVERE, String.format("Error encounter while processing message %s: %s", messageID, err.toString()));
            log.warning(String.format("Message %s Headers: %s", messageID, msj.getProperties().getHeaders().toString()));
            log.warning(String.format("Message %s Body: %s", messageID, new String(msj.getBody())));
        }
        catch (Exception e) {
            try {
                log.log(Level.SEVERE, "Error Login message Content!!", e);
            }
            catch (Throwable throwable) {
                log.warning(String.format(strategy.message, messageID));
                throw throwable;
            }
            log.warning(String.format(strategy.message, messageID));
        }
        log.warning(String.format(strategy.message, messageID));
    }

    private Function<Message, Mono<Object>> getExecutor(String path) {
        Function<Message, Mono<Object>> handler = this.handlers.get(path);
        return handler != null ? handler : this.computeRawMessageHandler(path);
    }

    private Function<Message, Mono<Object>> computeRawMessageHandler(String commandId) {
        return this.handlers.computeIfAbsent(commandId, s -> this.rawMessageHandler(commandId));
    }

    protected abstract Function<Message, Mono<Object>> rawMessageHandler(String var1);

    protected abstract String getExecutorPath(AcknowledgableDelivery var1);

    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
        return Function.identity();
    }

    private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Throwable err) {
        Long retryNumber = this.getRetryNumber(msj);
        if ((msj.getEnvelope().isRedeliver() || retryNumber > 0L) && this.useDLQRetries) {
            if (retryNumber >= this.maxRetries) {
                this.logError(err, msj, FallbackStrategy.DEFINITIVE_DISCARD);
                return this.discardNotifier.notifyDiscard(RabbitMessage.fromDelivery((Delivery)msj)).doOnSuccess(_a -> msj.ack()).thenReturn((Object)msj);
            }
            this.logError(err, msj, FallbackStrategy.RETRY_DLQ);
            msj.nack(false);
            return Mono.just((Object)msj);
        }
        this.logError(err, msj, FallbackStrategy.FAST_RETRY);
        return Mono.just((Object)msj).delayElement(Duration.ofMillis(200L)).doOnNext(m -> m.nack(true));
    }

    private Long getRetryNumber(AcknowledgableDelivery delivery) {
        return Optional.ofNullable(delivery.getProperties()).map(AMQP.BasicProperties::getHeaders).map(x -> (List)x.get("x-death")).filter(list -> !list.isEmpty()).map(list -> (HashMap)list.get(0)).map(hashMap -> (Long)hashMap.get("count")).orElse(0L);
    }
}

