package org.phoebus.olog.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:org/phoebus/olog/kafka/KafkaProducer.class */
public class KafkaProducer {
    private static Logger logger = Logger.getLogger(KafkaProducer.class.getName());
    private static KafkaProducer instance;
    private static final short REPLICATION_FACTOR = 1;
    private static final int PARTITIONS = 1;
    private static final String CLEANUP_POLICY = "cleanup.policy";
    private static final String COMPACT_POLICY = "compact";
    private static final String SEGMENT_TIME = "segment.ms";
    private static final String TIME = "10000";
    private static final String DIRTY_2_CLEAN = "min.cleanable.dirty.ratio";
    private static final String RATIO = "0.01";
    private static final String TOPIC_NAME = "olog-notify";
    private static final String KEY = "new";
    private String bootstrapServers;
    private Producer<String, String> producer;
    private ObjectMapper objectMapper = new ObjectMapper();

    public static KafkaProducer getInstance() {
        if (instance == null) {
            instance = new KafkaProducer();
        }
        return instance;
    }

    private KafkaProducer() {
        this.bootstrapServers = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        if (this.bootstrapServers == null) {
            logger.log(Level.INFO, "KAFKA_BOOTSTRAP_SERVERS not found in system environment, using system property or properties file");
            Properties properties = new Properties();
            try {
                properties.load(getClass().getResourceAsStream("/kafka_integration_config.properties"));
                this.bootstrapServers = System.getProperty("kafka.bootstrap.servers", properties.getProperty("kafka.bootstrap.servers"));
            } catch (IOException e) {
                logger.log(Level.SEVERE, "Unable to load configuration.", (Throwable) e);
                return;
            }
        }
        logger.log(Level.INFO, "Using Kafka bootstrap servers: " + this.bootstrapServers);
        discoverAndCreateTopic();
        this.producer = createProducer(this.bootstrapServers);
    }

    public void send(LogEntryMessage logEntryMessage) {
        try {
            this.producer.send(new ProducerRecord(TOPIC_NAME, KEY, this.objectMapper.writeValueAsString(logEntryMessage)), (recordMetadata, exc) -> {
                if (exc != null) {
                    logger.log(Level.SEVERE, "Encountered exception when sending message", (Throwable) exc);
                } else {
                    logger.log(Level.INFO, "Kafka message acknowledged");
                }
            });
            logger.log(Level.INFO, "Kafka message sent");
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to send Kafka message", (Throwable) e);
        }
    }

    private void discoverAndCreateTopic() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        AdminClient create = AdminClient.create(properties);
        try {
            if (((Set) create.listTopics().names().get()).contains(TOPIC_NAME)) {
                logger.log(Level.INFO, "Topic olog-notify already exists.");
            } else {
                logger.log(Level.INFO, "Could not find topic olog-notify, creating it.");
                createTopic(create, TOPIC_NAME);
            }
        } catch (Exception e) {
            logger.log(Level.WARNING, "Unable to list topics or create topic olog-notify", (Throwable) e);
        }
    }

    private void createTopic(AdminClient adminClient, String str) {
        NewTopic newTopic = new NewTopic(str, 1, (short) 1);
        HashMap hashMap = new HashMap();
        hashMap.put(SEGMENT_TIME, TIME);
        hashMap.put(CLEANUP_POLICY, COMPACT_POLICY);
        hashMap.put(DIRTY_2_CLEAN, RATIO);
        newTopic.configs(hashMap);
        try {
            adminClient.createTopics(Arrays.asList(newTopic)).all().get();
        } catch (Exception e) {
            logger.log(Level.WARNING, "Attempt to create topics failed", (Throwable) e);
        }
    }

    private Producer<String, String> createProducer(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("linger.ms", 20);
        StringSerializer stringSerializer = new StringSerializer();
        return new org.apache.kafka.clients.producer.KafkaProducer(properties, stringSerializer, stringSerializer);
    }
}
