package org.jzenith.kafka.consumer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import io.reactivex.Single;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumerRecord;
import java.io.IOException;
import java.util.List;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.jzenith.core.JZenithException;
import org.jzenith.kafka.model.AbstractMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jzenith/kafka/consumer/TopicHandlerDispatcher.class */
public class TopicHandlerDispatcher {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TopicHandlerDispatcher.class);
    private final Multimap<String, TopicHandler<AbstractMessage>> topicHandlers;
    private final ObjectMapper objectMapper;

    public TopicHandlerDispatcher(ObjectMapper objectMapper, Multimap<String, TopicHandler<AbstractMessage>> multimap) {
        this.topicHandlers = ImmutableMultimap.copyOf(multimap);
        this.objectMapper = objectMapper;
    }

    public Single<DispatcherResult> handle(KafkaConsumerRecord<String, String> kafkaConsumerRecord) {
        String str = (String) kafkaConsumerRecord.value();
        try {
            JsonNode jsonNode = this.objectMapper.readTree(str).get("type");
            if (jsonNode == null) {
                log.warn("Could not get message type, message is skipped:\n{}");
                return Single.just(DispatcherResult.skip(kafkaConsumerRecord));
            }
            String asText = jsonNode.asText();
            if (StringUtils.isBlank(asText)) {
                log.warn("No type in value, message is skipped:\n{}", str);
                return Single.just(DispatcherResult.skip(kafkaConsumerRecord));
            }
            try {
                try {
                    AbstractMessage abstractMessage = (AbstractMessage) this.objectMapper.readValue(str, Class.forName(asText));
                    return Single.concat((List) this.topicHandlers.get(kafkaConsumerRecord.topic()).parallelStream().map(topicHandler -> {
                        return Single.just(abstractMessage).flatMap(abstractMessage2 -> {
                            return topicHandler.handleMessage(Single.just(abstractMessage2));
                        }).onErrorReturn(HandlerResult::fail);
                    }).collect(ImmutableList.toImmutableList())).toList().map(list -> {
                        return DispatcherResult.create(kafkaConsumerRecord, list);
                    });
                } catch (IOException e) {
                    throw new JZenithException(e);
                }
            } catch (ClassNotFoundException e2) {
                log.warn("Can't load class {}, message will be skipped:\n{}", asText, str);
                return Single.just(DispatcherResult.skip(kafkaConsumerRecord));
            }
        } catch (IOException e3) {
            log.warn("Could not parse message, message is skipped:\n{}", str, e3);
            return Single.just(DispatcherResult.skip(kafkaConsumerRecord));
        }
    }
}
