package org.streampipes.messaging.kafka;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.EventConsumer;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.model.grounding.TransportProtocol;
import org.streampipes.model.grounding.WildcardTopicDefinition;

/* loaded from: input_file:org/streampipes/messaging/kafka/SpKafkaConsumer.class */
public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, Runnable, Serializable {
    private String kafkaUrl;
    private String topic;
    private String groupId;
    private InternalEventProcessor<byte[]> eventProcessor;
    private volatile boolean isRunning;
    private Boolean patternTopic = false;
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class);

    public SpKafkaConsumer() {
    }

    public SpKafkaConsumer(String str, String str2, InternalEventProcessor<byte[]> internalEventProcessor) {
        KafkaTransportProtocol kafkaTransportProtocol = new KafkaTransportProtocol();
        kafkaTransportProtocol.setKafkaPort(Integer.parseInt(str.split(":")[1]));
        kafkaTransportProtocol.setBrokerHostname(str.split(":")[0]);
        kafkaTransportProtocol.setTopicDefinition(new SimpleTopicDefinition(str2));
        try {
            connect(kafkaTransportProtocol, internalEventProcessor);
        } catch (SpRuntimeException e) {
            e.printStackTrace();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(getProperties());
        if (this.patternTopic.booleanValue()) {
            this.topic = replaceWildcardWithPatternFormat(this.topic);
            kafkaConsumer.subscribe(Pattern.compile(this.topic), new ConsumerRebalanceListener() { // from class: org.streampipes.messaging.kafka.SpKafkaConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                }
            });
        } else {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        }
        while (this.isRunning) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                this.eventProcessor.onEvent(((ConsumerRecord) it.next()).value());
            }
        }
        LOG.info("Closing Kafka Consumer.");
        kafkaConsumer.close();
    }

    private String replaceWildcardWithPatternFormat(String str) {
        return str.replaceAll("\\.", "\\\\.").replaceAll("\\*", ".*");
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.kafkaUrl);
        properties.put("group.id", this.groupId);
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "10000");
        properties.put("session.timeout.ms", "30000");
        properties.put("message.max.bytes", 5000012);
        properties.put("fetch.message.max.bytes", 5000012);
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("zookeeper.session.timeout.ms", "60000");
        properties.put("zookeeper.sync.time.ms", "20000");
        properties.put("client.id", UUID.randomUUID().toString());
        return properties;
    }

    public void connect(KafkaTransportProtocol kafkaTransportProtocol, InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        LOG.info("Kafka consumer: Connecting to " + kafkaTransportProtocol.getTopicDefinition().getActualTopicName());
        if (kafkaTransportProtocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
            this.patternTopic = true;
        }
        this.eventProcessor = internalEventProcessor;
        this.kafkaUrl = kafkaTransportProtocol.getBrokerHostname() + ":" + kafkaTransportProtocol.getKafkaPort();
        this.topic = kafkaTransportProtocol.getTopicDefinition().getActualTopicName();
        this.groupId = UUID.randomUUID().toString();
        this.isRunning = true;
        new Thread(this).start();
    }

    public void disconnect() throws SpRuntimeException {
        LOG.info("Kafka consumer: Disconnecting from " + this.topic);
        this.isRunning = false;
    }

    public Boolean isConnected() {
        return Boolean.valueOf(this.isRunning);
    }

    public /* bridge */ /* synthetic */ void connect(TransportProtocol transportProtocol, InternalEventProcessor internalEventProcessor) throws SpRuntimeException {
        connect((KafkaTransportProtocol) transportProtocol, (InternalEventProcessor<byte[]>) internalEventProcessor);
    }
}
