/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.csid.utils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

public class LongPollingMockConsumer<K, V>
extends MockConsumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(LongPollingMockConsumer.class);
    private final CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> commitHistoryInt = new CopyOnWriteArrayList();
    private final AtomicBoolean statePretendingToLongPoll = new AtomicBoolean(false);

    public LongPollingMockConsumer(OffsetResetStrategy offsetResetStrategy) {
        super(offsetResetStrategy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
        ConsumerRecords records = super.poll(timeout);
        if (records.isEmpty()) {
            log.debug("No records returned, simulating long poll with sleep for requested long poll timeout of {}...", (Object)timeout);
            LongPollingMockConsumer longPollingMockConsumer = this;
            synchronized (longPollingMockConsumer) {
                Instant sleepUntil = Instant.now().plus(timeout);
                this.statePretendingToLongPoll.set(true);
                while (this.statePretendingToLongPoll.get() && !this.timeoutReached(sleepUntil)) {
                    Duration left = Duration.between(Instant.now(), sleepUntil);
                    log.debug("Time remaining: {}", (Object)left);
                    try {
                        long msLeft = left.toMillis();
                        if (msLeft <= 0L) continue;
                        ((Object)((Object)this)).wait(msLeft);
                    }
                    catch (InterruptedException e) {
                        log.warn("Interrupted, ending this long poll early", (Throwable)e);
                        this.statePretendingToLongPoll.set(false);
                    }
                }
                if (this.statePretendingToLongPoll.get() && !this.timeoutReached(sleepUntil)) {
                    log.debug("Don't know why I was notified to wake up");
                } else if (this.statePretendingToLongPoll.get() && this.timeoutReached(sleepUntil)) {
                    log.debug("Simulated long poll of ({}) finished. Now: {} vs sleep until: {}", new Object[]{timeout, Instant.now(), sleepUntil});
                } else if (!this.statePretendingToLongPoll.get()) {
                    log.debug("Simulated long poll was interrupted by by WAKEUP command...");
                }
                this.statePretendingToLongPoll.set(false);
            }
        } else {
            log.debug("Polled and found {} records...", (Object)records.count());
        }
        return records;
    }

    private boolean timeoutReached(Instant sleepUntil) {
        long until;
        long now = Instant.now().toEpochMilli();
        return now + 1L >= (until = sleepUntil.toEpochMilli());
    }

    public synchronized void addRecord(ConsumerRecord<K, V> record) {
        super.addRecord(record);
        this.wakeup();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void wakeup() {
        if (this.statePretendingToLongPoll.get()) {
            this.statePretendingToLongPoll.set(false);
            log.debug("Interrupting mock long poll...");
            LongPollingMockConsumer longPollingMockConsumer = this;
            synchronized (longPollingMockConsumer) {
                ((Object)((Object)this)).notifyAll();
            }
        }
    }

    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.commitHistoryInt.add(offsets);
        super.commitAsync(offsets, callback);
    }

    private List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> injectConsumerGroupId(List<Map<TopicPartition, OffsetAndMetadata>> commitHistory) {
        String groupId = this.groupMetadata().groupId();
        return commitHistory.stream().map(x -> UniMaps.of((Object)groupId, (Object)x)).collect(Collectors.toList());
    }

    public List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistoryWithGroupId() {
        CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> commitHistoryInt = this.getCommitHistoryInt();
        return this.injectConsumerGroupId(commitHistoryInt);
    }

    public synchronized void close(Duration timeout) {
        this.revokeAssignment();
        super.close(timeout);
    }

    private void revokeAssignment() throws NoSuchFieldException, IllegalAccessException {
        ConsumerRebalanceListener consumerRebalanceListener = this.getRebalanceListener();
        if (consumerRebalanceListener == null) {
            log.warn("No rebalance listener assigned - on revoke can't fire");
        } else {
            Set assignment = super.assignment();
            consumerRebalanceListener.onPartitionsRevoked((Collection)assignment);
        }
    }

    private ConsumerRebalanceListener getRebalanceListener() throws NoSuchFieldException, IllegalAccessException {
        Field subscriptionsField = MockConsumer.class.getDeclaredField("subscriptions");
        subscriptionsField.setAccessible(true);
        SubscriptionState subscriptionState = (SubscriptionState)subscriptionsField.get((Object)this);
        ConsumerRebalanceListener consumerRebalanceListener = subscriptionState.rebalanceListener();
        return consumerRebalanceListener;
    }

    public void subscribeWithRebalanceAndAssignment(List<String> topics, int partitions) {
        List<TopicPartition> topicPartitions = topics.stream().flatMap(y -> IntStream.range(0, partitions).boxed().map(x -> new TopicPartition(y, x.intValue()))).collect(Collectors.toList());
        this.rebalance(topicPartitions);
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<TopicPartition, Long>();
        for (TopicPartition tp : topicPartitions) {
            beginningOffsets.put(tp, 0L);
        }
        super.updateBeginningOffsets(beginningOffsets);
    }

    public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
        super.rebalance(newAssignment);
        ConsumerRebalanceListener rebalanceListeners = this.getRebalanceListener();
        if (rebalanceListeners != null) {
            rebalanceListeners.onPartitionsAssigned(newAssignment);
        }
    }

    public void revoke(Collection<TopicPartition> newAssignment) {
        ConsumerRebalanceListener rebalanceListeners = this.getRebalanceListener();
        if (rebalanceListeners != null) {
            rebalanceListeners.onPartitionsRevoked(newAssignment);
        }
    }

    public synchronized void assign(Collection<TopicPartition> newAssignment) {
        ConsumerRebalanceListener rebalanceListeners = this.getRebalanceListener();
        if (rebalanceListeners != null) {
            rebalanceListeners.onPartitionsAssigned(newAssignment);
        }
    }

    public synchronized void rebalanceWithoutAssignment(Collection<TopicPartition> newAssignment) {
        super.rebalance(newAssignment);
    }

    public String toString() {
        return "LongPollingMockConsumer(commitHistoryInt=" + this.getCommitHistoryInt() + ", statePretendingToLongPoll=" + this.statePretendingToLongPoll + ")";
    }

    public CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> getCommitHistoryInt() {
        return this.commitHistoryInt;
    }
}

