package pcosta.kafka.internal;

import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pcosta.kafka.api.MessageFilter;
import pcosta.kafka.api.MessageKey;
import pcosta.kafka.api.MessageListener;
import pcosta.kafka.api.MessageListenerConfiguration;
import pcosta.kafka.api.MessagingException;
import pcosta.kafka.api.PlatformError;
import pcosta.kafka.api.PlatformErrorListener;
import pcosta.kafka.message.KafkaMessageProto;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:pcosta/kafka/internal/MessageReceiver.class */
public class MessageReceiver {
    private static final Logger log = LoggerFactory.getLogger(MessageReceiver.class);
    private final Map<String, MessageProcessor> processors = new HashMap();
    private PlatformErrorListener errorListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:pcosta/kafka/internal/MessageReceiver$MessageProcessor.class */
    public static class MessageProcessor {
        private final String topic;
        final int partition;
        long initialOffset;
        private final KafkaReceiver<String, byte[]> kafkaReceiver;
        private PlatformErrorListener errorListener;
        private final Map<Class<?>, Collection<MessageListener>> listenersMap = new HashMap();
        private final Map<String, ProtoBufType> supportedTypes = new HashMap();
        private final Map<String, Collection<MessageFilter>> filtersMap = new HashMap();
        private final ProtobufDeserializer protoDeserializer = new ProtobufDeserializer();

        MessageProcessor(String str, long j, int i, Class<?> cls, Collection<MessageFilter> collection, Collection<MessageListener> collection2, ExtensionRegistry extensionRegistry, PlatformErrorListener platformErrorListener) {
            this.topic = str;
            this.initialOffset = j;
            this.partition = i;
            this.errorListener = platformErrorListener;
            this.filtersMap.put(cls.getName(), collection);
            this.supportedTypes.put(cls.getName(), new ProtoBufType(cls, extensionRegistry));
            this.listenersMap.put(cls, collection2);
            this.kafkaReceiver = new KafkaReceiver<>(this.topic, new StringDeserializer(), new ByteArrayDeserializer(), this);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void process(byte[] bArr, String str, MessageKey messageKey, long j) {
            try {
                Message parseFrom = KafkaMessageProto.KafkaMessage.parseFrom(bArr);
                ProtoBufType orDefault = this.supportedTypes.getOrDefault(parseFrom.getPayloadClass(), this.supportedTypes.get(KafkaMessageProto.KafkaMessage.class.getName()) == null ? null : ProtoBufType.DEFAULT_PROTO_TYPE);
                if (Objects.isNull(orDefault)) {
                    String format = String.format("Received unsupported payload. Source: %s , Type: %s", str, parseFrom);
                    MessageReceiver.log.warn(format);
                    this.errorListener.onError(new PlatformErrorImpl(format, new MessagingException(format)));
                } else {
                    if (isFiltered(str, orDefault.messageType)) {
                        return;
                    }
                    Message parseFromV3 = orDefault == ProtoBufType.DEFAULT_PROTO_TYPE ? parseFrom : this.protoDeserializer.parseFromV3(parseFrom, orDefault.extensionRegistry);
                    if (MessageReceiver.log.isDebugEnabled()) {
                        MessageReceiver.log.debug("message has been received:{}CorrelationId: {}{}source topic: {}{}payload: {}{}", new Object[]{System.lineSeparator(), str, System.lineSeparator(), parseFromV3.toString(), System.lineSeparator()});
                    }
                    Message message = parseFromV3;
                    this.listenersMap.get(orDefault.messageType).forEach(messageListener -> {
                        MessageReceiver.log.debug("delivering message to {}", messageListener.getClass().getSimpleName());
                        messageListener.onMessage(new KafkaMetadata(str, messageKey, j, parseFrom.getTraceabilityId()), message);
                    });
                }
            } catch (Exception e) {
                this.errorListener.onError(new PlatformErrorImpl(e.getClass().getName(), e.getCause()));
                MessageReceiver.log.error("Error processing message: ", e);
            }
        }

        private boolean isFiltered(String str, Class<?> cls) {
            return this.filtersMap.getOrDefault(cls.getName(), Collections.emptyList()).stream().filter(messageFilter -> {
                return messageFilter.isEnabled() && messageFilter.filter(str, cls);
            }).peek(messageFilter2 -> {
                MessageReceiver.log.warn("message {} from {} is not going to be delivered to application due to filter: {}", new Object[]{cls, str, messageFilter2.getClass().getSimpleName()});
            }).findAny().isPresent();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void registerListeners(Class<?> cls, Collection<MessageListener> collection, ExtensionRegistry extensionRegistry) {
            this.listenersMap.computeIfAbsent(cls, cls2 -> {
                return new HashSet();
            }).addAll(collection);
            this.supportedTypes.putIfAbsent(cls.getName(), new ProtoBufType(cls, extensionRegistry));
            MessageReceiver.log.info("A listener for {} type was registered for topic: {}", cls, this.topic);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void processError(PlatformError platformError) {
            MessageReceiver.log.debug("delivering error to handler..");
            this.errorListener.onError(platformError);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void startReceiver() {
            this.kafkaReceiver.start();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void stopReceiver() {
            this.kafkaReceiver.stop();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:pcosta/kafka/internal/MessageReceiver$ProtoBufType.class */
    public static class ProtoBufType {
        static final ProtoBufType DEFAULT_PROTO_TYPE = new ProtoBufType(KafkaMessageProto.KafkaMessage.class, ExtensionRegistry.getEmptyRegistry());
        final Class<?> messageType;
        final ExtensionRegistry extensionRegistry;

        ProtoBufType(Class<?> cls, ExtensionRegistry extensionRegistry) {
            this.messageType = cls;
            this.extensionRegistry = extensionRegistry;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final <M extends Message> void registerListener(MessageListenerConfiguration<M> messageListenerConfiguration) {
        messageListenerConfiguration.getTopics().forEach(str -> {
            if (this.processors.containsKey(str)) {
                MessageProcessor messageProcessor = this.processors.get(str);
                messageProcessor.registerListeners(messageListenerConfiguration.getMessageType(), messageListenerConfiguration.getMessageListeners(), messageListenerConfiguration.getExtensionRegistry());
                if (messageListenerConfiguration.getOffset() > messageProcessor.initialOffset) {
                    messageProcessor.initialOffset = messageListenerConfiguration.getOffset();
                }
            } else {
                this.processors.putIfAbsent(str, new MessageProcessor(str, messageListenerConfiguration.getOffset(), messageListenerConfiguration.getPartition(), messageListenerConfiguration.getMessageType(), messageListenerConfiguration.getMessageFilters(), messageListenerConfiguration.getMessageListeners(), messageListenerConfiguration.getExtensionRegistry(), this.errorListener));
            }
            log.info("A listener for {} type was registered for topic: {}", messageListenerConfiguration.getMessageType(), str);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void registerErrorListener(PlatformErrorListener platformErrorListener) {
        this.errorListener = platformErrorListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void start() {
        log.info("Starting the kafka message receivers");
        this.processors.forEach((str, messageProcessor) -> {
            messageProcessor.startReceiver();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void close() {
        log.info("Stopping all kafka listenersMap..");
        this.processors.forEach((str, messageProcessor) -> {
            messageProcessor.stopReceiver();
        });
    }
}
