package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.class */
public class KafkaOpenMetadataEventProducer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventProducer.class);
    private static final String defaultThreadName = "KafkaProducer for topic ";
    private OMRSAuditLog auditLog;
    private String listenerThreadName;
    private String topicName;
    private static final long recoverySleepTimeSec = 10;
    private String localServerId;
    private Properties producerProperties;
    private Producer<String, String> producer;
    private KafkaOpenMetadataTopicConnector connector;
    private volatile List<String> sendBuffer = new ArrayList();
    private volatile boolean running = true;
    private int sleepTime = 1000;
    private long messageSendCount = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaOpenMetadataEventProducer(String str, String str2, Properties properties, KafkaOpenMetadataTopicConnector kafkaOpenMetadataTopicConnector, OMRSAuditLog oMRSAuditLog) {
        this.auditLog = oMRSAuditLog;
        this.topicName = str;
        this.localServerId = str2;
        this.connector = kafkaOpenMetadataTopicConnector;
        this.producerProperties = properties;
        this.listenerThreadName = defaultThreadName + str;
        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES;
        oMRSAuditLog.logRecord("new producer", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(str, kafkaOpenMetadataTopicConnector.getPrintableProperties(properties)), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
    }

    private void publishEvent(String str) throws ConnectorCheckedException {
        ConnectorCheckedException connectorCheckedException;
        boolean z = false;
        long j = 0;
        while (!z) {
            try {
                try {
                    try {
                        try {
                            log.debug("Sending message {0}" + str);
                            this.producer.send(new ProducerRecord(this.topicName, this.localServerId, str)).get();
                            z = true;
                            this.messageSendCount++;
                            this.producer.flush();
                        } finally {
                        }
                    } catch (WakeupException e) {
                        log.error("Wake up for shut down " + e.toString());
                        this.producer.flush();
                    }
                } catch (ExecutionException e2) {
                    log.debug("Kafka had trouble sending event: " + str + "exception message is " + e2.getMessage());
                    if (j == recoverySleepTimeSec) {
                        j = 0;
                    } else {
                        if (j == 0) {
                            KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.EVENT_SEND_IN_ERROR_LOOP;
                            this.auditLog.logRecord("publishEvent", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(this.topicName, Long.toString(this.messageSendCount), Long.toString(getSendBufferSize()), e2.getMessage()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
                        }
                        j++;
                    }
                    this.producer.flush();
                }
            } catch (Throwable th) {
                this.producer.flush();
                throw th;
            }
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_START;
        this.auditLog.logRecord(this.listenerThreadName, kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(this.topicName, Integer.toString(this.sendBuffer.size())), this.connector.getPrintableProperties(this.producerProperties), kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
        this.producer = new KafkaProducer(this.producerProperties);
        while (isRunning()) {
            try {
                String event = getEvent();
                if (event == null) {
                    Thread.sleep(this.sleepTime);
                } else {
                    while (event != null) {
                        publishEvent(event);
                        event = getEvent();
                    }
                }
            } catch (InterruptedException e) {
                log.info("Woken up from sleep " + e.getMessage());
            } catch (Throwable th) {
                log.error("Bad exception from sending events " + th.getMessage());
                recoverAfterError();
            }
        }
        this.producer.close();
        this.producer = null;
        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode2 = KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_SHUTDOWN;
        this.auditLog.logRecord(this.listenerThreadName, kafkaOpenMetadataTopicConnectorAuditCode2.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode2.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode2.getFormattedLogMessage(this.topicName, Integer.toString(getSendBufferSize()), Long.toString(this.messageSendCount)), this.connector.getPrintableProperties(this.producerProperties), kafkaOpenMetadataTopicConnectorAuditCode2.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode2.getUserAction());
    }

    private synchronized void putEvent(String str) {
        this.sendBuffer.add(str);
    }

    private synchronized int getSendBufferSize() {
        return this.sendBuffer.size();
    }

    private synchronized String getEvent() {
        if (this.sendBuffer.isEmpty()) {
            return null;
        }
        return this.sendBuffer.remove(0);
    }

    public void sendEvent(String str) {
        putEvent(str);
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", Long.valueOf(recoverySleepTimeSec)));
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering", e);
        }
    }

    public void safeCloseProducer() {
        stopRunning();
    }

    private synchronized boolean isRunning() {
        return this.running;
    }

    private synchronized void stopRunning() {
        this.running = false;
    }
}
