package org.reactivecommons.async.kafka.listeners;

import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.CloudEventHandler;
import org.reactivecommons.async.api.handlers.DomainEventHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.EventExecutor;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.kafka.communications.ReactiveMessageListener;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverRecord;

/* loaded from: input_file:org/reactivecommons/async/kafka/listeners/ApplicationNotificationsListener.class */
public class ApplicationNotificationsListener extends GenericMessageListener {

    @Generated
    private static final Logger log = Logger.getLogger(ApplicationNotificationsListener.class.getName());
    private final MessageConverter messageConverter;
    private final HandlerResolver resolver;

    public ApplicationNotificationsListener(ReactiveMessageListener reactiveMessageListener, HandlerResolver handlerResolver, MessageConverter messageConverter, boolean z, boolean z2, long j, int i, DiscardNotifier discardNotifier, CustomReporter customReporter, String str) {
        super(reactiveMessageListener, z, z2, j, i, discardNotifier, "event", customReporter, str + "-notification-" + UUID.randomUUID(), handlerResolver.getNotificationNames());
        this.resolver = handlerResolver;
        this.messageConverter = messageConverter;
    }

    @Override // org.reactivecommons.async.kafka.listeners.GenericMessageListener
    protected Function<Message, Mono<Object>> rawMessageHandler(String str) {
        RegisteredEventListener eventListener = this.resolver.getEventListener(str);
        EventExecutor eventExecutor = new EventExecutor(eventListener.getHandler(), resolveConverter(eventListener));
        return message -> {
            return eventExecutor.execute(message).cast(Object.class);
        };
    }

    @Override // org.reactivecommons.async.kafka.listeners.GenericMessageListener
    protected String getExecutorPath(ReceiverRecord<String, byte[]> receiverRecord) {
        return receiverRecord.topic();
    }

    @Override // org.reactivecommons.async.kafka.listeners.GenericMessageListener
    protected Object parseMessageForReporter(Message message) {
        return this.messageConverter.readDomainEventStructure(message);
    }

    private <T, D> Function<Message, Object> resolveConverter(RegisteredEventListener<T, D> registeredEventListener) {
        if (registeredEventListener.getHandler() instanceof DomainEventHandler) {
            Class inputClass = registeredEventListener.getInputClass();
            return message -> {
                return this.messageConverter.readDomainEvent(message, inputClass);
            };
        }
        if (!(registeredEventListener.getHandler() instanceof CloudEventHandler)) {
            throw new RuntimeException("Unknown handler type");
        }
        MessageConverter messageConverter = this.messageConverter;
        Objects.requireNonNull(messageConverter);
        return messageConverter::readCloudEvent;
    }
}
