package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.class */
public class KafkaOpenMetadataEventConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private AuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private String localServerId;
    private KafkaOpenMetadataTopicConnector connector;
    private final long maxMsBetweenPolls;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private final long messageProcessingStatusCheckIntervalMs;
    private final long messageProcessingTimeoutMs;
    private final boolean isAutoCommitEnabled;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = Collections.synchronizedMap(new HashMap());
    private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
    private Map<TopicPartition, Queue<KafkaIncomingEvent>> unprocessedEventQueues = new HashMap();
    private Boolean running = true;

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaOpenMetadataEventConsumer.log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaOpenMetadataEventConsumer(String str, String str2, KafkaOpenMetadataEventConsumerConfiguration kafkaOpenMetadataEventConsumerConfiguration, Properties properties, KafkaOpenMetadataTopicConnector kafkaOpenMetadataTopicConnector, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.consumer = new KafkaConsumer<>(properties);
        this.topicToSubscribe = str;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), new HandleRebalance());
        this.connector = kafkaOpenMetadataTopicConnector;
        this.localServerId = str2;
        auditLog.logMessage("initialize", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES.getMessageDefinition(Integer.toString(properties.size()), str), properties.toString());
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(properties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = kafkaOpenMetadataEventConsumerConfiguration.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
        this.isAutoCommitEnabled = getBooleanProperty(properties, "enable.auto.commit", false);
        this.messageProcessingStatusCheckIntervalMs = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
        long longProperty = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
        this.messageProcessingTimeoutMs = longProperty < 0 ? longProperty : TimeUnit.MILLISECONDS.convert(longProperty, TimeUnit.MINUTES);
    }

    private static boolean getBooleanProperty(Properties properties, String str, boolean z) {
        String property = properties.getProperty(str);
        return property == null ? z : Boolean.valueOf(property).booleanValue();
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = (System.currentTimeMillis() + this.maxMsBetweenPolls) - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (isRunning()) {
            try {
                try {
                    try {
                        checkForFullyProcessedMessagesIfNeeded();
                        boolean z = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                        int numberOfUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                        if (z || numberOfUnprocessedEvents <= this.maxQueueSize) {
                            updateNextMaxPollTimestamp();
                            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.pollTimeout));
                            log.debug("Found records: " + poll.count());
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                String str = (String) consumerRecord.value();
                                log.debug("Received message: " + str);
                                KafkaIncomingEvent kafkaIncomingEvent = new KafkaIncomingEvent(str, consumerRecord.offset());
                                if (this.localServerId.equals(consumerRecord.key())) {
                                    log.debug("Ignoring message with key: " + ((String) consumerRecord.key()) + " and value " + ((String) consumerRecord.value()));
                                } else {
                                    try {
                                        addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), kafkaIncomingEvent);
                                        this.connector.distributeToListeners(kafkaIncomingEvent);
                                    } catch (Exception e) {
                                        log.error(String.format("Error distributing inbound event: %s", e.getMessage()), e);
                                        if (this.auditLog != null) {
                                            this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT.getMessageDefinition(this.topicToSubscribe, e.getClass().getName(), str, e.getMessage()), e);
                                        }
                                    }
                                }
                                if (this.isAutoCommitEnabled) {
                                    this.currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                                }
                            }
                            awaitNextPollingTime();
                        } else {
                            log.warn("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", Integer.valueOf(numberOfUnprocessedEvents), Long.valueOf(this.maxQueueSize));
                            awaitNextPollingTime();
                            awaitNextPollingTime();
                        }
                    } catch (Exception e2) {
                        log.error(String.format("Unexpected error: %s", e2.getMessage()), e2);
                        if (this.auditLog != null) {
                            this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT.getMessageDefinition(this.topicToSubscribe, e2.getClass().getName(), e2.getMessage()), e2);
                        }
                        recoverAfterError();
                        awaitNextPollingTime();
                    }
                } catch (WakeupException e3) {
                    log.debug("Received wakeup call, proceeding with graceful shutdown", e3);
                    awaitNextPollingTime();
                }
            } catch (Throwable th) {
                awaitNextPollingTime();
                throw th;
            }
        }
        if (this.consumer != null) {
            try {
                if (!checkForFullyProcessedMessages()) {
                    if (!this.isAutoCommitEnabled) {
                        int numberOfUnprocessedMessages = getNumberOfUnprocessedMessages();
                        if (numberOfUnprocessedMessages > 0) {
                            log.error("Consumer was shut down before all message processing has completed!  There are " + numberOfUnprocessedMessages + " messages whose processing is incomplete.");
                        } else {
                            log.info("All messages have been fully processed.  Consumer is shutting down safely.");
                        }
                    }
                    log.info("Committing current offsets before shutdown: " + this.currentOffsets);
                    this.consumer.commitSync(this.currentOffsets);
                }
                this.consumer = null;
            } finally {
                this.consumer.close();
            }
        }
    }

    private void addUnprocessedEvent(int i, String str, KafkaIncomingEvent kafkaIncomingEvent) {
        if (this.isAutoCommitEnabled) {
            return;
        }
        TopicPartition topicPartition = new TopicPartition(str, i);
        Queue<KafkaIncomingEvent> queue = this.unprocessedEventQueues.get(topicPartition);
        if (queue == null) {
            queue = new SynchronizedQueue(new ArrayDeque());
            synchronized (this.unprocessedEventQueues) {
                this.unprocessedEventQueues.put(topicPartition, queue);
            }
        }
        queue.add(kafkaIncomingEvent);
    }

    private boolean checkForFullyProcessedMessagesIfNeeded() {
        if (this.isAutoCommitEnabled || System.currentTimeMillis() < this.nextMessageProcessingStatusCheckTime) {
            return false;
        }
        boolean checkForFullyProcessedMessages = checkForFullyProcessedMessages();
        this.nextMessageProcessingStatusCheckTime = System.currentTimeMillis() + this.messageProcessingStatusCheckIntervalMs;
        return checkForFullyProcessedMessages;
    }

    private boolean checkForFullyProcessedMessages() {
        HashMap hashMap;
        if (this.isAutoCommitEnabled) {
            return false;
        }
        log.info("Checking for fully processed messages whose offsets need to be committed");
        synchronized (this.unprocessedEventQueues) {
            hashMap = new HashMap(this.unprocessedEventQueues);
        }
        HashMap hashMap2 = new HashMap();
        for (Map.Entry entry : hashMap.entrySet()) {
            KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue = removeFullyProcessedEventsFromBeginningOfQueue((Queue) entry.getValue());
            if (removeFullyProcessedEventsFromBeginningOfQueue != null) {
                hashMap2.put(entry.getKey(), new OffsetAndMetadata(removeFullyProcessedEventsFromBeginningOfQueue.getOffset() + 1));
            }
        }
        if (hashMap2.isEmpty()) {
            return false;
        }
        this.currentOffsets.putAll(hashMap2);
        log.info("Committing: " + hashMap2);
        this.consumer.commitSync(hashMap2);
        return true;
    }

    private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent kafkaIncomingEvent = null;
        while (isFirstEventFullyProcessed(queue)) {
            kafkaIncomingEvent = queue.remove();
            log.info("Message with offset " + kafkaIncomingEvent.getOffset() + " has been fully processed.");
        }
        KafkaIncomingEvent peek = queue.peek();
        if (peek != null) {
            log.info("Waiting for completing of processing of message with offset " + peek.getOffset());
        }
        return kafkaIncomingEvent;
    }

    private boolean isFirstEventFullyProcessed(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent peek = queue.peek();
        if (peek == null) {
            return false;
        }
        if (this.messageProcessingTimeoutMs < 0 || !peek.hasTimeElapsedSinceCreation(this.messageProcessingTimeoutMs)) {
            return peek.isFullyProcessed();
        }
        log.warn("Processing of message at offset " + peek.getOffset() + " timed out.");
        return true;
    }

    private int getNumberOfUnprocessedMessages() {
        HashMap hashMap;
        if (this.isAutoCommitEnabled) {
            return 0;
        }
        synchronized (this.unprocessedEventQueues) {
            hashMap = new HashMap(this.unprocessedEventQueues);
        }
        int i = 0;
        Iterator it = hashMap.values().iterator();
        while (it.hasNext()) {
            if (!((Queue) it.next()).isEmpty()) {
                i++;
            }
        }
        return i;
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            log.error(String.format("Interruption error: %s", e.getMessage()), e);
        }
    }

    private void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", Long.valueOf(this.recoverySleepTimeSec)));
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void safeCloseConsumer() {
        stopRunning();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private synchronized boolean isRunning() {
        return this.running.booleanValue();
    }

    private synchronized void stopRunning() {
        this.running = false;
    }
}
