package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.odpi.openmetadata.frameworks.connectors.ffdc.ConnectorCheckedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventProducer.class */
/**
 * Runnable that drains an in-memory buffer of event strings and publishes each one to a
 * Kafka topic.
 *
 * <p>Callers queue events with {@link #sendEvent(String)}; the thread executing {@link #run()}
 * removes them from the buffer and sends them synchronously (one at a time, waiting for each
 * acknowledgement). {@link #safeCloseProducer()} asks the loop to stop.
 *
 * <p>The buffer is a synchronized list, so events may be queued from other threads. The
 * metric counters are plain {@code long} fields and are not synchronized.
 */
public class KafkaOpenMetadataEventProducer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventProducer.class);

    /** Number of retries allowed for a retryable send failure before the event is dropped. */
    private static final long MAX_SEND_RETRIES = 10;

    /** How long the main loop sleeps when the buffer is empty. */
    private static final long IDLE_SLEEP_MILLIS = 1000;

    /** How long the main loop waits after a retryable failure before resuming. */
    private static final long RECOVERY_WAIT_SECONDS = 10L;

    private final AuditLog auditLog;
    private final String topicName;
    private final String localServerId;
    private final Properties producerProperties;
    private final List<String> sendBuffer = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;

    /** Lazily created on first publish and discarded (set to null) after a fatal send failure. */
    private Producer<String, String> producer = null;

    private long messageSendCount = 0;
    private long kafkaSendAttemptCount = 0;
    private long messagePublishRequestCount = 0;
    private long inmemoryPutMessageCount = 0;
    private long kafkaSendFailCount = 0;
    private long messageFailedSendCount = 0;

    /**
     * Creates the producer wrapper. No Kafka connection is opened here; the Kafka producer is
     * created on the first publish.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept public so existing callers still compile.
     *
     * @param str        name of the Kafka topic to publish to
     * @param str2       local server id, used as the key of every record sent
     * @param properties Kafka producer configuration; dereferenced when an audit log is supplied,
     *                   so it must not be null in that case
     * @param auditLog   audit log destination, or {@code null} to disable audit logging
     */
    public KafkaOpenMetadataEventProducer(String str, String str2, Properties properties, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.topicName = str;
        this.localServerId = str2;
        this.producerProperties = properties;
        if (auditLog != null) {
            auditLog.logMessage(
                    "new producer",
                    KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_PRODUCER_PROPERTIES.getMessageDefinition(
                            Integer.toString(properties.size()), str),
                    properties.toString());
        }
    }

    /**
     * Sends one event to Kafka, waiting for the send to complete.
     *
     * <p>Retryable failures are retried immediately up to {@link #MAX_SEND_RETRIES} times; when
     * the retries are exhausted the producer is closed and the method returns normally, i.e. the
     * event is dropped (only logged and counted). Non-retryable failures close the producer and
     * are reported to the caller.
     *
     * @param event the event payload to send
     * @throws ConnectorCheckedException if the Kafka producer cannot be created, or the send
     *                                   fails with a non-retryable error
     */
    private void publishEvent(String event) throws ConnectorCheckedException {
        final String methodName = "publishEvent";
        boolean sent = false;
        long retryCount = 0;

        messagePublishRequestCount++;
        log.debug("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);

        if (producer == null) {
            try {
                log.debug("Creating new producer for topic {}", topicName);
                producer = new KafkaProducer<>(producerProperties);
            } catch (Exception e) {
                if (auditLog != null) {
                    auditLog.logException(
                            methodName,
                            KafkaOpenMetadataTopicConnectorAuditCode.ERROR_CONNECTING_KAFKA_PRODUCER
                                    .getMessageDefinition(topicName),
                            e);
                }
                throw new ConnectorCheckedException(
                        KafkaOpenMetadataTopicConnectorErrorCode.ERROR_CONNECTING_KAFKA_PRODUCER
                                .getMessageDefinition(e.getMessage()),
                        getClass().getName(),
                        methodName,
                        e);
            }
        }

        while (!sent) {
            try {
                log.debug("Sending message try {} [0 based] : {}", retryCount, event);
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(topicName, localServerId, event);
                kafkaSendAttemptCount++;
                log.debug("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);

                // Block until Kafka acknowledges (or rejects) the record.
                producer.send(producerRecord).get();

                sent = true;
                messageSendCount++;
                log.debug("Metrics: messageSendCount {}", messageSendCount);
            } catch (WakeupException e) {
                // The send is attempted again without counting this as a retry.
                log.warn("Wake up for shut down");
            } catch (ExecutionException e) {
                kafkaSendFailCount++;
                log.debug("Metrics: kafkaSendFailCount {}", kafkaSendFailCount);
                log.debug("Kafka had trouble sending event: {} : Exception  message is {}", event, e.getMessage());

                if (!isExceptionRetryable(e)) {
                    log.debug("Exception not retryable, closing producer");
                    closeProducer();
                    messageFailedSendCount++;
                    log.warn("Metrics: messageFailedSendCount {}", messageFailedSendCount);
                    throw new ConnectorCheckedException(
                            KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
                                    e.getClass().getName(), topicName, e.getMessage()),
                            getClass().getName(),
                            methodName,
                            e);
                }

                if (retryCount == MAX_SEND_RETRIES) {
                    // Retries exhausted: give up on this event without raising an exception.
                    closeProducer();
                    messageFailedSendCount++;
                    log.warn("Metrics: messageFailedSendCount {}", messageFailedSendCount);
                    log.error("Retryable Exception closed producer after {} tries", retryCount);
                    return;
                }

                if (retryCount == 0) {
                    // Only the first failure of an event is written to the audit log.
                    log.debug("Retrying event warning - count is {}", retryCount);
                    if (auditLog != null) {
                        auditLog.logMessage(
                                methodName,
                                KafkaOpenMetadataTopicConnectorAuditCode.EVENT_SEND_IN_ERROR_LOOP.getMessageDefinition(
                                        topicName,
                                        Long.toString(messageSendCount),
                                        Integer.toString(getSendBufferSize()),
                                        e.getMessage()));
                    }
                }
                retryCount++;
            } catch (Exception e) {
                closeProducer();
                log.warn("Closed producer due to Exception in sendEvent {}", e.getMessage());
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt so the main loop can see it.
                    Thread.currentThread().interrupt();
                }
                messageFailedSendCount++;
                log.warn("Metrics: messageFailedSendCount {}", messageFailedSendCount);
                throw new ConnectorCheckedException(
                        KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
                                e.getClass().getName(), topicName, e.getMessage()),
                        getClass().getName(),
                        methodName,
                        e);
            }
        }
    }

    /**
     * Main loop: renames the current thread after the topic, then repeatedly drains the send
     * buffer, sleeping while it is empty.
     *
     * <p>The loop ends when {@link #safeCloseProducer()} has been called, when the thread is
     * interrupted, or when publishing fails with a non-retryable error. On exit any open Kafka
     * producer is closed.
     */
    @Override // java.lang.Runnable
    public void run() {
        final String threadName = topicName + "/" + Thread.currentThread().getName();
        final String actionDescription = threadName + ":run";
        Thread.currentThread().setName(threadName);

        if (auditLog != null) {
            auditLog.logMessage(
                    actionDescription,
                    KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_START.getMessageDefinition(
                            topicName, String.valueOf(sendBuffer.size())),
                    producerProperties.toString());
        }

        log.info("Main loop started for topic {}", topicName);
        while (isRunning()) {
            try {
                String event = getEvent();
                if (event == null) {
                    TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
                } else {
                    log.debug("Processing buffered events");
                    while (event != null) {
                        publishEvent(event);
                        event = getEvent();
                    }
                }
            } catch (InterruptedException e) {
                log.debug("Woken up from sleep ");
                Thread.currentThread().interrupt();
                // With the interrupt flag restored every later sleep would throw immediately,
                // turning this loop into a busy spin; treat the interrupt as a stop request.
                break;
            } catch (Exception e) {
                log.warn("Bad exception from sending events: {}", e.getMessage());
                if (!isExceptionRetryable(e)) {
                    break;
                }
                log.debug("Trying to recover");
                recoverAfterError();
            }
        }

        log.info("Exiting main loop for topic {} & cleaning up", topicName);
        if (producer != null) {
            log.debug("");
            closeProducer();
        }
        if (auditLog != null) {
            auditLog.logMessage(
                    actionDescription,
                    KafkaOpenMetadataTopicConnectorAuditCode.KAFKA_PRODUCER_SHUTDOWN.getMessageDefinition(
                            topicName, Integer.toString(getSendBufferSize()), Long.toString(messageSendCount)),
                    producerProperties.toString());
        }
    }

    /**
     * Appends an event to the end of the send buffer and updates the put counter.
     *
     * @param event the event payload to queue
     */
    private void putEvent(String event) {
        inmemoryPutMessageCount++;
        log.debug("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
        log.debug("Metrics: sendBufferSize {}", sendBuffer.size());
        sendBuffer.add(event);
    }

    /**
     * @return the number of events currently waiting in the send buffer
     */
    private int getSendBufferSize() {
        return sendBuffer.size();
    }

    /**
     * Removes and returns the oldest buffered event.
     *
     * <p>The emptiness check and the removal are two separate calls, so this is only safe
     * while a single thread removes events from the buffer.
     *
     * @return the next event, or {@code null} if the buffer is empty
     */
    private String getEvent() {
        if (sendBuffer.isEmpty()) {
            return null;
        }
        return sendBuffer.remove(0);
    }

    /**
     * Queues an event for sending. The event is only buffered here; it is sent later by the
     * thread executing {@link #run()}.
     *
     * @param str the event payload to queue
     */
    public void sendEvent(String str) {
        putEvent(str);
    }

    /**
     * Pauses the calling thread for {@link #RECOVERY_WAIT_SECONDS} seconds after a retryable
     * failure. If interrupted, the interrupt flag is restored and the method returns early.
     */
    protected void recoverAfterError() {
        log.info("Waiting {} seconds to recover", RECOVERY_WAIT_SECONDS);
        try {
            Thread.sleep(RECOVERY_WAIT_SECONDS * 1000);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering with exception: {}", e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests that the main loop stop. The Kafka producer itself is closed by {@link #run()}
     * when the loop exits, not by this method.
     */
    public void safeCloseProducer() {
        stopRunning();
    }

    /**
     * @return {@code true} until a stop has been requested
     */
    private boolean isRunning() {
        return running;
    }

    /** Clears the running flag so the main loop exits at its next check. */
    private synchronized void stopRunning() {
        running = false;
    }

    /**
     * Closes the current Kafka producer, if any, and clears the field so that the next publish
     * creates a fresh one. The field is cleared even if {@code close()} throws.
     */
    private void closeProducer() {
        if (producer == null) {
            return;
        }
        try {
            producer.close();
        } finally {
            producer = null;
        }
    }

    /**
     * Reports whether any exception in the cause chain of {@code exc} is a Kafka
     * {@link RetriableException}. The exception passed in is not itself inspected, only its
     * causes.
     *
     * @param exc the exception whose causes are examined
     * @return {@code true} if a cause is retriable, {@code false} if the chain ends without one
     */
    private boolean isExceptionRetryable(Exception exc) {
        // Step down the chain one cause at a time. Re-wrapping the cause in a new Exception
        // would yield the same cause again on the next iteration and never terminate.
        for (Throwable cause = exc.getCause(); cause != null; cause = cause.getCause()) {
            if (cause instanceof RetriableException) {
                return true;
            }
        }
        return false;
    }
}
