package org.jlab.jaws.eventsource;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/jlab/jaws/eventsource/EventSourceTable.class */
/**
 * A thread that materialises a Kafka topic into an in-memory key/value table and reports the
 * result to registered {@link EventSourceListener}s.
 *
 * <p>When started, the thread subscribes to the configured topic, rewinds to the beginning of the
 * assigned partition and reads up to the end offset observed at assignment time. The resulting
 * table is delivered through {@code initialState}. Afterwards it keeps polling and delivers
 * batches of changes through {@code changes}, either once a poll returns no records or after the
 * configured maximum number of consecutive polls with pending changes.
 *
 * <p>Only topics with exactly one assigned partition are supported; any other assignment raises
 * an {@link IllegalStateException} from the rebalance callback.
 *
 * <p>{@link #close()} may be called from another thread: it marks the table as closed and either
 * closes the consumer directly (if the thread never started running) or wakes up a blocked poll
 * so the running thread can close the consumer itself.
 *
 * @param <K> type of the record keys
 * @param <V> type of the record values
 */
public class EventSourceTable<K, V> extends Thread implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EventSourceTable.class);

    /** Number of empty polls tolerated while loading the initial state; one more aborts init. */
    private static final int MAX_EMPTY_INIT_POLLS = 10;

    private final KafkaConsumer<K, V> consumer;
    private final EventSourceConfig config;
    private final Set<EventSourceListener<K, V>> listeners =
            Collections.newSetFromMap(new ConcurrentHashMap<EventSourceListener<K, V>, Boolean>());

    /** End offset of the assigned partition as captured in the rebalance callback. */
    private long endOffset = 0;

    /** Set once the record just before {@link #endOffset} was consumed (or the topic was empty). */
    private boolean endReached = false;

    private final AtomicReference<CONSUMER_STATE> consumerState =
            new AtomicReference<>(CONSUMER_STATE.INITIALIZING);

    /** Latest record per key; keys whose latest value is {@code null} are removed. */
    private final HashMap<K, EventSourceRecord<K, V>> state = new HashMap<>();

    /** Records accumulated since the last listener notification, in arrival order. */
    private final List<EventSourceRecord<K, V>> changes = new ArrayList<>();

    /** Lifecycle of the underlying consumer. */
    public enum CONSUMER_STATE {
        INITIALIZING,
        RUNNING,
        CLOSED
    }

    /**
     * Creates the table and its Kafka consumer.
     *
     * <p>The consumer is configured with a copy of {@code properties}, overlaid with the bootstrap
     * servers, group, key deserializer and value deserializer as resolved by
     * {@link EventSourceConfig}.
     *
     * @param properties configuration for both the event source and the Kafka consumer
     */
    public EventSourceTable(Properties properties) {
        this.config = new EventSourceConfig(properties);

        Properties consumerProps = new Properties();
        consumerProps.putAll(properties);
        consumerProps.setProperty(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS, this.config.getString(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS));
        consumerProps.setProperty(EventSourceConfig.EVENT_SOURCE_GROUP, this.config.getString(EventSourceConfig.EVENT_SOURCE_GROUP));
        consumerProps.setProperty(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER, this.config.getString(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER));
        consumerProps.setProperty(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER, this.config.getString(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER));

        this.consumer = new KafkaConsumer<>(consumerProps);
    }

    /**
     * Registers a listener to be notified of the initial state and of subsequent changes.
     *
     * @param eventSourceListener the listener to add
     */
    public void addListener(EventSourceListener<K, V> eventSourceListener) {
        this.listeners.add(eventSourceListener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param eventSourceListener the listener to remove
     */
    public void removeListener(EventSourceListener<K, V> eventSourceListener) {
        this.listeners.remove(eventSourceListener);
    }

    /**
     * Loads the initial state and then monitors the topic until the table is closed.
     *
     * <p>Does nothing if the table is no longer in the {@code INITIALIZING} state. The consumer is
     * always closed when this method leaves the running phase. A {@link WakeupException} is
     * swallowed only when it was caused by {@link #close()}; otherwise it is rethrown.
     */
    @Override
    public void run() {
        if (!this.consumerState.compareAndSet(CONSUMER_STATE.INITIALIZING, CONSUMER_STATE.RUNNING)) {
            log.debug("We must already be closed!");
            return;
        }

        try {
            init();
            monitorChanges();
        } catch (WakeupException e) {
            // A wakeup is the expected shutdown signal only if close() was called.
            if (this.consumerState.get() != CONSUMER_STATE.CLOSED) {
                throw e;
            }
        } finally {
            this.consumer.close();
        }
    }

    /**
     * Subscribes to the topic, reads it from the beginning up to the end offset captured at
     * partition assignment, and then delivers the initial state to the listeners.
     *
     * @throws RuntimeException if more than {@link #MAX_EMPTY_INIT_POLLS} polls return no records
     *     before the end offset is reached
     */
    private void init() {
        String topic = this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC);
        log.debug("subscribing to topic: {}", topic);

        this.consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to release on revocation.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                log.debug("Seeking to beginning of topic");

                if (partitions.size() != 1) {
                    throw new IllegalStateException("We only support single partition Event Sourced topics at this time");
                }

                EventSourceTable.this.consumer.seekToBeginning(partitions);

                // Remember where the partition ends right now; init() reads up to this offset.
                TopicPartition partition = partitions.iterator().next();
                EventSourceTable.this.endOffset =
                        EventSourceTable.this.consumer.endOffsets(partitions).get(partition);

                if (EventSourceTable.this.endOffset == 0) {
                    log.debug("Empty topic to begin with");
                    EventSourceTable.this.endReached = true;
                }
            }
        });

        int emptyPolls = 0;
        while (!this.endReached && this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            ConsumerRecords<K, V> records = poll();
            log.debug("found {} records", records.count());

            for (ConsumerRecord<K, V> consumerRecord : records) {
                updateState(consumerRecord);

                long nextOffset = consumerRecord.offset() + 1;
                log.debug("Looking for last index: {}, found: {}", this.endOffset, nextOffset);
                if (nextOffset == this.endOffset) {
                    log.debug("end of partition {} reached", consumerRecord.partition());
                    this.endReached = true;
                }
            }

            if (records.count() == 0) {
                emptyPolls++;
            }
            if (emptyPolls > MAX_EMPTY_INIT_POLLS) {
                throw new RuntimeException("Took too long to obtain initial list; verify topic compact policy!");
            }
        }

        notifyListenersInitial();
        log.debug("Done with EventSourceConsumer init");
    }

    /**
     * Polls for changes while the table is running and flushes accumulated changes to the
     * listeners, either after a poll that returns no records or once the configured maximum number
     * of polls with pending changes has been reached.
     */
    private void monitorChanges() {
        int pollsWithPendingChanges = 0;
        boolean hasPendingChanges = false;

        while (this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            log.debug("polling for changes ({})", this.config.getString(EventSourceConfig.EVENT_SOURCE_TOPIC));
            ConsumerRecords<K, V> records = poll();

            if (records.count() > 0) {
                for (ConsumerRecord<K, V> consumerRecord : records) {
                    updateState(consumerRecord);
                }
                log.debug("Change in topic: request update once settled");
                hasPendingChanges = true;
            } else if (hasPendingChanges) {
                log.debug("Flushing changes since we've settled (we had a poll with no changes)");
                notifyListenersChanges();
                hasPendingChanges = false;
                pollsWithPendingChanges = 0;
            }

            if (pollsWithPendingChanges >= this.config.getLong(EventSourceConfig.EVENT_SOURCE_MAX_POLL_BEFORE_FLUSH)) {
                log.debug("Flushing changes due to max poll with changes");
                notifyListenersChanges();
                hasPendingChanges = false;
                pollsWithPendingChanges = 0;
            }

            if (hasPendingChanges) {
                pollsWithPendingChanges++;
            }
        }
    }

    /** Polls the consumer once, using the configured poll timeout in milliseconds. */
    private ConsumerRecords<K, V> poll() {
        return this.consumer.poll(Duration.ofMillis(this.config.getLong(EventSourceConfig.EVENT_SOURCE_POLL_MILLIS)));
    }

    /**
     * Hands every listener its own copy of the current table, then discards the changes recorded
     * while loading it.
     */
    private void notifyListenersInitial() {
        for (EventSourceListener<K, V> listener : this.listeners) {
            listener.initialState(new HashSet<>(this.state.values()));
        }
        this.changes.clear();
    }

    /** Hands every listener its own copy of the pending changes, then clears them. */
    private void notifyListenersChanges() {
        for (EventSourceListener<K, V> listener : this.listeners) {
            listener.changes(new ArrayList<>(this.changes));
        }
        this.changes.clear();
    }

    /**
     * Records a consumed record as a pending change and applies it to the table: a {@code null}
     * value removes the key, any other value replaces the entry for the key.
     *
     * @param consumerRecord the record read from the topic
     */
    private void updateState(ConsumerRecord<K, V> consumerRecord) {
        K key = consumerRecord.key();
        V value = consumerRecord.value();
        log.debug("State update: {}={}", key, value);

        EventSourceRecord<K, V> eventSourceRecord = new EventSourceRecord<>(key, value);
        this.changes.add(eventSourceRecord);

        if (value == null) {
            log.debug("Removing record: {}", key);
            this.state.remove(key);
        } else {
            log.debug("Adding record: {}", key);
            this.state.put(key, eventSourceRecord);
        }
    }

    /**
     * Marks the table as closed and releases the consumer.
     *
     * <p>If the thread never entered the running phase the consumer is closed here. Otherwise the
     * consumer is woken up so that the running thread leaves its poll and closes the consumer
     * itself.
     */
    @Override
    public void close() {
        CONSUMER_STATE previous = this.consumerState.getAndSet(CONSUMER_STATE.CLOSED);
        if (previous == CONSUMER_STATE.INITIALIZING) {
            this.consumer.close();
        } else {
            this.consumer.wakeup();
        }
    }
}
