package org.jlab.kafka.eventsource;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/jlab/kafka/eventsource/EventSourceTable.class */
public class EventSourceTable<K, V> implements AutoCloseable {
    private final KafkaConsumer<K, V> consumer;
    private final EventSourceConfig config;
    private final Logger log = LoggerFactory.getLogger(EventSourceTable.class);
    private final Set<EventSourceListener<K, V>> listeners = Collections.newSetFromMap(new ConcurrentHashMap());
    private long endOffset = 0;
    private boolean endReached = false;
    private AtomicReference<CONSUMER_STATE> consumerState = new AtomicReference<>(CONSUMER_STATE.INITIALIZING);
    private final CountDownLatch highWaterSignal = new CountDownLatch(1);
    private ExecutorService pollExecutor = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jlab/kafka/eventsource/EventSourceTable$CONSUMER_STATE.class */
    public enum CONSUMER_STATE {
        INITIALIZING,
        RUNNING,
        CLOSED
    }

    public EventSourceTable(Properties properties) {
        this.config = new EventSourceConfig(properties);
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        properties2.setProperty(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS, this.config.getString(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS));
        properties2.setProperty(EventSourceConfig.EVENT_SOURCE_GROUP, this.config.getString(EventSourceConfig.EVENT_SOURCE_GROUP));
        properties2.setProperty(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER, this.config.getString(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER));
        properties2.setProperty(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER, this.config.getString(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER));
        this.consumer = new KafkaConsumer<>(properties2);
    }

    public void addListener(EventSourceListener<K, V> eventSourceListener) {
        this.listeners.add(eventSourceListener);
    }

    public void removeListener(EventSourceListener<K, V> eventSourceListener) {
        this.listeners.remove(eventSourceListener);
    }

    public boolean awaitHighWaterOffset(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.highWaterSignal.await(j, timeUnit);
    }

    public void start() throws IllegalStateException {
        if (!this.consumerState.compareAndSet(CONSUMER_STATE.INITIALIZING, CONSUMER_STATE.RUNNING)) {
            throw new IllegalStateException("Start has already been called, or the Table has already been closed");
        }
        this.pollExecutor = Executors.newSingleThreadExecutor();
        this.pollExecutor.execute(new Runnable() { // from class: org.jlab.kafka.eventsource.EventSourceTable.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    EventSourceTable.this.init();
                    EventSourceTable.this.monitorChanges();
                } catch (WakeupException e) {
                    EventSourceTable.this.pollExecutor.shutdown();
                    if (EventSourceTable.this.consumerState.get() != CONSUMER_STATE.CLOSED) {
                        throw e;
                    }
                } finally {
                    EventSourceTable.this.consumer.close();
                }
            }
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void init() {
        this.log.debug("subscribing to topic: {}", this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC));
        this.consumer.subscribe(Collections.singletonList(this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC)), new ConsumerRebalanceListener() { // from class: org.jlab.kafka.eventsource.EventSourceTable.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                EventSourceTable.this.log.debug("Seeking to beginning of topic");
                if (collection.size() != 1) {
                    throw new IllegalStateException("We only support single partition Event Sourced topics at this time");
                }
                long longValue = EventSourceTable.this.config.getLong(EventSourceConfig.EVENT_SOURCE_RESUME_OFFSET).longValue();
                TopicPartition next = collection.iterator().next();
                if (longValue < 0) {
                    EventSourceTable.this.consumer.seekToBeginning(collection);
                } else {
                    EventSourceTable.this.consumer.seek(next, longValue);
                }
                Map endOffsets = EventSourceTable.this.consumer.endOffsets(collection);
                EventSourceTable.this.endOffset = ((Long) endOffsets.get(next)).longValue();
                if (EventSourceTable.this.endOffset == 0 || longValue >= EventSourceTable.this.endOffset) {
                    EventSourceTable.this.log.debug("No events at resumeOffset or empty topic");
                    EventSourceTable.this.endReached = true;
                }
            }
        });
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        newSingleThreadScheduledExecutor.schedule(new Runnable() { // from class: org.jlab.kafka.eventsource.EventSourceTable.3
            @Override // java.lang.Runnable
            public void run() {
                atomicBoolean.set(true);
            }
        }, this.config.getLong(EventSourceConfig.EVENT_SOURCE_HIGH_WATER_TIMEMOUT).longValue(), TimeUnit.valueOf(this.config.getString(EventSourceConfig.EVENT_SOURCE_HIGH_WATER_UNITS)));
        boolean booleanValue = this.config.getBoolean(EventSourceConfig.EVENT_SOURCE_COMPACTED_CACHE).booleanValue();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        while (!this.endReached && !atomicBoolean.get() && this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            this.log.debug("polling for changes ({})", this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC));
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.config.getLong(EventSourceConfig.EVENT_SOURCE_POLL_MILLIS).longValue()));
            ArrayList arrayList = new ArrayList();
            if (poll.count() > 0) {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    EventSourceRecord consumerToEvent = consumerToEvent(consumerRecord);
                    arrayList.add(consumerToEvent);
                    if (booleanValue) {
                        linkedHashMap.put(consumerRecord.key(), consumerToEvent);
                    }
                    if (consumerRecord.offset() + 1 == this.endOffset) {
                        this.log.debug("end of partition {} reached", Integer.valueOf(consumerRecord.partition()));
                        this.endReached = true;
                    }
                }
                notifyListenersChanges(arrayList, false);
            }
        }
        newSingleThreadScheduledExecutor.shutdown();
        if (atomicBoolean.get()) {
            notifyListenersTimeout();
        } else {
            notifyListenersHighWaterOffset(linkedHashMap);
        }
    }

    private void monitorChanges() {
        while (this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            this.log.debug("polling for changes ({})", this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC));
            ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(this.config.getLong(EventSourceConfig.EVENT_SOURCE_POLL_MILLIS).longValue()));
            ArrayList arrayList = new ArrayList();
            if (poll.count() > 0) {
                Iterator it = poll.iterator();
                while (it.hasNext()) {
                    arrayList.add(consumerToEvent((ConsumerRecord) it.next()));
                }
                notifyListenersChanges(arrayList, true);
            }
        }
    }

    private void notifyListenersHighWaterOffset(LinkedHashMap<K, EventSourceRecord<K, V>> linkedHashMap) {
        Iterator<EventSourceListener<K, V>> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().highWaterOffset(new LinkedHashMap<>(linkedHashMap));
        }
        this.highWaterSignal.countDown();
    }

    private void notifyListenersTimeout() {
        Iterator<EventSourceListener<K, V>> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().highWaterOffsetTimeout();
        }
    }

    private void notifyListenersChanges(List<EventSourceRecord<K, V>> list, boolean z) {
        Iterator<EventSourceListener<K, V>> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().batch(new ArrayList(list), z);
        }
    }

    private EventSourceRecord<K, V> consumerToEvent(ConsumerRecord<K, V> consumerRecord) {
        this.log.debug("Consumer to Event Record: {}={}", consumerRecord.key(), consumerRecord.value());
        return new EventSourceRecord<>(consumerRecord.key(), consumerRecord.value(), consumerRecord.offset(), consumerRecord.timestamp());
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.consumerState.getAndSet(CONSUMER_STATE.CLOSED) == CONSUMER_STATE.INITIALIZING) {
            this.consumer.close();
        } else {
            this.consumer.wakeup();
        }
    }
}
