package io.confluent.csid.utils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/csid/utils/LongPollingMockConsumer.class */
/**
 * A {@link MockConsumer} that pretends to long poll: when a poll returns no records, the calling
 * thread waits on this instance's monitor until either the requested poll timeout elapses or
 * {@link #wakeup()} is called (which {@link #addRecord(ConsumerRecord)} does automatically).
 *
 * <p>It also records every offset map passed to
 * {@link #commitAsync(Map, OffsetCommitCallback)} and fires the registered
 * {@link ConsumerRebalanceListener} callbacks on rebalance, revoke, assign and close.
 *
 * <p>The waiting and waking logic relies on the instance monitor: {@link #poll(Duration)},
 * {@link #wakeup()} and {@link #addRecord(ConsumerRecord)} are all {@code synchronized}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class LongPollingMockConsumer<K, V> extends MockConsumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(LongPollingMockConsumer.class);

    /** Every offset map handed to {@link #commitAsync(Map, OffsetCommitCallback)}, in call order. */
    private final CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> commitHistoryInt;

    /** {@code true} while a thread is inside the simulated long poll wait. */
    private final AtomicBoolean statePretendingToLongPoll;

    /**
     * Creates the consumer with an empty commit history and no long poll in progress.
     *
     * @param offsetResetStrategy passed straight through to {@link MockConsumer}
     */
    public LongPollingMockConsumer(OffsetResetStrategy offsetResetStrategy) {
        super(offsetResetStrategy);
        this.commitHistoryInt = new CopyOnWriteArrayList<>();
        this.statePretendingToLongPoll = new AtomicBoolean(false);
    }

    /**
     * Polls the underlying mock consumer; if nothing is returned, simulates a long poll by waiting
     * for up to {@code duration} before returning the (still empty) result.
     *
     * @param duration the poll timeout, also used as the maximum simulated wait
     * @return whatever {@code super.poll(duration)} returned
     */
    @Override
    public synchronized ConsumerRecords<K, V> poll(Duration duration) {
        ConsumerRecords<K, V> records = super.poll(duration);
        if (records.isEmpty()) {
            log.debug("No records returned, simulating long poll with sleep for requested long poll timeout of {}...", duration);
            simulateLongPoll(duration);
        } else {
            log.debug("Polled and found {} records...", records.count());
        }
        return records;
    }

    /**
     * Waits on this instance's monitor until the deadline passes, {@link #wakeup()} clears the
     * long-poll flag, or the thread is interrupted. Must be called while holding the monitor
     * (it is only called from the synchronized {@link #poll(Duration)}).
     */
    private void simulateLongPoll(Duration duration) {
        Instant deadline = Instant.now().plus(duration);
        statePretendingToLongPoll.set(true);
        // Loop rather than wait once: Object.wait may return early (spurious wakeups).
        while (statePretendingToLongPoll.get() && !timeoutReached(deadline)) {
            Duration remaining = Duration.between(Instant.now(), deadline);
            log.debug("Time remaining: {}", remaining);
            try {
                long millis = remaining.toMillis();
                // wait(0) would block indefinitely, so only wait for a strictly positive time
                if (millis > 0) {
                    wait(millis);
                }
            } catch (InterruptedException e) {
                log.warn("Interrupted, ending this long poll early", e);
                statePretendingToLongPoll.set(false);
                // restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
            }
        }

        boolean stillPretending = statePretendingToLongPoll.get();
        boolean timedOut = timeoutReached(deadline);
        if (!stillPretending) {
            log.debug("Simulated long poll was interrupted by by WAKEUP command...");
        } else if (timedOut) {
            log.debug("Simulated long poll of ({}) finished. Now: {} vs sleep until: {}", duration, Instant.now(), deadline);
        } else {
            log.debug("Don't know why I was notified to wake up");
        }
        statePretendingToLongPoll.set(false);
    }

    /**
     * @return {@code true} once the current time is within one millisecond of, or past, the deadline
     */
    private boolean timeoutReached(Instant instant) {
        return Instant.now().toEpochMilli() + 1 >= instant.toEpochMilli();
    }

    /**
     * Adds the record to the underlying mock consumer, then wakes any thread waiting in a
     * simulated long poll.
     */
    @Override
    public synchronized void addRecord(ConsumerRecord<K, V> consumerRecord) {
        super.addRecord(consumerRecord);
        wakeup();
    }

    /**
     * Ends a simulated long poll if one is in progress by clearing the flag and notifying all
     * threads waiting on this instance; does nothing otherwise.
     *
     * <p>NOTE(review): this does not delegate to {@code super.wakeup()} - confirm that is intended.
     */
    @Override
    public synchronized void wakeup() {
        if (statePretendingToLongPoll.get()) {
            statePretendingToLongPoll.set(false);
            log.debug("Interrupting mock long poll...");
            notifyAll();
        }
    }

    /**
     * Records the offsets in the commit history, then delegates to the underlying mock consumer.
     */
    @Override
    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        this.commitHistoryInt.add(map);
        super.commitAsync(map, offsetCommitCallback);
    }

    /**
     * Wraps each commit map in a single-entry map keyed by this consumer's group id.
     */
    private List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> injectConsumerGroupId(List<Map<TopicPartition, OffsetAndMetadata>> list) {
        String groupId = groupMetadata().groupId();
        return list.stream()
                .map(offsets -> UniMaps.of(groupId, offsets))
                .collect(Collectors.toList());
    }

    /**
     * @return the commit history, with each entry wrapped in a map keyed by the consumer group id
     */
    public List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistoryWithGroupId() {
        return injectConsumerGroupId(getCommitHistoryInt());
    }

    /**
     * Notifies the rebalance listener (if any) that the current assignment is revoked, then closes
     * the underlying mock consumer.
     */
    @Override
    public synchronized void close(Duration duration) {
        revokeAssignment();
        super.close(duration);
    }

    /**
     * Fires {@code onPartitionsRevoked} with the current assignment, or logs a warning when no
     * rebalance listener is registered.
     */
    private void revokeAssignment() {
        ConsumerRebalanceListener rebalanceListener = getRebalanceListener();
        if (rebalanceListener == null) {
            log.warn("No rebalance listener assigned - on revoke can't fire");
        } else {
            rebalanceListener.onPartitionsRevoked(super.assignment());
        }
    }

    /**
     * Reads the {@code subscriptions} field declared on {@link MockConsumer} via reflection and
     * returns its rebalance listener.
     *
     * @return the listener reported by the subscription state; callers treat {@code null} as
     *         "no listener registered"
     * @throws IllegalStateException if the field cannot be found or accessed
     */
    private ConsumerRebalanceListener getRebalanceListener() {
        try {
            Field subscriptionsField = MockConsumer.class.getDeclaredField("subscriptions");
            subscriptionsField.setAccessible(true);
            SubscriptionState subscriptions = (SubscriptionState) subscriptionsField.get(this);
            return subscriptions.rebalanceListener();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to read the 'subscriptions' field of MockConsumer via reflection", e);
        }
    }

    /**
     * Rebalances onto partitions {@code 0..i-1} of every given topic (firing the rebalance
     * listener via {@link #rebalance(Collection)}) and sets each partition's beginning offset to 0.
     *
     * @param list topic names
     * @param i    number of partitions to create per topic
     */
    public void subscribeWithRebalanceAndAssignment(List<String> list, int i) {
        List<TopicPartition> topicPartitions = list.stream()
                .flatMap(topic -> IntStream.range(0, i).mapToObj(partition -> new TopicPartition(topic, partition)))
                .collect(Collectors.toList());
        rebalance(topicPartitions);

        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            beginningOffsets.put(topicPartition, 0L);
        }
        super.updateBeginningOffsets(beginningOffsets);
    }

    /**
     * Rebalances the underlying mock consumer, then fires {@code onPartitionsAssigned} on the
     * rebalance listener if one is registered.
     */
    @Override
    public synchronized void rebalance(Collection<TopicPartition> collection) {
        super.rebalance(collection);
        ConsumerRebalanceListener rebalanceListener = getRebalanceListener();
        if (rebalanceListener != null) {
            rebalanceListener.onPartitionsAssigned(collection);
        }
    }

    /**
     * Fires {@code onPartitionsRevoked} on the rebalance listener if one is registered. The
     * consumer's own assignment is not modified here.
     */
    public void revoke(Collection<TopicPartition> collection) {
        ConsumerRebalanceListener rebalanceListener = getRebalanceListener();
        if (rebalanceListener != null) {
            rebalanceListener.onPartitionsRevoked(collection);
        }
    }

    /**
     * Fires {@code onPartitionsAssigned} on the rebalance listener if one is registered.
     *
     * <p>NOTE(review): this does not delegate to {@code super.assign(...)}, so only the listener
     * is notified - confirm that is intended.
     */
    @Override
    public synchronized void assign(Collection<TopicPartition> collection) {
        ConsumerRebalanceListener rebalanceListener = getRebalanceListener();
        if (rebalanceListener != null) {
            rebalanceListener.onPartitionsAssigned(collection);
        }
    }

    /**
     * Rebalances the underlying mock consumer without notifying the rebalance listener.
     */
    public synchronized void rebalanceWithoutAssignment(Collection<TopicPartition> collection) {
        super.rebalance(collection);
    }

    @Override
    public String toString() {
        return "LongPollingMockConsumer(commitHistoryInt=" + getCommitHistoryInt() + ", statePretendingToLongPoll=" + this.statePretendingToLongPoll + ")";
    }

    /**
     * @return the live (not copied) list of offset maps recorded by
     *         {@link #commitAsync(Map, OffsetCommitCallback)}
     */
    public CopyOnWriteArrayList<Map<TopicPartition, OffsetAndMetadata>> getCommitHistoryInt() {
        return this.commitHistoryInt;
    }
}
