package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.AdditionalProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.ConnectionProperties;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointProperties;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataTopicConnector.class */
/**
 * Open metadata topic connector backed by Apache Kafka.
 *
 * <p>Outbound events are written to the configured topic with a {@link KafkaProducer}.
 * Inbound events are handed to this connector through {@link #distributeToListeners(String)},
 * buffered in memory, and drained by {@link #checkForEvents()}.
 */
public class KafkaOpenMetadataTopicConnector extends OpenMetadataTopicConnector {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataTopicConnector.class);
    private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OPEN_METADATA_TOPIC_CONNECTOR);

    private final Properties producerProperties = new Properties();
    private final Properties consumerProperties = new Properties();
    private Thread consumerThread = null;
    private String outTopic = null;
    private String serverId = null;
    private KafkaOpenMetadataEventConsumer consumer = null;

    /* Buffer of received events; the synchronized wrapper is also used as the lock in checkForEvents(). */
    private final List<String> incomingEventsList = Collections.synchronizedList(new ArrayList<String>());

    /**
     * Creates the connector and seeds the default producer and consumer properties.
     * Entries supplied later through the connection's additional properties overwrite
     * these defaults (see {@link #initialize(String, ConnectionProperties)}).
     */
    public KafkaOpenMetadataTopicConnector() {
        this.producerProperties.put("bootstrap.servers", "localhost:9092");
        this.producerProperties.put("acks", "all");
        this.producerProperties.put("retries", 1);
        this.producerProperties.put("batch.size", 16384);
        this.producerProperties.put("linger.ms", 0);
        this.producerProperties.put("buffer.memory", 33554432);
        this.producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.consumerProperties.put("bootstrap.servers", "localhost:9092");
        this.consumerProperties.put("group.id", "test");
        this.consumerProperties.put("enable.auto.commit", "true");
        this.consumerProperties.put("auto.commit.interval.ms", "1000");
        this.consumerProperties.put("session.timeout.ms", "30000");
        this.consumerProperties.put("zookeeper.session.timeout.ms", 400);
        this.consumerProperties.put("zookeeper.sync.time.ms", 200);
        this.consumerProperties.put("fetch.message.max.bytes", 10485760);
        this.consumerProperties.put("max.partition.fetch.bytes", 10485760);
        this.consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    /**
     * Reads the topic name and Kafka settings from the connection and starts the consumer thread.
     *
     * <p>The topic name is the endpoint address. If the connection has no endpoint, or no
     * additional properties, an audit record is written and the method returns without
     * creating the consumer. Otherwise the server id taken from the additional properties
     * becomes the consumer {@code group.id}, and a {@link KafkaOpenMetadataEventConsumer}
     * is started on a new thread.
     *
     * @param str                  passed unchanged to the superclass initializer
     * @param connectionProperties connection holding the endpoint and additional properties
     */
    public void initialize(String str, ConnectionProperties connectionProperties) {
        final String actionDescription = "initialize";

        super.initialize(str, connectionProperties);

        EndpointProperties endpoint = connectionProperties.getEndpoint();
        if (endpoint == null) {
            logAudit(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NO_TOPIC_NAME);
            return;
        }
        this.outTopic = endpoint.getAddress();

        AdditionalProperties additionalProperties = connectionProperties.getAdditionalProperties();
        if (additionalProperties == null) {
            logAudit(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NULL_ADDITIONAL_PROPERTIES, this.outTopic);
            return;
        }

        initializeKafkaProperties(additionalProperties);
        this.serverId = (String) additionalProperties.getProperty(KafkaOpenMetadataTopicProvider.serverIdPropertyName);
        this.consumerProperties.put("group.id", this.serverId);

        logAudit(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_INITIALIZING, this.outTopic, this.serverId);
        logAudit(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES, this.producerProperties.toString());

        this.consumer = new KafkaOpenMetadataEventConsumer(this.outTopic, this.serverId, this.consumerProperties, this);
        this.consumerThread = new Thread(this.consumer);
        this.consumerThread.start();
    }

    /**
     * Overlays the producer and consumer settings found in the additional properties onto
     * the defaults set by the constructor.
     *
     * <p>Each of the two properties is expected to be a map of String keys to String values.
     * Any failure while reading them (for example a value that is not a String) is written
     * to the audit log and the remaining entries are skipped; entries applied before the
     * failure stay in place.
     *
     * @param additionalProperties additional properties of the connection (not null)
     */
    private void initializeKafkaProperties(AdditionalProperties additionalProperties) {
        try {
            copyStringEntries(additionalProperties.getProperty(KafkaOpenMetadataTopicProvider.producerPropertyName), this.producerProperties);
            copyStringEntries(additionalProperties.getProperty(KafkaOpenMetadataTopicProvider.consumerPropertyName), this.consumerProperties);
        } catch (Throwable th) {
            // Deliberately best-effort: a malformed configuration is reported, not propagated.
            logAudit("initializeKafkaProperties", KafkaOpenMetadataTopicConnectorAuditCode.UNABLE_TO_PARSE_ADDITIONAL_PROPERTIES, this.outTopic, th.getClass().getName(), th.getMessage());
        }
    }

    /**
     * Copies every entry of {@code source} into {@code target}; does nothing when
     * {@code source} is null.
     *
     * @param source expected to be a {@link Map} with String keys and values, or null
     * @param target properties object receiving the entries
     * @throws ClassCastException if {@code source} is not a map, or a key or value is not a String
     */
    private static void copyStringEntries(Object source, Properties target) {
        if (source == null) {
            return;
        }
        for (Map.Entry<?, ?> entry : ((Map<?, ?>) source).entrySet()) {
            target.setProperty((String) entry.getKey(), (String) entry.getValue());
        }
    }

    /**
     * Writes one record to the audit log using the message id, severity, formatted message,
     * system action and user action of the supplied audit code.
     *
     * @param actionDescription name of the calling method, recorded with the log entry
     * @param auditCode         audit code describing the event
     * @param messageParameters values inserted into the audit code's message
     */
    private void logAudit(String actionDescription, KafkaOpenMetadataTopicConnectorAuditCode auditCode, String... messageParameters) {
        auditLog.logRecord(actionDescription, auditCode.getLogMessageId(), auditCode.getSeverity(), auditCode.getFormattedLogMessage(messageParameters), (String) null, auditCode.getSystemAction(), auditCode.getUserAction());
    }

    /**
     * Delegates to the superclass start processing.
     *
     * @throws ConnectorCheckedException if the superclass fails to start
     */
    public void start() throws ConnectorCheckedException {
        super.start();
    }

    /**
     * Sends one event to the topic and waits for the send to complete.
     *
     * <p>A new producer is created for every call and is always closed before the method
     * returns, whether or not the send succeeded. The record key is the server id.
     *
     * @param str event payload
     * @throws ConnectorCheckedException if waiting for the send result is interrupted or the
     *                                   send fails; the original exception is attached as the cause
     */
    public void sendEvent(String str) throws ConnectorCheckedException {
        // try-with-resources guarantees close() even when send() or flush() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(this.producerProperties)) {
            log.debug("Sending message {}", str);
            producer.send(new ProducerRecord<>(this.outTopic, this.serverId, str)).get();
            producer.flush();
        } catch (InterruptedException | ExecutionException e) {
            // The producer is already closed here, so restoring the flag cannot disturb close().
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("Exception in sendEvent ", e);
            KafkaOpenMetadataTopicConnectorErrorCode errorCode = KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT;
            String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage(e.getClass().getName(), this.outTopic, e.getMessage());
            throw new ConnectorCheckedException(errorCode.getHTTPErrorCode(), getClass().getName(), "sendEvent", errorMessage, errorCode.getSystemAction(), errorCode.getUserAction(), e);
        }
    }

    /**
     * Removes and returns all events buffered since the previous call.
     *
     * <p>The copy and the clear happen under the buffer's lock, so an event added
     * concurrently is either returned by this call or left in the buffer for the next one.
     *
     * @return the buffered events in arrival order, or {@code null} if the buffer is empty
     */
    protected List<String> checkForEvents() {
        List<String> drainedEvents = null;
        synchronized (this.incomingEventsList) {
            if (!this.incomingEventsList.isEmpty()) {
                log.debug("checking for events {}", this.incomingEventsList);
                drainedEvents = new ArrayList<>(this.incomingEventsList);
                this.incomingEventsList.clear();
            }
        }
        return drainedEvents;
    }

    /**
     * Adds a received event to the buffer drained by {@link #checkForEvents()}.
     *
     * <p>NOTE(review): presumably called by {@link KafkaOpenMetadataEventConsumer}, which is
     * given a reference to this connector in {@code initialize} - confirm against that class.
     *
     * @param str event payload
     */
    public void distributeToListeners(String str) {
        log.debug("distribute event to listeners{}", str);
        this.incomingEventsList.add(str);
    }

    /**
     * Disconnects the superclass, records the shutdown in the audit log and closes the consumer.
     *
     * <p>The consumer is only closed if one was created; it does not exist when
     * {@code initialize} returned early or was never called.
     *
     * @throws ConnectorCheckedException if the superclass disconnect fails
     */
    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();
        logAudit("disconnect", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_SHUTDOWN, this.outTopic);
        if (this.consumer != null) {
            this.consumer.safeCloseConsumer();
        }
    }
}
