/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.messaging.kafka;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.EventConsumer;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.kafka.config.ConsumerConfigFactory;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.model.grounding.TopicDefinition;
import org.streampipes.model.grounding.WildcardTopicDefinition;

public class SpKafkaConsumer
implements EventConsumer<KafkaTransportProtocol>,
Runnable,
Serializable {
    private String topic;
    private InternalEventProcessor<byte[]> eventProcessor;
    private KafkaTransportProtocol protocol;
    private volatile boolean isRunning;
    private Boolean patternTopic = false;
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class);

    public SpKafkaConsumer() {
    }

    public SpKafkaConsumer(String kafkaUrl, String topic, InternalEventProcessor<byte[]> callback) {
        KafkaTransportProtocol protocol = new KafkaTransportProtocol();
        protocol.setKafkaPort(Integer.parseInt(kafkaUrl.split(":")[1]));
        protocol.setBrokerHostname(kafkaUrl.split(":")[0]);
        protocol.setTopicDefinition((TopicDefinition)new SimpleTopicDefinition(topic));
        try {
            this.connect(protocol, callback);
        }
        catch (SpRuntimeException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.getProperties());
        if (!this.patternTopic.booleanValue()) {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        } else {
            this.topic = this.replaceWildcardWithPatternFormat(this.topic);
            kafkaConsumer.subscribe(Pattern.compile(this.topic), new ConsumerRebalanceListener(){

                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                }
            });
        }
        while (this.isRunning) {
            ConsumerRecords records = kafkaConsumer.poll(100L);
            for (ConsumerRecord record : records) {
                this.eventProcessor.onEvent(record.value());
            }
        }
        LOG.info("Closing Kafka Consumer.");
        kafkaConsumer.close();
    }

    private String replaceWildcardWithPatternFormat(String topic) {
        topic = topic.replaceAll("\\.", "\\\\.");
        return topic.replaceAll("\\*", ".*");
    }

    private Properties getProperties() {
        return new ConsumerConfigFactory(this.protocol).makeProperties();
    }

    public void connect(KafkaTransportProtocol protocol, InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException {
        LOG.info("Kafka consumer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
        if (protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
            this.patternTopic = true;
        }
        this.eventProcessor = eventProcessor;
        this.protocol = protocol;
        this.topic = protocol.getTopicDefinition().getActualTopicName();
        this.isRunning = true;
        Thread thread = new Thread(this);
        thread.start();
    }

    public void disconnect() throws SpRuntimeException {
        LOG.info("Kafka consumer: Disconnecting from " + this.topic);
        this.isRunning = false;
    }

    public Boolean isConnected() {
        return this.isRunning;
    }
}

