package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.frameworks.connectors.properties.EndpointDetails;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.IncomingEvent;
import org.odpi.openmetadata.repositoryservices.connectors.openmetadatatopic.OpenMetadataTopicConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataTopicConnector.class */
public class KafkaOpenMetadataTopicConnector extends OpenMetadataTopicConnector {
    /** Key of the consumer setting that the constructor defaults to "true". */
    static final String ENABLE_AUTO_COMMIT_PROPERTY = "enable.auto.commit";
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataTopicConnector.class);
    /** Producer settings: constructor defaults, later overlaid by initializeKafkaProperties(). */
    private final Properties producerProperties = new Properties();
    /** Settings handed to KafkaOpenMetadataEventConsumerConfiguration; no defaults are set in this class. */
    private final Properties consumerEgeriaProperties = new Properties();
    /** Consumer settings: constructor defaults, later overlaid by initializeKafkaProperties(). */
    private final Properties consumerProperties = new Properties();
    /** Cleared when the "eventDirection" configuration property is "inOnly". */
    private boolean outboundEventsEnabled = true;
    /** Cleared when the "eventDirection" configuration property is "outOnly". */
    private boolean inboundEventsEnabled = true;
    /** Current consumer; replaced each time initializeConsumerAndConsumerThread() runs. */
    private KafkaOpenMetadataEventConsumer consumer = null;
    /** Current producer; replaced each time initializeProducerAndProducerThread() runs. */
    private KafkaOpenMetadataEventProducer producer = null;
    /** Topic name, taken from the connection endpoint address in initializeTopic(). */
    private String topicName = null;
    /** Server identifier from the configuration properties, or a random UUID when none is configured. */
    private String serverId = null;
    /** Events added by distributeToListeners() and handed out by checkForIncomingEvents(). */
    private final List<IncomingEvent> incomingEventsList = Collections.synchronizedList(new ArrayList());
    /** Executor that runs the consumer task; created in start() when inbound events are enabled. */
    private KafkaConsumerExecutor consumerExecutor = null;
    /** Executor that runs the producer task; created in start() when outbound events are enabled. */
    private KafkaProducerExecutor producerExecutor = null;
    // NOTE(review): not referenced anywhere in the visible code; presumably the prefix of the worker thread names.
    final String threadHeader = "Kafka-";
    /** Thread object wrapping the current consumer; submitted to consumerExecutor as a task. */
    Thread consumerThread;
    /** Thread object wrapping the current producer; submitted to producerExecutor as a task. */
    Thread producerThread;

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataTopicConnector$KafkaConsumerExecutor.class */
    /**
     * Single-worker executor that runs the consumer task and, whenever that task finishes,
     * builds a replacement consumer and resubmits it while the connector is still active.
     */
    private class KafkaConsumerExecutor extends ThreadPoolExecutor {
        /** Creates a pool with exactly one worker and a work queue of capacity one. */
        KafkaConsumerExecutor() {
            super(1, 1, Long.MAX_VALUE, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(1));
        }

        /**
         * Runs after each task completes: a fresh consumer and consumer thread object are always
         * created, and the new task is queued only if the connector reports itself active.
         *
         * @param finishedTask the task that has just completed
         * @param failure the exception that ended the task, or null
         */
        @Override // java.util.concurrent.ThreadPoolExecutor
        public void afterExecute(Runnable finishedTask, Throwable failure) {
            super.afterExecute(finishedTask, failure);

            final KafkaOpenMetadataTopicConnector connector = KafkaOpenMetadataTopicConnector.this;

            // NOTE(review): the replacement consumer is created even when the connector is no
            // longer active - confirm this is intended.
            connector.initializeConsumerAndConsumerThread();
            if (!connector.isActive()) {
                return;
            }
            connector.consumerExecutor.execute(connector.consumerThread);
        }
    }

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataTopicConnector$KafkaProducerExecutor.class */
    /**
     * Single-worker executor that runs the producer task and, whenever that task finishes,
     * builds a replacement producer and resubmits it while the connector is still active.
     */
    private class KafkaProducerExecutor extends ThreadPoolExecutor {
        /** Creates a pool with exactly one worker and a work queue of capacity one. */
        KafkaProducerExecutor() {
            super(1, 1, Long.MAX_VALUE, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(1));
        }

        /**
         * Runs after each task completes: a fresh producer and producer thread object are always
         * created, and the new task is queued only if the connector reports itself active.
         *
         * @param finishedTask the task that has just completed
         * @param failure the exception that ended the task, or null
         */
        @Override // java.util.concurrent.ThreadPoolExecutor
        public void afterExecute(Runnable finishedTask, Throwable failure) {
            super.afterExecute(finishedTask, failure);

            final KafkaOpenMetadataTopicConnector connector = KafkaOpenMetadataTopicConnector.this;

            // NOTE(review): the replacement producer is created even when the connector is no
            // longer active - confirm this is intended.
            connector.initializeProducerAndProducerThread();
            if (!connector.isActive()) {
                return;
            }
            connector.producerExecutor.execute(connector.producerThread);
        }
    }

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataTopicConnector$KafkaStatusChecker.class */
    /**
     * Probes a Kafka cluster through the admin client and retries until at least one broker
     * node is reported or the configured number of attempts is used up. The most recent
     * failure is kept so the caller can report it.
     */
    private class KafkaStatusChecker {
        /** Most recent failure; starts as a placeholder so getLastException() never returns null. */
        private Exception lastException = new Exception();

        private KafkaStatusChecker() {
        }

        /**
         * Makes one attempt to list the cluster nodes.
         *
         * @param properties settings used to create the admin client
         * @return true when the cluster reports at least one node; false when it reports none
         *         or when the attempt failed (the failure is then stored as the last exception)
         */
        boolean getRunningBrokers(Properties properties) {
            AdminClient adminClient = null;
            try {
                adminClient = KafkaAdminClient.create(properties);
                Collection<?> nodes = adminClient.describeCluster().nodes().get();
                return !nodes.isEmpty();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible to the caller instead of silently clearing it.
                    Thread.currentThread().interrupt();
                }
                this.lastException = e;
                return false;
            } finally {
                // Close without waiting, whether the probe succeeded, failed or threw an Error.
                if (adminClient != null) {
                    adminClient.close(Duration.ZERO);
                }
            }
        }

        /**
         * Repeatedly probes for brokers, spacing the attempts so that each one (probe plus
         * pause) lasts at least "bring.up.minSleepTime" milliseconds, for at most
         * "bring.up.retries" attempts. No pause is taken after the final attempt.
         *
         * @param properties admin-client settings, which must also carry the two
         *                   "bring.up.*" values as parseable integers
         * @return true as soon as a probe succeeds; false when every attempt failed, the
         *         retry settings could not be parsed, or the thread was interrupted
         */
        public boolean waitForBrokers(Properties properties) {
            try {
                final int maxRetries = Integer.parseInt(properties.getProperty("bring.up.retries"));
                final int minSleepTime = Integer.parseInt(properties.getProperty("bring.up.minSleepTime"));

                for (int attempt = 1; attempt <= maxRetries; attempt++) {
                    if (KafkaOpenMetadataTopicConnector.this.auditLog != null) {
                        KafkaOpenMetadataTopicConnector.this.auditLog.logMessage("waitForBrokers", KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_CONNECTION_RETRY.getMessageDefinition(properties.getProperty("bootstrap.servers"), String.valueOf(attempt), String.valueOf(maxRetries)));
                    }

                    final Instant attemptStart = Instant.now();
                    if (getRunningBrokers(properties)) {
                        return true;
                    }

                    // Pausing is only useful when another attempt will follow.
                    if (attempt < maxRetries) {
                        final long elapsedMillis = Duration.between(attemptStart, Instant.now()).toMillis();
                        if (elapsedMillis < minSleepTime) {
                            Thread.sleep(minSleepTime - elapsedMillis);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Stop retrying and hand the interrupt back to the caller.
                Thread.currentThread().interrupt();
                this.lastException = e;
            } catch (Exception e) {
                this.lastException = e;
            }
            return false;
        }

        /**
         * @return the most recent exception recorded by a probe or by the retry loop
         */
        public Exception getLastException() {
            return this.lastException;
        }
    }

    public KafkaOpenMetadataTopicConnector() {
        this.producerProperties.put("bootstrap.servers", "localhost:9092");
        this.producerProperties.put("acks", "all");
        this.producerProperties.put("retries", 1);
        this.producerProperties.put("batch.size", 16384);
        this.producerProperties.put("linger.ms", 0);
        this.producerProperties.put("buffer.memory", 33554432);
        this.producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producerProperties.put("bring.up.retries", "10");
        this.producerProperties.put("bring.up.minSleepTime", "5000");
        this.consumerProperties.put("bootstrap.servers", "localhost:9092");
        this.consumerProperties.put(ENABLE_AUTO_COMMIT_PROPERTY, "true");
        this.consumerProperties.put("auto.commit.interval.ms", "1000");
        this.consumerProperties.put("session.timeout.ms", "30000");
        this.consumerProperties.put("max.partition.fetch.bytes", 10485760);
        this.consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumerProperties.put("bring.up.retries", "10");
        this.consumerProperties.put("bring.up.minSleepTime", "5000");
    }

    /**
     * Reads the topic name and configuration from the connection details.
     * <p>
     * Without an endpoint, or without configuration properties, an audit message is written
     * (when an audit log is present) and the method returns early. Otherwise the Kafka
     * properties are applied, the server id is taken from the configuration (or generated as a
     * random UUID), and the consumer "group.id" falls back to the server id when it is empty.
     */
    private void initializeTopic() {
        final String actionDescription = "initializeTopic";

        final EndpointDetails endpoint = this.connectionDetails.getEndpoint();
        if (endpoint == null) {
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NO_TOPIC_NAME.getMessageDefinition());
            }
            return;
        }

        this.topicName = endpoint.getAddress();

        final Map<String, Object> configuration = this.connectionDetails.getConfigurationProperties();
        if (configuration == null) {
            if (this.auditLog != null) {
                this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.NULL_ADDITIONAL_PROPERTIES.getMessageDefinition(this.topicName));
            }
            return;
        }

        initializeKafkaProperties(configuration);

        final Object configuredServerId = configuration.get(KafkaOpenMetadataTopicProvider.serverIdPropertyName);
        this.serverId = (configuredServerId != null)
                ? configuredServerId.toString()
                : UUID.randomUUID().toString();

        // Only supply a consumer group when the configuration did not provide one.
        final String configuredGroupId = (String) this.consumerProperties.get("group.id");
        if (StringUtils.isEmpty(configuredGroupId)) {
            this.consumerProperties.put("group.id", this.serverId);
        }

        if (this.auditLog != null) {
            this.auditLog.logMessage(actionDescription, KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_INITIALIZING.getMessageDefinition(this.topicName, this.serverId, this.consumerProperties.getProperty("bootstrap.servers")));
        }
    }

    /**
     * Overlays the configured producer, consumer and Egeria-consumer settings onto the
     * corresponding property sets and applies the optional "eventDirection" switch
     * ("outOnly" disables inbound events, "inOnly" disables outbound events).
     * <p>
     * Any exception raised while doing so is reported to the audit log (when present) and
     * not propagated; steps after the failing one are not performed.
     *
     * @param map the connection's configuration properties
     */
    private void initializeKafkaProperties(Map<String, Object> map) {
        try {
            final Object producerOverrides = map.get(KafkaOpenMetadataTopicProvider.producerPropertyName);
            copyProperties(producerOverrides, this.producerProperties);

            final Object consumerOverrides = map.get(KafkaOpenMetadataTopicProvider.consumerPropertyName);
            copyProperties(consumerOverrides, this.consumerProperties);

            final Object egeriaConsumerOverrides = map.get(KafkaOpenMetadataTopicProvider.egeriaConsumerPropertyName);
            copyProperties(egeriaConsumerOverrides, this.consumerEgeriaProperties);

            final Object direction = map.get("eventDirection");
            if (direction == null) {
                return;
            }
            if (direction.equals("outOnly")) {
                this.inboundEventsEnabled = false;
            } else if (direction.equals("inOnly")) {
                this.outboundEventsEnabled = false;
            }
        } catch (Exception error) {
            if (this.auditLog == null) {
                return;
            }
            this.auditLog.logMessage("initializeKafkaProperties", KafkaOpenMetadataTopicConnectorAuditCode.UNABLE_TO_PARSE_CONFIG_PROPERTIES.getMessageDefinition(this.topicName, error.getClass().getName(), error.getMessage()));
        }
    }

    /**
     * Copies configured settings into a property set.
     * <p>
     * The source may be a {@code Map}, whose entries are copied with each value converted
     * by {@code String.valueOf}, or a {@code String} in the form produced by
     * {@code Map.toString()}, e.g. <code>{key1=value1, key2=value2}</code>. For a string the
     * first and last characters (the braces) are dropped, the rest is split on commas, and
     * each entry is split on its FIRST '=' so that values which themselves contain '=' are
     * kept whole. Blank entries are ignored, so <code>{}</code> and the empty string copy
     * nothing.
     * <p>
     * NOTE: the string form cannot represent values that contain commas; such a value is
     * split into several entries and the tail is rejected as malformed.
     *
     * @param obj the configured value: null (nothing is copied), a String, or a Map
     * @param properties the property set to update
     * @throws IllegalArgumentException if a non-blank string entry contains no '='
     * @throws ClassCastException if obj is neither a String nor a Map, or a Map key is not a String
     */
    private void copyProperties(Object obj, Properties properties) {
        if (obj == null) {
            return;
        }

        if (!(obj instanceof String)) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) obj).entrySet()) {
                properties.setProperty((String) entry.getKey(), String.valueOf(entry.getValue()));
            }
            return;
        }

        final String text = (String) obj;
        if (text.length() < 2) {
            // Too short to hold the surrounding braces, so there is nothing to copy.
            return;
        }

        final String body = text.substring(1, text.length() - 1);
        for (String rawEntry : body.split(",")) {
            final String entry = rawEntry.trim();
            if (entry.isEmpty()) {
                continue;
            }
            final int separator = entry.indexOf('=');
            if (separator < 0) {
                throw new IllegalArgumentException("Property entry '" + entry + "' is not in key=value form");
            }
            properties.setProperty(entry.substring(0, separator).trim(), entry.substring(separator + 1).trim());
        }
    }

    /**
     * Starts the connector: reads the topic configuration, waits for the Kafka brokers needed
     * by the enabled event directions, then creates and submits the consumer and/or producer
     * tasks before delegating to the superclass.
     * <p>
     * When both directions are enabled and use the same "bootstrap.servers" value, only one
     * broker check is made; when the values differ, both clusters must respond.
     *
     * @throws ConnectorCheckedException if the brokers could not be reached; the last failure
     *                                   recorded by the status checker is attached as the cause
     */
    public void start() throws ConnectorCheckedException {
        initializeTopic();

        final KafkaStatusChecker statusChecker = new KafkaStatusChecker();
        final boolean brokersAvailable;
        if (this.outboundEventsEnabled && this.inboundEventsEnabled) {
            final String consumerServers = this.consumerProperties.getProperty("bootstrap.servers");
            final String producerServers = this.producerProperties.getProperty("bootstrap.servers");
            if (consumerServers.equals(producerServers)) {
                brokersAvailable = statusChecker.waitForBrokers(this.producerProperties);
            } else {
                brokersAvailable = statusChecker.waitForBrokers(this.producerProperties)
                        && statusChecker.waitForBrokers(this.consumerProperties);
            }
        } else if (this.outboundEventsEnabled) {
            brokersAvailable = statusChecker.waitForBrokers(this.producerProperties);
        } else {
            brokersAvailable = statusChecker.waitForBrokers(this.consumerProperties);
        }

        if (!brokersAvailable) {
            if (this.auditLog != null) {
                this.auditLog.logMessage("waitForThisBroker", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_FAILED_INITIALIZING.getMessageDefinition(this.topicName));
            }
            final Exception cause = statusChecker.getLastException();
            throw new ConnectorCheckedException(KafkaOpenMetadataTopicConnectorErrorCode.ERROR_ATTEMPTING_KAFKA_INITIALIZATION.getMessageDefinition(cause.getClass().getName(), this.topicName, cause.getMessage()), getClass().getName(), "KafkaMonitor.waitForThisBroker", cause);
        }

        if (this.inboundEventsEnabled) {
            initializeConsumerAndConsumerThread();
            this.consumerExecutor = new KafkaConsumerExecutor();
            this.consumerExecutor.execute(this.consumerThread);
        }

        if (this.outboundEventsEnabled) {
            initializeProducerAndProducerThread();
            this.producerExecutor = new KafkaProducerExecutor();
            this.producerExecutor.execute(this.producerThread);
        }

        super.start();
    }

    /**
     * Builds a new event consumer from the current topic, server id and consumer settings,
     * and wraps it in a Thread object named "Kafka-Consumer-" followed by the topic name.
     * The thread object is only created here, not started.
     */
    private void initializeConsumerAndConsumerThread() {
        log.info("Initializing the consumer thread");

        final KafkaOpenMetadataEventConsumerConfiguration consumerConfiguration =
                new KafkaOpenMetadataEventConsumerConfiguration(this.consumerEgeriaProperties, this.auditLog);
        final KafkaOpenMetadataEventConsumer newConsumer = new KafkaOpenMetadataEventConsumer(
                this.topicName,
                this.serverId,
                consumerConfiguration,
                this.consumerProperties,
                this,
                this.auditLog);
        final String threadName = "Kafka-Consumer-" + this.topicName;

        this.consumer = newConsumer;
        this.consumerThread = new Thread(newConsumer, threadName);
    }

    /**
     * Builds a new event producer from the current topic, server id and producer settings,
     * and wraps it in a Thread object named "Kafka-Producer-" followed by the topic name.
     * The thread object is only created here, not started.
     */
    private void initializeProducerAndProducerThread() {
        log.info("Initializing the producer thread");

        final KafkaOpenMetadataEventProducer newProducer = new KafkaOpenMetadataEventProducer(
                this.topicName,
                this.serverId,
                this.producerProperties,
                this.auditLog);
        final String threadName = "Kafka-Producer-" + this.topicName;

        this.producer = newProducer;
        this.producerThread = new Thread(newProducer, threadName);
    }

    /**
     * Passes an event to the current producer. Does nothing when no producer has been
     * created.
     *
     * @param str the event payload
     * @throws ConnectorCheckedException if the producer rejects the event
     */
    public void sendEvent(String str) throws ConnectorCheckedException {
        if (this.producer == null) {
            return;
        }

        log.debug("Sending event");
        this.producer.sendEvent(str);
    }

    /**
     * Drains the events received since the previous call.
     * <p>
     * The snapshot and the clearing of the shared list happen under the list's own lock, so
     * an event added concurrently by distributeToListeners() is either part of this batch or
     * left for the next call. It is never removed without being returned, and draining costs
     * one copy plus one clear instead of an element-by-element removal.
     *
     * @return the drained events in arrival order, or null when there are none
     */
    protected List<IncomingEvent> checkForIncomingEvents() {
        final List<IncomingEvent> drainedEvents;

        // Collections.synchronizedList locks on the list itself, so holding that monitor
        // makes the isEmpty / copy / clear sequence atomic with respect to add().
        synchronized (this.incomingEventsList) {
            if (this.incomingEventsList.isEmpty()) {
                return null;
            }
            drainedEvents = new ArrayList<>(this.incomingEventsList);
            this.incomingEventsList.clear();
        }

        log.debug("Checking for events.  Number of found events: {}", drainedEvents.size());
        return drainedEvents;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues an incoming event so that the next checkForIncomingEvents() call returns it.
     *
     * @param incomingEvent the event to queue
     */
    public void distributeToListeners(IncomingEvent incomingEvent) {
        // Parameterized form: the event is only rendered to text when debug logging is on.
        log.debug("distribute event to listeners{}", incomingEvent);
        this.incomingEventsList.add(incomingEvent);
    }

    /**
     * Shuts the connector down: delegates to the superclass, asks the current consumer and
     * producer to close, waits for the consumer and producer thread objects, and records the
     * shutdown in the audit log.
     * <p>
     * If the calling thread is interrupted while waiting, its interrupt status is restored
     * rather than discarded, so the remaining wait returns promptly and the caller can still
     * observe the interrupt.
     * <p>
     * NOTE(review): in the visible code the thread objects are handed to the executors via
     * execute() rather than started, so join() may return immediately; also nothing here
     * shuts the executors down. Confirm both are intended.
     *
     * @throws ConnectorCheckedException if the superclass fails to disconnect
     */
    public void disconnect() throws ConnectorCheckedException {
        super.disconnect();

        if (this.consumer != null) {
            this.consumer.safeCloseConsumer();
        }
        if (this.producer != null) {
            this.producer.safeCloseProducer();
        }

        joinWorkerThread(this.consumerThread, "consumerThread.join");
        joinWorkerThread(this.producerThread, "producerThread.join");

        if (this.auditLog != null) {
            this.auditLog.logMessage("disconnect", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_SHUTDOWN.getMessageDefinition(this.topicName));
        }
    }

    /**
     * Waits for one worker thread to finish, restoring the interrupt status on interruption
     * and reporting any other failure to the audit log.
     *
     * @param worker the thread to wait for; ignored when null
     * @param joinDescription text identifying the wait in the audit message
     */
    private void joinWorkerThread(Thread worker, String joinDescription) {
        if (worker == null) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException interrupted) {
            // Shutdown carries on, but the interrupt must stay visible to the caller.
            Thread.currentThread().interrupt();
        } catch (Exception error) {
            if (this.auditLog != null) {
                this.auditLog.logException("disconnect", KafkaOpenMetadataTopicConnectorAuditCode.UNEXPECTED_SHUTDOWN_EXCEPTION.getMessageDefinition(error.getClass().getName(), this.topicName, joinDescription, error.getMessage()), error);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports how many received events are still waiting to be collected by
     * checkForIncomingEvents().
     *
     * @return the current size of the incoming-event list
     */
    public int getNumberOfUnprocessedEvents() {
        final int pendingEvents = this.incomingEventsList.size();
        return pendingEvents;
    }
}
