package org.reactivecommons.async.impl.listeners;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.impl.RabbitMessage;
import org.reactivecommons.async.impl.communications.Message;
import org.reactivecommons.async.impl.communications.ReactiveMessageListener;
import org.reactivecommons.async.impl.communications.TopologyCreator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.Receiver;
import reactor.util.function.Tuple2;

/* loaded from: input_file:org/reactivecommons/async/impl/listeners/GenericMessageListener.class */
public abstract class GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(GenericMessageListener.class.getName());
    private final ConcurrentHashMap<String, Function<Message, Mono<Object>>> handlers = new ConcurrentHashMap<>();
    private final Receiver receiver;
    private final ReactiveMessageListener messageListener;
    final String queueName;

    public GenericMessageListener(String str, ReactiveMessageListener reactiveMessageListener) {
        this.receiver = reactiveMessageListener.getReceiver();
        this.queueName = str;
        this.messageListener = reactiveMessageListener;
    }

    protected Mono<Void> setUpBindings(TopologyCreator topologyCreator) {
        return Mono.empty();
    }

    public void startListener() {
        setUpBindings(this.messageListener.getTopologyCreator()).thenMany(this.receiver.consumeManualAck(this.queueName).transform(this::consumeFaultTolerant).transform(this::outerFailureProtection)).subscribe();
    }

    private Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery acknowledgableDelivery) {
        Function<Message, Mono<Object>> executor = getExecutor(getExecutorPath(acknowledgableDelivery));
        RabbitMessage fromDelivery = RabbitMessage.fromDelivery(acknowledgableDelivery);
        return Mono.defer(() -> {
            return ((Mono) executor.apply(fromDelivery)).zipWith(Mono.just(acknowledgableDelivery)).transform(this::enrichPostProcess);
        }).thenReturn(acknowledgableDelivery);
    }

    private <T> Flux<T> outerFailureProtection(Flux<T> flux) {
        return flux.onErrorContinue(th -> {
            return true;
        }, (th2, obj) -> {
            if (obj instanceof AcknowledgableDelivery) {
                try {
                    Mono.delay(Duration.ofMillis(350L)).doOnSuccess(l -> {
                        ((AcknowledgableDelivery) obj).nack(true);
                    }).subscribe();
                    log.log(Level.SEVERE, "Outer error protection reached for Async Consumer!! Severe Warning! ", th2);
                    log.warning("Returning message to communications: " + ((AcknowledgableDelivery) obj).getProperties().getHeaders().toString());
                } catch (Exception e) {
                    log.log(Level.SEVERE, "Error returning message in failure!", (Throwable) e);
                }
            }
        });
    }

    private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> flux) {
        return flux.flatMap(acknowledgableDelivery -> {
            return handle(acknowledgableDelivery).onErrorResume(th -> {
                try {
                    log.log(Level.SEVERE, "Error encounter while processing message:", th);
                    log.warning("Returning message to communications in 200ms: " + acknowledgableDelivery.getProperties().getHeaders().toString());
                    log.warning(new String(acknowledgableDelivery.getBody()));
                } catch (Exception e) {
                    log.log(Level.SEVERE, "Log Error", (Throwable) e);
                }
                return Mono.just(acknowledgableDelivery).delayElement(Duration.ofMillis(200L)).doOnNext(acknowledgableDelivery -> {
                    acknowledgableDelivery.nack(true);
                });
            }).doOnSuccess(acknowledgableDelivery -> {
                acknowledgableDelivery.ack();
            });
        });
    }

    private Function<Message, Mono<Object>> getExecutor(String str) {
        Function<Message, Mono<Object>> function = this.handlers.get(str);
        return function != null ? function : computeRawMessageHandler(str);
    }

    private Function<Message, Mono<Object>> computeRawMessageHandler(String str) {
        return this.handlers.computeIfAbsent(str, str2 -> {
            return rawMessageHandler(str);
        });
    }

    protected abstract Function<Message, Mono<Object>> rawMessageHandler(String str);

    protected abstract String getExecutorPath(AcknowledgableDelivery acknowledgableDelivery);

    protected Mono<Object> enrichPostProcess(Mono<Tuple2<Object, AcknowledgableDelivery>> mono) {
        return mono.map((v0) -> {
            return v0.getT1();
        });
    }
}
