package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.frameworks.auditlog.AuditLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.class */
public class KafkaOpenMetadataEventConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private final AuditLog auditLog;
    private final long recoverySleepTimeSec;
    private final long pollTimeout;
    private final long maxQueueSize;
    private KafkaConsumer<String, String> consumer;
    private final String topicToSubscribe;
    private final String localServerId;
    private final KafkaOpenMetadataTopicConnector connector;
    private final long maxMsBetweenPolls;
    private final long consumerTimeoutPreventionSafetyWindowMs;
    private final long messageProcessingStatusCheckIntervalMs;
    private final long messageProcessingTimeoutMs;
    private final boolean isAutoCommitEnabled;
    private long nextMessageProcessingStatusCheckTime = System.currentTimeMillis();
    private long maxNextPollTimestampToAvoidConsumerTimeout = 0;
    private boolean initialPartitionAssignment = true;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap();
    private final Map<TopicPartition, BlockingDeque<KafkaIncomingEvent>> unprocessedEventQueues = new ConcurrentHashMap();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final long startTime = System.currentTimeMillis();
    private long countIgnoredMessages = 0;
    private long countReceivedMessages = 0;
    private long countCommits = 0;
    private long countMessagesToProcess = 0;
    private long countMessagesFailedToProcess = 0;

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        AuditLog auditLog;

        public HandleRebalance(AuditLog auditLog) {
            this.auditLog = auditLog;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            try {
                if (KafkaOpenMetadataEventConsumer.this.initialPartitionAssignment) {
                    KafkaOpenMetadataEventConsumer.log.debug("Received initial PartitionsAssigned event");
                    long size = collection.size();
                    if (size != 1) {
                        KafkaOpenMetadataEventConsumer.log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.", Long.valueOf(size));
                    } else {
                        KafkaOpenMetadataEventConsumer.this.initialPartitionAssignment = false;
                        TopicPartition next = collection.iterator().next();
                        int partition = next.partition();
                        String str = next.topic();
                        long j = KafkaOpenMetadataEventConsumer.this.startTime;
                        KafkaOpenMetadataEventConsumer.log.info("Querying for offset by timestamp: {}", Long.valueOf(j));
                        OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) KafkaOpenMetadataEventConsumer.this.consumer.offsetsForTimes(Collections.singletonMap(next, Long.valueOf(j))).get(next);
                        if (offsetAndTimestamp != null) {
                            long offset = offsetAndTimestamp.offset();
                            KafkaOpenMetadataEventConsumer.log.info("Earliest offset found for {} is {}", Long.valueOf(j), Long.valueOf(offsetAndTimestamp.timestamp()));
                            long position = KafkaOpenMetadataEventConsumer.this.consumer.position(next);
                            if (position > offset) {
                                KafkaOpenMetadataEventConsumer.log.info("Seeking to {} for partition {} and topic {} as current offset {} is too late", new Object[]{Long.valueOf(offset), Integer.valueOf(partition), str, Long.valueOf(position)});
                                KafkaOpenMetadataEventConsumer.this.consumer.seek(next, offset);
                            } else {
                                KafkaOpenMetadataEventConsumer.log.info("Not Seeking to {} for partition {} and topic {} as current offset {} is older", new Object[]{Long.valueOf(offset), Integer.valueOf(partition), str, Long.valueOf(position)});
                            }
                        } else {
                            KafkaOpenMetadataEventConsumer.log.info("No missed events found for partition {} and topic {}", Integer.valueOf(partition), str);
                        }
                    }
                } else {
                    KafkaOpenMetadataEventConsumer.log.debug("PartitionsAssigned Event - no action needed");
                }
            } catch (Exception e) {
                KafkaOpenMetadataEventConsumer.log.info("Error correcting seek position, continuing with defaults. Exception: {}", e.getMessage());
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            if (KafkaOpenMetadataEventConsumer.this.currentOffsets.isEmpty()) {
                KafkaOpenMetadataEventConsumer.log.debug("PartitionsRevoked Event - no action needed");
                return;
            }
            KafkaOpenMetadataEventConsumer.log.info("Lost partitions in rebalance. Committing current offsets: {}", KafkaOpenMetadataEventConsumer.this.currentOffsets);
            try {
                KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
            } catch (Exception e) {
                if (this.auditLog != null) {
                    this.auditLog.logException("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(e.getClass().getName(), KafkaOpenMetadataEventConsumer.this.topicToSubscribe, e.getMessage()), e);
                }
            } catch (CommitFailedException e2) {
                if (this.auditLog != null) {
                    this.auditLog.logMessage("onPartitionsRevoked.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.FAILED_TO_COMMIT_CONSUMED_EVENTS.getMessageDefinition());
                }
            } catch (WakeupException e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaOpenMetadataEventConsumer(String str, String str2, KafkaOpenMetadataEventConsumerConfiguration kafkaOpenMetadataEventConsumerConfiguration, Properties properties, KafkaOpenMetadataTopicConnector kafkaOpenMetadataTopicConnector, AuditLog auditLog) {
        this.auditLog = auditLog;
        this.consumer = new KafkaConsumer<>(properties);
        this.topicToSubscribe = str;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), new HandleRebalance(auditLog));
        this.connector = kafkaOpenMetadataTopicConnector;
        this.localServerId = str2;
        if (auditLog != null) {
            auditLog.logMessage("initialize", KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES.getMessageDefinition(Integer.toString(properties.size()), str), properties.toString());
        }
        this.maxMsBetweenPolls = new KafkaConfigurationWrapper(properties).getMaxPollIntervalMs();
        this.recoverySleepTimeSec = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.RECOVERY_SLEEP_TIME);
        this.maxQueueSize = kafkaOpenMetadataEventConsumerConfiguration.getIntProperty(KafkaOpenMetadataEventConsumerProperty.MAX_QUEUE_SIZE);
        this.consumerTimeoutPreventionSafetyWindowMs = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_TIMEOUT_PREVENTION_SAFETY_WINDOW_MS);
        this.pollTimeout = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.POLL_TIMEOUT);
        this.isAutoCommitEnabled = getBooleanProperty(properties, "enable.auto.commit", false);
        this.messageProcessingStatusCheckIntervalMs = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.COMMIT_CHECK_INTERVAL_MS);
        long longProperty = kafkaOpenMetadataEventConsumerConfiguration.getLongProperty(KafkaOpenMetadataEventConsumerProperty.CONSUMER_EVENT_PROCESSING_TIMEOUT_MINS);
        this.messageProcessingTimeoutMs = longProperty < 0 ? longProperty : TimeUnit.MILLISECONDS.convert(longProperty, TimeUnit.MINUTES);
    }

    private static boolean getBooleanProperty(Properties properties, String str, boolean z) {
        String property = properties.getProperty(str);
        return property == null ? z : Boolean.parseBoolean(property);
    }

    private void updateNextMaxPollTimestamp() {
        this.maxNextPollTimestampToAvoidConsumerTimeout = (System.currentTimeMillis() + this.maxMsBetweenPolls) - this.consumerTimeoutPreventionSafetyWindowMs;
    }

    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName(this.topicToSubscribe + "/" + Thread.currentThread().getName());
        log.info("Main loop started for topic {}", this.topicToSubscribe);
        while (isRunning()) {
            try {
                try {
                    checkForFullyProcessedMessagesIfNeeded();
                    boolean z = System.currentTimeMillis() > this.maxNextPollTimestampToAvoidConsumerTimeout;
                    int numberOfUnprocessedEvents = this.connector.getNumberOfUnprocessedEvents();
                    if (z || numberOfUnprocessedEvents <= this.maxQueueSize) {
                        updateNextMaxPollTimestamp();
                        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.pollTimeout));
                        log.debug("Found records: {}", Integer.valueOf(poll.count()));
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            String str = (String) consumerRecord.value();
                            log.debug("Received message: {}", str);
                            this.countReceivedMessages++;
                            log.debug("Metrics: receivedMessages: {}", Long.valueOf(this.countReceivedMessages));
                            KafkaIncomingEvent kafkaIncomingEvent = new KafkaIncomingEvent(str, consumerRecord.offset());
                            String str2 = (String) consumerRecord.key();
                            String str3 = (String) consumerRecord.value();
                            if (this.localServerId.equals(str2)) {
                                log.debug("Ignoring message with key: {} and value: {}", str2, str3);
                                this.countIgnoredMessages++;
                                log.debug("Metrics: ignoredMessages: {}", Long.valueOf(this.countIgnoredMessages));
                            } else {
                                try {
                                    addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), kafkaIncomingEvent);
                                    this.connector.distributeToListeners(kafkaIncomingEvent);
                                    this.countMessagesToProcess++;
                                    log.debug("Metrics: messagesToProcess: {}", Long.valueOf(this.countMessagesToProcess));
                                } catch (Exception e) {
                                    this.countMessagesFailedToProcess++;
                                    log.debug("Metrics: messagesFailedToProcess: {}", Long.valueOf(this.countMessagesFailedToProcess));
                                    log.warn("Error distributing inbound event: {}", e.getMessage());
                                    if (this.auditLog != null) {
                                        this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT.getMessageDefinition(this.topicToSubscribe, e.getClass().getName(), str, e.getMessage()), e);
                                    }
                                }
                            }
                            if (this.isAutoCommitEnabled) {
                                this.currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                                this.countCommits++;
                                log.debug("Metrics: messageCommits: {}", Long.valueOf(this.countCommits));
                            }
                        }
                        awaitNextPollingTime();
                    } else {
                        log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", Integer.valueOf(numberOfUnprocessedEvents), Long.valueOf(this.maxQueueSize));
                        awaitNextPollingTime();
                        awaitNextPollingTime();
                    }
                } catch (Exception e2) {
                    log.warn("Unexpected error: {}", e2.getMessage());
                    if (this.auditLog != null) {
                        this.auditLog.logException("run", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT.getMessageDefinition(this.topicToSubscribe, e2.getClass().getName(), e2.getMessage()), e2);
                    }
                    recoverAfterError();
                    awaitNextPollingTime();
                } catch (WakeupException e3) {
                    log.debug("Received wakeup call, proceeding with graceful shutdown");
                    awaitNextPollingTime();
                }
            } catch (Throwable th) {
                awaitNextPollingTime();
                throw th;
            }
        }
        if (this.consumer != null) {
            try {
                if (!checkForFullyProcessedMessages()) {
                    if (!this.isAutoCommitEnabled) {
                        int numberOfUnprocessedMessages = getNumberOfUnprocessedMessages();
                        if (numberOfUnprocessedMessages > 0) {
                            log.warn("Consumer shut down before all message processing completed! unprocessed messages: {}", Integer.valueOf(numberOfUnprocessedMessages));
                        } else {
                            log.info("All messages processed.  Consumer is shutting down.");
                        }
                    }
                    log.info("Committing current offset {} before shutdown.", this.currentOffsets);
                    try {
                        this.consumer.commitSync(this.currentOffsets);
                    } catch (WakeupException e4) {
                    } catch (Exception e5) {
                        if (this.auditLog != null) {
                            this.auditLog.logException("consumer.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(e5.getClass().getName(), this.topicToSubscribe, e5.getMessage()), e5);
                        }
                    }
                }
                this.consumer = null;
            } finally {
                this.consumer.close();
            }
        }
        log.info("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
    }

    private void addUnprocessedEvent(int i, String str, KafkaIncomingEvent kafkaIncomingEvent) {
        if (this.isAutoCommitEnabled) {
            return;
        }
        TopicPartition topicPartition = new TopicPartition(str, i);
        BlockingDeque<KafkaIncomingEvent> blockingDeque = this.unprocessedEventQueues.get(topicPartition);
        if (blockingDeque == null) {
            blockingDeque = new LinkedBlockingDeque();
            this.unprocessedEventQueues.put(topicPartition, blockingDeque);
        }
        blockingDeque.add(kafkaIncomingEvent);
    }

    private void checkForFullyProcessedMessagesIfNeeded() {
        if (!this.isAutoCommitEnabled && System.currentTimeMillis() >= this.nextMessageProcessingStatusCheckTime) {
            checkForFullyProcessedMessages();
            this.nextMessageProcessingStatusCheckTime = System.currentTimeMillis() + this.messageProcessingStatusCheckIntervalMs;
        }
    }

    private boolean checkForFullyProcessedMessages() {
        if (this.isAutoCommitEnabled) {
            return false;
        }
        log.debug("Checking for fully processed messages whose offsets need to be committed");
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, BlockingDeque<KafkaIncomingEvent>> entry : this.unprocessedEventQueues.entrySet()) {
            KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue = removeFullyProcessedEventsFromBeginningOfQueue(entry.getValue());
            if (removeFullyProcessedEventsFromBeginningOfQueue != null) {
                hashMap.put(entry.getKey(), new OffsetAndMetadata(removeFullyProcessedEventsFromBeginningOfQueue.getOffset() + 1));
            }
        }
        if (hashMap.isEmpty()) {
            return false;
        }
        this.currentOffsets.putAll(hashMap);
        log.debug("Committing: {}", hashMap);
        try {
            this.consumer.commitSync(hashMap);
            return true;
        } catch (WakeupException e) {
            return false;
        } catch (Exception e2) {
            if (this.auditLog == null) {
                return false;
            }
            this.auditLog.logException("checkForFullyProcessedMessages.commitSync", KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_COMMITTING_OFFSETS.getMessageDefinition(e2.getClass().getName(), this.topicToSubscribe, e2.getMessage()), e2);
            return false;
        }
    }

    private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent kafkaIncomingEvent = null;
        while (isFirstEventFullyProcessed(queue)) {
            kafkaIncomingEvent = queue.remove();
            log.debug("Message with offset {} has been fully processed.", Long.valueOf(kafkaIncomingEvent.getOffset()));
            this.countCommits++;
            log.debug("Metrics: commits: {}", Long.valueOf(this.countCommits));
        }
        KafkaIncomingEvent peek = queue.peek();
        if (peek != null) {
            log.debug("Waiting for completing of processing of message with offset {}", Long.valueOf(peek.getOffset()));
        }
        return kafkaIncomingEvent;
    }

    private boolean isFirstEventFullyProcessed(Queue<KafkaIncomingEvent> queue) {
        KafkaIncomingEvent peek = queue.peek();
        if (peek == null) {
            return false;
        }
        if (this.messageProcessingTimeoutMs < 0 || !peek.hasTimeElapsedSinceCreation(this.messageProcessingTimeoutMs)) {
            return peek.isFullyProcessed();
        }
        log.warn("Processing of message at offset {} timed out.", Long.valueOf(peek.getOffset()));
        return true;
    }

    private int getNumberOfUnprocessedMessages() {
        if (this.isAutoCommitEnabled) {
            return 0;
        }
        int i = 0;
        Iterator<BlockingDeque<KafkaIncomingEvent>> it = this.unprocessedEventQueues.values().iterator();
        while (it.hasNext()) {
            if (!it.next().isEmpty()) {
                i++;
            }
        }
        return i;
    }

    private void awaitNextPollingTime() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            log.debug("Interrupted whilst sleeping:");
            Thread.currentThread().interrupt();
        }
    }

    private void recoverAfterError() {
        log.info("Waiting {} seconds to recover", Long.valueOf(this.recoverySleepTimeSec));
        try {
            Thread.sleep(this.recoverySleepTimeSec * 1000);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering");
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void safeCloseConsumer() {
        log.debug("Closing consumer");
        stopRunning();
        if (this.consumer != null) {
            log.debug("Waking up consumer thread");
            this.consumer.wakeup();
        }
    }

    private boolean isRunning() {
        return this.running.get();
    }

    private void stopRunning() {
        log.debug("Set running to false");
        this.running.set(false);
    }
}
