package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConfigurationWrapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.class */
public class KafkaOpenMetadataEventConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private OMRSAuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaOpenMetadataEventConsumerConfiguration config;
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private String localServerId;
    private KafkaOpenMetadataTopicConnector connector;
    private final long maxMsBetweenPolls;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
    private Boolean running = true;

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaOpenMetadataEventConsumer.log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }

    public KafkaOpenMetadataEventConsumer(String str, String str2, KafkaOpenMetadataEventConsumerConfiguration kafkaOpenMetadataEventConsumerConfiguration, Properties properties, KafkaOpenMetadataTopicConnector kafkaOpenMetadataTopicConnector, OMRSAuditLog oMRSAuditLog) {
        this.auditLog = oMRSAuditLog;
        this.consumer = new KafkaConsumer<>(properties);
        this.topicToSubscribe = str;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), new HandleRebalance());
        this.connector = kafkaOpenMetadataTopicConnector;
        this.localServerId = str2;
        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES;
        oMRSAuditLog.logRecord("initialize", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(str, kafkaOpenMetadataTopicConnector.getPrintableProperties(properties)), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(properties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = kafkaOpenMetadataEventConsumerConfiguration.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = (System.currentTimeMillis() + this.maxMsBetweenPolls) - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (isRunning()) {
            try {
                try {
                    try {
                        boolean z = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                        int numberOfUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                        if (z || numberOfUnprocessedEvents <= this.maxQueueSize) {
                            updateNextMaxPollTimestamp();
                            ConsumerRecords poll = this.consumer.poll(this.pollTimeout);
                            log.debug("Found records: " + poll.count());
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                String str = (String) consumerRecord.value();
                                log.debug("Received message: " + str);
                                if (this.localServerId.equals(consumerRecord.key())) {
                                    log.debug("Ignoring message with key: " + ((String) consumerRecord.key()) + " and value " + ((String) consumerRecord.value()));
                                } else {
                                    try {
                                        this.connector.distributeToListeners(str);
                                    } catch (Exception e) {
                                        log.error(String.format("Error distributing inbound event: %s", e.getMessage()), e);
                                        if (this.auditLog != null) {
                                            KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT;
                                            this.auditLog.logRecord("run", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(this.topicToSubscribe, e.getClass().getName(), str, e.getMessage()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
                                        }
                                    }
                                }
                                this.currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                            }
                            awaitNextPollingTime();
                        } else {
                            log.warn("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", Integer.valueOf(numberOfUnprocessedEvents), Long.valueOf(this.maxQueueSize));
                            awaitNextPollingTime();
                            awaitNextPollingTime();
                        }
                    } catch (WakeupException e2) {
                        log.debug("Received wakeup call, proceeding with graceful shutdown", e2);
                        awaitNextPollingTime();
                    }
                } catch (Exception e3) {
                    log.error(String.format("Unexpected error: %s", e3.getMessage()), e3);
                    if (this.auditLog != null) {
                        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode2 = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT;
                        this.auditLog.logRecord("run", kafkaOpenMetadataTopicConnectorAuditCode2.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode2.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode2.getFormattedLogMessage(this.topicToSubscribe, e3.getClass().getName(), e3.getMessage()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode2.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode2.getUserAction());
                    }
                    recoverAfterError();
                    awaitNextPollingTime();
                }
            } catch (Throwable th) {
                awaitNextPollingTime();
                throw th;
            }
        }
        if (this.consumer != null) {
            try {
                this.consumer.commitSync(this.currentOffsets);
                this.consumer.close();
                this.consumer = null;
            } catch (Throwable th2) {
                this.consumer.close();
                throw th2;
            }
        }
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            log.error(String.format("Interruption error: %s", e.getMessage()), e);
        }
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", Long.valueOf(this.recoverySleepTimeSec)));
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering", e);
        }
    }

    public void safeCloseConsumer() {
        stopRunning();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private synchronized boolean isRunning() {
        return this.running.booleanValue();
    }

    private synchronized void stopRunning() {
        this.running = false;
    }
}
