package com.networknt.tram.consumer;

import com.networknt.config.JsonMapper;
import com.networknt.tram.message.common.Message;
import com.networknt.tram.message.common.MessageImpl;
import com.networknt.tram.message.consumer.MessageConsumer;
import com.networknt.tram.message.consumer.MessageHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/tram/consumer/MessageConsumerKafkaImpl.class */
public class MessageConsumerKafkaImpl implements MessageConsumer {
    private String bootstrapServers;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private List<TramKafkaConsumer> consumers = new ArrayList();

    public MessageConsumerKafkaImpl(String str) {
        this.bootstrapServers = str;
    }

    public void subscribe(String str, Set<String> set, MessageHandler messageHandler) {
        SwimlaneBasedDispatcher swimlaneBasedDispatcher = new SwimlaneBasedDispatcher(str, Executors.newCachedThreadPool());
        TramKafkaConsumer tramKafkaConsumer = new TramKafkaConsumer(str, (consumerRecord, biConsumer) -> {
            swimlaneBasedDispatcher.dispatch(toMessage(consumerRecord), Integer.valueOf(consumerRecord.partition()), message -> {
                try {
                    this.logger.trace("Invoking handler {} {}", str, message.getId());
                    messageHandler.accept(message);
                } catch (Throwable th) {
                    this.logger.trace("Got exception {} {}", str, message.getId());
                    this.logger.trace("Got exception ", th);
                    biConsumer.accept(null, th);
                }
                this.logger.trace("handled message {} {}", str, message.getId());
                biConsumer.accept(null, null);
            });
        }, new ArrayList(set));
        this.consumers.add(tramKafkaConsumer);
        tramKafkaConsumer.start();
    }

    public void close() {
        this.consumers.forEach((v0) -> {
            v0.stop();
        });
    }

    private Message toMessage(ConsumerRecord<String, String> consumerRecord) {
        return (Message) JsonMapper.fromJson((String) consumerRecord.value(), MessageImpl.class);
    }
}
