/*
 * Decompiled with CFR 0.152.
 */
package org.streampipes.messaging.kafka;

import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.messaging.EventProducer;
import org.streampipes.messaging.kafka.config.ProducerConfigFactory;
import org.streampipes.model.grounding.KafkaTransportProtocol;

public class SpKafkaProducer
implements EventProducer<KafkaTransportProtocol>,
Serializable {
    private static final String COLON = ":";
    private String brokerUrl;
    private String topic;
    private Producer<String, byte[]> producer;
    private Boolean connected;
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

    public SpKafkaProducer() {
    }

    public SpKafkaProducer(String url, String topic) {
        String[] urlParts = url.split(COLON);
        KafkaTransportProtocol protocol = new KafkaTransportProtocol(urlParts[0], Integer.parseInt(urlParts[1]), topic);
        this.brokerUrl = url;
        this.topic = topic;
        this.producer = new KafkaProducer(this.makeProperties(protocol));
    }

    public void publish(String message) {
        this.publish(message.getBytes());
    }

    public void publish(byte[] message) {
        this.producer.send(new ProducerRecord(this.topic, (Object)message));
    }

    private Properties makeProperties(KafkaTransportProtocol protocol) {
        return new ProducerConfigFactory(protocol).makeProperties();
    }

    public void connect(KafkaTransportProtocol protocol) {
        LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName());
        this.brokerUrl = protocol.getBrokerHostname() + COLON + protocol.getKafkaPort();
        this.topic = protocol.getTopicDefinition().getActualTopicName();
        this.createKafaTopic(protocol);
        this.producer = new KafkaProducer(this.makeProperties(protocol));
        this.connected = true;
    }

    private void createKafaTopic(KafkaTransportProtocol settings) {
        String zookeeperHost = settings.getZookeeperHost() + COLON + settings.getZookeeperPort();
        Properties props = new Properties();
        props.put("bootstrap.servers", this.brokerUrl);
        HashMap<String, String> topicConfig = new HashMap<String, String>();
        topicConfig.put("retention.ms", "600000");
        AdminClient adminClient = KafkaAdminClient.create((Properties)props);
        NewTopic newTopic = new NewTopic(this.topic, 1, 1);
        newTopic.configs(topicConfig);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
        try {
            ((KafkaFuture)createTopicsResult.values().get(this.topic)).get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Could not create topic: " + this.topic + " on broker " + zookeeperHost);
        }
    }

    public void disconnect() {
        LOG.info("Kafka producer: Disconnecting from " + this.topic);
        this.producer.close();
        this.connected = false;
    }

    public Boolean isConnected() {
        return this.connected != null && this.connected != false;
    }
}

