package org.odpi.openmetadata.adapters.eventbus.topic.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditLog;
import org.odpi.openmetadata.repositoryservices.auditlog.OMRSAuditingComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer.class */
public class KafkaOpenMetadataEventConsumer implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaOpenMetadataEventConsumer.class);
    private static final OMRSAuditLog auditLog = new OMRSAuditLog(OMRSAuditingComponent.OPEN_METADATA_TOPIC_CONNECTOR);
    private static final long recoverySleepTimeSec = 10;
    private static final long defaultPollTimeout = 1000;
    private KafkaConsumer<String, String> consumer;
    private String topicToSubscribe;
    private String localServerId;
    private KafkaOpenMetadataTopicConnector connector;
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
    private Boolean running = true;

    /* loaded from: input_file:org/odpi/openmetadata/adapters/eventbus/topic/kafka/KafkaOpenMetadataEventConsumer$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            KafkaOpenMetadataEventConsumer.log.info("Lost partitions in rebalance. Committing current offsets:" + KafkaOpenMetadataEventConsumer.this.currentOffsets);
            KafkaOpenMetadataEventConsumer.this.consumer.commitSync(KafkaOpenMetadataEventConsumer.this.currentOffsets);
        }
    }

    public KafkaOpenMetadataEventConsumer(String str, String str2, Properties properties, KafkaOpenMetadataTopicConnector kafkaOpenMetadataTopicConnector) {
        this.consumer = new KafkaConsumer<>(properties);
        this.topicToSubscribe = str;
        this.consumer.subscribe(Collections.singletonList(this.topicToSubscribe), new HandleRebalance());
        this.connector = kafkaOpenMetadataTopicConnector;
        this.localServerId = str2;
        KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.SERVICE_CONSUMER_PROPERTIES;
        auditLog.logRecord("initialize", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(properties.toString()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
    }

    public void stop() {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running.booleanValue()) {
            ConsumerRecords poll = this.consumer.poll(defaultPollTimeout);
            try {
                try {
                    log.debug("Found records: " + poll.count());
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        String str = (String) consumerRecord.value();
                        log.debug("Received message: " + str);
                        if (this.localServerId.equals(consumerRecord.key())) {
                            log.debug("Ignoring message with key: " + ((String) consumerRecord.key()) + " and value " + ((String) consumerRecord.value()));
                        } else {
                            try {
                                this.connector.distributeToListeners(str);
                            } catch (Exception e) {
                                log.error(String.format("Error distributing inbound event: %s", e.getMessage()), e);
                                KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_DISTRIBUTING_EVENT;
                                auditLog.logRecord("run", kafkaOpenMetadataTopicConnectorAuditCode.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode.getFormattedLogMessage(this.topicToSubscribe, e.getClass().getName(), str, e.getMessage()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode.getUserAction());
                            }
                        }
                        this.currentOffsets.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                    }
                    try {
                        Thread.sleep(defaultPollTimeout);
                    } catch (InterruptedException e2) {
                        log.error(String.format("Interruption error: %s", e2.getMessage()), e2);
                    }
                } catch (Throwable th) {
                    try {
                        Thread.sleep(defaultPollTimeout);
                    } catch (InterruptedException e3) {
                        log.error(String.format("Interruption error: %s", e3.getMessage()), e3);
                    }
                    throw th;
                }
            } catch (WakeupException e4) {
                log.debug("Received wakeup call, proceeding with graceful shutdown", e4);
                try {
                    Thread.sleep(defaultPollTimeout);
                } catch (InterruptedException e5) {
                    log.error(String.format("Interruption error: %s", e5.getMessage()), e5);
                }
            } catch (Exception e6) {
                log.error(String.format("Unexpected error: %s", e6.getMessage()), e6);
                KafkaOpenMetadataTopicConnectorAuditCode kafkaOpenMetadataTopicConnectorAuditCode2 = KafkaOpenMetadataTopicConnectorAuditCode.EXCEPTION_RECEIVING_EVENT;
                auditLog.logRecord("run", kafkaOpenMetadataTopicConnectorAuditCode2.getLogMessageId(), kafkaOpenMetadataTopicConnectorAuditCode2.getSeverity(), kafkaOpenMetadataTopicConnectorAuditCode2.getFormattedLogMessage(this.topicToSubscribe, e6.getClass().getName(), e6.getMessage()), (String) null, kafkaOpenMetadataTopicConnectorAuditCode2.getSystemAction(), kafkaOpenMetadataTopicConnectorAuditCode2.getUserAction());
                recoverAfterError();
                try {
                    Thread.sleep(defaultPollTimeout);
                } catch (InterruptedException e7) {
                    log.error(String.format("Interruption error: %s", e7.getMessage()), e7);
                }
            }
        }
    }

    protected void recoverAfterError() {
        log.info(String.format("Waiting %s seconds to recover", Long.valueOf(recoverySleepTimeSec)));
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            log.debug("Interrupted while recovering", e);
        }
    }

    public void safeCloseConsumer() {
        if (this.consumer != null) {
            try {
                stopConsumption();
                this.consumer.commitSync(this.currentOffsets);
                this.consumer = null;
            } finally {
                this.consumer.close();
            }
        }
    }

    public void stopConsumption() {
        synchronized (this.running) {
            this.running = false;
        }
    }
}
