package io.confluent.csid.utils;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/csid/utils/LongPollingMockConsumer.class */
/**
 * A {@link MockConsumer} extension for tests that:
 * <ul>
 *   <li>blocks inside {@link #poll(Duration)} for up to the requested timeout when the underlying
 *       mock returns no records, imitating a long poll;</li>
 *   <li>lets {@link #wakeup()} cut such a simulated long poll short;</li>
 *   <li>records every offset map passed to
 *       {@link #commitAsync(Map, OffsetCommitCallback)} so tests can inspect it afterwards;</li>
 *   <li>notifies the registered rebalance listener of revocation when closed.</li>
 * </ul>
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class LongPollingMockConsumer<K, V> extends MockConsumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(LongPollingMockConsumer.class);

    /** Offset maps passed to {@link #commitAsync(Map, OffsetCommitCallback)}, in call order. */
    private final List<Map<TopicPartition, OffsetAndMetadata>> commitHistoryInt;

    /**
     * Creates the consumer with an empty commit history.
     *
     * @param offsetResetStrategy passed straight through to the {@link MockConsumer} constructor
     */
    public LongPollingMockConsumer(OffsetResetStrategy offsetResetStrategy) {
        super(offsetResetStrategy);
        this.commitHistoryInt = new ArrayList<>();
    }

    /**
     * Polls the underlying mock and, if it yields no records, waits on this object's monitor for up
     * to {@code duration} before returning the (empty) result.
     *
     * <p>The wait releases the monitor, so a concurrent {@link #wakeup()} can end it early. A
     * timeout that rounds to zero (or less) milliseconds skips the wait entirely, because
     * {@code Object.wait(0)} would otherwise block until notified. If the waiting thread is
     * interrupted, the wait ends early and the thread's interrupt status is restored.
     *
     * @param duration the requested poll timeout
     * @return whatever the superclass {@code poll} returned
     */
    @Override
    public synchronized ConsumerRecords<K, V> poll(Duration duration) {
        ConsumerRecords<K, V> records = super.poll(duration);
        if (!records.isEmpty()) {
            log.debug("Polled and found {} records...", records.count());
            return records;
        }

        log.debug("No records returned, simulating long poll with sleep for requested long poll timeout of {}...", duration);
        long timeoutMs = duration.toMillis();
        // wait(0) means "wait forever", which is the opposite of a zero-length poll.
        if (timeoutMs > 0) {
            try {
                // The method is synchronized, so the monitor required by wait() is already held.
                wait(timeoutMs);
            } catch (InterruptedException e) {
                log.warn("Interrupted", e);
                // Preserve the interrupt so callers further up the stack can react to it.
                Thread.currentThread().interrupt();
            }
        }
        log.debug("Simulated long poll of ({}) finished.", duration);
        return records;
    }

    /**
     * Ends any simulated long poll currently waiting in {@link #poll(Duration)} by notifying all
     * threads waiting on this object's monitor. This override does not call the superclass
     * {@code wakeup()}.
     */
    @Override
    public synchronized void wakeup() {
        log.debug("Interrupting mock long poll...");
        notifyAll();
    }

    /**
     * Appends {@code map} to the commit history and then delegates to the superclass.
     *
     * <p>The map is stored by reference (not copied), and it is recorded before the superclass
     * call is made.
     *
     * @param map                  offsets being committed
     * @param offsetCommitCallback callback forwarded to the superclass
     */
    @Override
    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        this.commitHistoryInt.add(map);
        super.commitAsync(map, offsetCommitCallback);
    }

    /**
     * Wraps each commit in a single-entry map keyed by this consumer's group id, as reported by
     * {@code groupMetadata().groupId()}.
     *
     * @param list the commits to wrap
     * @return a new list with one wrapping map per input element, in the same order
     */
    private List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> injectConsumerGroupId(List<Map<TopicPartition, OffsetAndMetadata>> list) {
        String groupId = groupMetadata().groupId();
        return list.stream().map(offsets -> {
            Map<String, Map<TopicPartition, OffsetAndMetadata>> byGroup = new HashMap<>();
            byGroup.put(groupId, offsets);
            return byGroup;
        }).collect(Collectors.toList());
    }

    /**
     * Returns the commit history with every entry wrapped in a map keyed by the consumer group id.
     *
     * @return a newly built list; the inner offset maps are the recorded instances
     */
    public List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistoryWithGroupId() {
        return injectConsumerGroupId(getCommitHistoryInt());
    }

    /**
     * Same as {@link #getCommitHistoryWithGroupId()}; the misspelled name is retained so existing
     * callers keep compiling.
     *
     * @return a newly built list; the inner offset maps are the recorded instances
     */
    public List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistoryWithGropuId() {
        return getCommitHistoryWithGroupId();
    }

    /**
     * Invokes the registered rebalance listener's {@code onPartitionsRevoked} with the current
     * assignment (when a listener is registered) and then delegates to the superclass close.
     *
     * @param j        close timeout, forwarded to the superclass
     * @param timeUnit unit of {@code j}, forwarded to the superclass
     * @throws IllegalStateException if the superclass subscription state cannot be read reflectively
     */
    @Override
    public synchronized void close(long j, TimeUnit timeUnit) {
        revokeAssignment();
        super.close(j, timeUnit);
    }

    /**
     * Reads the private {@code subscriptions} field declared on {@link MockConsumer} via reflection
     * and, if it holds a rebalance listener, calls {@code onPartitionsRevoked} with the current
     * assignment. Logs a warning when no listener is present.
     *
     * @throws IllegalStateException if the field is missing or inaccessible
     */
    private void revokeAssignment() {
        SubscriptionState subscriptionState;
        try {
            Field subscriptionsField = MockConsumer.class.getDeclaredField("subscriptions");
            subscriptionsField.setAccessible(true);
            subscriptionState = (SubscriptionState) subscriptionsField.get(this);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to read MockConsumer 'subscriptions' field reflectively", e);
        }

        ConsumerRebalanceListener rebalanceListener = subscriptionState.rebalanceListener();
        if (rebalanceListener == null) {
            log.warn("No rebalance listener assigned - on revoke can't fire");
        } else {
            rebalanceListener.onPartitionsRevoked(super.assignment());
        }
    }

    /**
     * Builds a {@link TopicPartition} for every topic in {@code list} combined with every partition
     * number produced by {@code Range.rangeStream(i)}, passes them to the superclass
     * {@code rebalance}, and then sets the beginning offset of each of those partitions to 0.
     *
     * @param list topic names
     * @param i    value handed to {@code Range.rangeStream}; presumably the number of partitions
     *             per topic (confirm against {@code Range})
     */
    public void subscribeWithRebalanceAndAssignment(List<String> list, int i) {
        List<TopicPartition> topicPartitions = list.stream()
                .flatMap(topic -> Range.rangeStream(i).boxed()
                        .map(partition -> new TopicPartition(topic, partition.intValue())))
                .collect(Collectors.toList());
        super.rebalance(topicPartitions);

        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        for (TopicPartition topicPartition : topicPartitions) {
            beginningOffsets.put(topicPartition, 0L);
        }
        super.updateBeginningOffsets(beginningOffsets);
    }

    /**
     * Returns the live internal list of offset maps recorded by
     * {@link #commitAsync(Map, OffsetCommitCallback)}; it is not a copy.
     *
     * @return the commit history, in call order
     */
    public List<Map<TopicPartition, OffsetAndMetadata>> getCommitHistoryInt() {
        return this.commitHistoryInt;
    }
}
