/*
 * Decompiled with CFR 0.152.
 */
package net.acesinc.data.json.generator.log;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import net.acesinc.data.json.generator.log.EventLogger;
import net.acesinc.data.json.util.JsonUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * {@link EventLogger} that publishes each event as a string record to a Kafka topic.
 *
 * <p>Configuration is read from the map handed to the constructor:
 * <ul>
 *   <li>{@code broker.server} / {@code broker.port} - joined as {@code host:port} and used
 *       as the producer's {@code bootstrap.servers}</li>
 *   <li>{@code topic} - the topic every record is sent to</li>
 *   <li>{@code sync} - optional Boolean, defaults to {@code false}; when {@code true} each
 *       send waits for the returned future to complete</li>
 *   <li>{@code flatten} - optional Boolean, defaults to {@code false}; when {@code true} the
 *       event is passed through {@link JsonUtils#flattenJson} before being sent</li>
 * </ul>
 */
public class KafkaLogger
implements EventLogger {
    private static final Logger log = LogManager.getLogger(KafkaLogger.class);
    public static final String BROKER_SERVER_PROP_NAME = "broker.server";
    public static final String BROKER_PORT_PROP_NAME = "broker.port";
    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final boolean sync;
    private final boolean flatten;
    private final Properties props = new Properties();
    private final JsonUtils jsonUtils;

    /**
     * Builds the Kafka producer from the supplied configuration.
     *
     * @param props logger configuration; see the class description for the keys that are read
     */
    public KafkaLogger(Map<String, Object> props) {
        String brokerHost = (String)props.get(BROKER_SERVER_PROP_NAME);
        Integer brokerPort = (Integer)props.get(BROKER_PORT_PROP_NAME);
        this.props.put("bootstrap.servers", brokerHost + ":" + brokerPort.toString());
        this.props.put("key.serializer", StringSerializer.class.getName());
        this.props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(this.props);
        this.topic = (String)props.get("topic");
        this.sync = KafkaLogger.booleanProp(props, "sync");
        this.flatten = KafkaLogger.booleanProp(props, "flatten");
        this.jsonUtils = new JsonUtils();
    }

    /** Returns the Boolean stored under {@code key}, or {@code false} when the key is absent. */
    private static boolean booleanProp(Map<String, Object> props, String key) {
        Object value = props.get(key);
        return value != null ? (Boolean)value : false;
    }

    /**
     * Sends the event to the configured topic.
     *
     * @param event          the event payload to publish
     * @param producerConfig not used by this logger
     */
    @Override
    public void logEvent(String event, Map<String, Object> producerConfig) {
        this.logEvent(event);
    }

    /**
     * Optionally flattens the event, then sends it either synchronously or asynchronously
     * depending on the {@code sync} setting. Failures are logged rather than thrown.
     */
    private void logEvent(String event) {
        String output = event;
        if (this.flatten) {
            try {
                output = this.jsonUtils.flattenJson(event);
            }
            catch (IOException ex) {
                log.error("Error flattening json. Unable to send event [ " + event + " ]", (Throwable)ex);
                return;
            }
        }
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(this.topic, output);
        log.debug("Sending event to Kafka: [ {} ]", output);
        // Read the configured field here: a local flag used to shadow it, which made the
        // synchronous branch unreachable.
        if (this.sync) {
            try {
                this.producer.send(producerRecord).get();
            }
            catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can react to it.
                Thread.currentThread().interrupt();
                log.warn("Thread interrupted while waiting for synchronous response from producer", (Throwable)ex);
            }
            catch (ExecutionException ex) {
                log.error("Error sending event to Kafka: [ " + output + " ]", (Throwable)ex);
            }
        } else {
            this.producer.send(producerRecord);
        }
    }

    /** Closes the underlying Kafka producer. */
    @Override
    public void shutdown() {
        this.producer.close();
    }
}

