package org.exploit.signalix.core;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.Closeable;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.exploit.signalix.annotation.KafkaEventListener;
import org.exploit.signalix.event.DeserializationFailureEvent;
import org.exploit.signalix.marker.Event;
import org.exploit.signalix.utils.UtilityMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/exploit/signalix/core/KafkaEventConsumer.class */
/**
 * Dedicated polling thread that reads JSON-encoded events from Kafka and hands each
 * record to a worker pool, which forwards the decoded {@link Event} to the event scope
 * of the owning {@link KafkaEventManager}.
 *
 * <p>Offsets are committed synchronously after every record of a poll batch has been
 * handed to the event scope (successfully or not).
 *
 * <p>Threading: the underlying {@link KafkaConsumer} is only ever touched by this thread,
 * with the exception of {@link KafkaConsumer#wakeup()}, which Kafka documents as the one
 * method that is safe to call from another thread. For that reason the consumer is closed
 * by {@link #run()} itself, and {@link #close()} merely signals and waits.
 */
public class KafkaEventConsumer extends Thread implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventConsumer.class);
    private final AtomicBoolean stop = new AtomicBoolean(false);
    private final String id;
    private final KafkaEventManager eventManager;
    private final KafkaEventListener annotation;
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executor;

    /**
     * Builds the consumer configuration and creates the Kafka consumer and worker pool.
     *
     * <p>Configuration precedence (lowest to highest): manager properties, then the
     * listener's {@code groupId}/{@code autoOffsetReset} as fallbacks, then the listener's
     * explicit {@code properties}. Auto-commit is switched off before the listener
     * properties are merged in.
     *
     * @param str                identifier returned by {@link #getConsumerId()}
     * @param kafkaEventManager  source of base properties, object mapper and event scope
     * @param kafkaEventListener listener settings (topics, group, concurrency, timeouts)
     */
    public KafkaEventConsumer(String str, KafkaEventManager kafkaEventManager, KafkaEventListener kafkaEventListener) {
        this.id = str;
        this.eventManager = kafkaEventManager;
        this.annotation = kafkaEventListener;

        // Create the pool first: newFixedThreadPool rejects a non-positive size, and failing
        // here (before the consumer exists) avoids leaking an open KafkaConsumer.
        this.executor = Executors.newFixedThreadPool(kafkaEventListener.concurrency());

        Map<String, Object> config = UtilityMethods.convertToMap(kafkaEventManager.getProperties());
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        config.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, kafkaEventListener.groupId());
        config.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaEventListener.autoOffsetReset());
        config.putAll(UtilityMethods.convertToMap(kafkaEventListener.properties()));
        this.consumer = new KafkaConsumer<>(config);
    }

    /**
     * Subscribes to the configured topics and polls until {@link #close()} is called or
     * this thread is interrupted. Each batch is fanned out to the worker pool; the loop
     * waits for the whole batch before committing offsets. The consumer is always closed
     * on the way out, on this thread.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            this.consumer.subscribe(Set.of(this.annotation.topics()));
            while (!this.stop.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    ConsumerRecords<String, String> records =
                            this.consumer.poll(Duration.ofMillis(this.annotation.pollMillis()));
                    CountDownLatch pending = new CountDownLatch(records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        this.executor.execute(() -> handleRecord(record, pending));
                    }
                    pending.await();
                    this.consumer.commitSync();
                } catch (WakeupException e) {
                    // Expected when close() wakes us up; the loop condition decides whether to exit.
                    LOGGER.info("Received WAKEUP for poll kafka task");
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition terminates the thread.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOGGER.error("Unknown error occurred while fetching from Kafka: ", e);
                }
            }
        } finally {
            closeConsumerQuietly();
        }
    }

    /**
     * Decodes one record and passes it to the event scope. A payload that cannot be
     * deserialized is reported as a {@link DeserializationFailureEvent} instead. The
     * latch is always counted down so the poll loop never blocks on a failed record.
     */
    private void handleRecord(ConsumerRecord<String, String> record, CountDownLatch pending) {
        String payload = record.value();
        try {
            Event event = this.eventManager.getEventObjectMapper().readValue(payload, Event.class);
            this.eventManager.getEventScope().innerCall(event);
        } catch (JsonProcessingException e) {
            LOGGER.error("Error occurred while deserializing event: ", e);
            this.eventManager.getEventScope().innerCall(new DeserializationFailureEvent(payload, e));
        } finally {
            pending.countDown();
        }
    }

    /** Closes the Kafka consumer, logging (rather than propagating) any failure. */
    private void closeConsumerQuietly() {
        try {
            this.consumer.close();
        } catch (RuntimeException e) {
            LOGGER.warn("Failed to close Kafka consumer cleanly: ", e);
        }
    }

    /**
     * @return the identifier supplied at construction time
     */
    public String getConsumerId() {
        return this.id;
    }

    /**
     * Stops the poll loop and shuts down the worker pool.
     *
     * <p>The poll thread is signalled and woken, then given up to the listener's
     * {@code terminateTimeout} (milliseconds) to exit; the same deadline bounds the wait
     * for worker tasks. Workers still running at the deadline are cancelled with
     * {@link ExecutorService#shutdownNow()}. If the calling thread is interrupted while
     * waiting, workers are cancelled immediately and the interrupt flag is restored.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.stop.set(true);
        this.consumer.wakeup();

        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(this.annotation.terminateTimeout());
        boolean interrupted = false;

        // When invoked on the poll thread itself, run()'s finally block closes the consumer.
        if (Thread.currentThread() != this) {
            try {
                long waitMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                // join(0) would wait forever and a negative value is rejected, so only join with a positive budget.
                if (waitMillis > 0) {
                    join(waitMillis);
                }
            } catch (InterruptedException e) {
                interrupted = true;
            }
            // Only touch the consumer from here when the poll thread is not running
            // (never started, or already finished); otherwise run() closes it.
            if (!isAlive()) {
                closeConsumerQuietly();
            }
        }

        this.executor.shutdown();
        try {
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            if (interrupted || !this.executor.awaitTermination(remainingNanos, TimeUnit.NANOSECONDS)) {
                this.executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            interrupted = true;
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
