package io.confluent.parallelconsumer;

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.csid.utils.Range;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

/* loaded from: input_file:io/confluent/parallelconsumer/ParallelEoSStreamProcessorTestBase.class */
public class ParallelEoSStreamProcessorTestBase {
    public static final String INPUT_TOPIC = "input";
    public static final String OUTPUT_TOPIC = "output";
    public static final int DEFAULT_BROKER_POLL_FREQUENCY_MS = 100;
    public static final int DEFAULT_COMMIT_INTERVAL_MAX_MS = 100;
    protected LongPollingMockConsumer<String, String> consumerSpy;
    protected MockProducer<String, String> producerSpy;
    protected ParallelEoSStreamProcessor<String, String> parallelConsumer;
    ParallelEoSStreamProcessorTest.MyAction myRecordProcessingAction;
    ConsumerRecord<String, String> firstRecord;
    ConsumerRecord<String, String> secondRecord;
    KafkaTestUtils ktu;
    protected AtomicReference<Integer> loopCountRef;
    protected AtomicReference<Integer> loopCount;
    long verificationWaitDelay;
    private static final Logger log = LoggerFactory.getLogger(ParallelEoSStreamProcessorTestBase.class);
    public static final String CONSUMER_GROUP_ID = "my-group";
    public static final ConsumerGroupMetadata DEFAULT_GROUP_METADATA = new ConsumerGroupMetadata(CONSUMER_GROUP_ID);
    protected static int defaultTimeoutSeconds = 5;
    protected static Duration defaultTimeout = Duration.ofSeconds(defaultTimeoutSeconds);
    protected static long defaultTimeoutMs = defaultTimeout.toMillis();
    protected static Duration effectivelyInfiniteTimeout = Duration.ofMinutes(20);
    volatile CountDownLatch loopLatchV = new CountDownLatch(0);
    volatile CountDownLatch controlLoopPauseLatch = new CountDownLatch(0);
    protected List<WorkContainer<String, String>> successfulWork = Collections.synchronizedList(new ArrayList());

    @BeforeEach
    public void setupAsyncConsumerTestBase() {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().commitMode(ParallelConsumerOptions.CommitMode.CONSUMER_SYNC).build());
    }

    private void setupWorkManager(WorkManager<String, String> workManager) {
        workManager.getSuccessfulWorkListeners().add(workContainer -> {
            log.debug("Test work listener heard some successful work: {}", workContainer);
            this.successfulWork.add(workContainer);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void primeFirstRecord() {
        this.firstRecord = this.ktu.makeRecord("key-0", "v0");
        this.consumerSpy.addRecord(this.firstRecord);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MockConsumer<String, String> setupClients() {
        instantiateConsumerProducer();
        this.ktu = new KafkaTestUtils(this.consumerSpy);
        return this.consumerSpy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void instantiateConsumerProducer() {
        LongPollingMockConsumer longPollingMockConsumer = new LongPollingMockConsumer(OffsetResetStrategy.EARLIEST);
        this.producerSpy = (MockProducer) Mockito.spy(new MockProducer(true, (Serializer) null, (Serializer) null));
        this.consumerSpy = (LongPollingMockConsumer) Mockito.spy(longPollingMockConsumer);
        this.myRecordProcessingAction = (ParallelEoSStreamProcessorTest.MyAction) Mockito.mock(ParallelEoSStreamProcessorTest.MyAction.class);
        Mockito.when(this.consumerSpy.groupMetadata()).thenReturn(DEFAULT_GROUP_METADATA);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void subscribeParallelConsumerAndMockConsumerTo(String str) {
        List<String> of = UniLists.of(str);
        this.parallelConsumer.subscribe(of);
        this.consumerSpy.subscribeWithRebalanceAndAssignment(of, 2);
    }

    protected void setupParallelConsumerInstance(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        setupParallelConsumerInstance(ParallelConsumerOptions.builder().ordering(processingOrder).build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setupParallelConsumerInstance(ParallelConsumerOptions parallelConsumerOptions) {
        setupClients();
        this.parallelConsumer = initAsyncConsumer(parallelConsumerOptions.toBuilder().consumer(this.consumerSpy).producer(this.producerSpy).build());
        subscribeParallelConsumerAndMockConsumerTo("input");
        this.parallelConsumer.setLongPollTimeout(Duration.ofMillis(100L));
        this.parallelConsumer.setTimeBetweenCommits(Duration.ofMillis(100L));
        this.verificationWaitDelay = this.parallelConsumer.getTimeBetweenCommits().multipliedBy(2L).toMillis();
        this.loopCountRef = attachLoopCounter(this.parallelConsumer);
        setupWorkManager(this.parallelConsumer.getWm());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
        this.parallelConsumer = new ParallelEoSStreamProcessor<>(parallelConsumerOptions);
        return this.parallelConsumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendSecondRecord(MockConsumer<String, String> mockConsumer) {
        this.secondRecord = this.ktu.makeRecord("key-0", "v1");
        mockConsumer.addRecord(this.secondRecord);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AtomicReference<Integer> attachLoopCounter(ParallelEoSStreamProcessor parallelEoSStreamProcessor) {
        AtomicReference<Integer> atomicReference = new AtomicReference<>(0);
        parallelEoSStreamProcessor.addLoopEndCallBack(() -> {
            Integer num = (Integer) atomicReference.get();
            atomicReference.compareAndSet(num, Integer.valueOf(num.intValue() + 1));
            log.trace("Counting down latch from {}", Long.valueOf(this.loopLatchV.getCount()));
            this.loopLatchV.countDown();
            log.trace("Loop latch remaining: {}", Long.valueOf(this.loopLatchV.getCount()));
            if (this.controlLoopPauseLatch.getCount() > 0) {
                log.debug("Waiting on pause latch ({})...", Long.valueOf(this.controlLoopPauseLatch.getCount()));
                try {
                    this.controlLoopPauseLatch.await();
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                }
                log.trace("Completed waiting on pause latch");
            }
            log.trace("Loop count {}", atomicReference.get());
        });
        return atomicReference;
    }

    protected void pauseControlLoop() {
        log.trace("Pause loop");
        this.controlLoopPauseLatch = new CountDownLatch(1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resumeControlLoop() {
        log.trace("Resume loop");
        this.controlLoopPauseLatch.countDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForOneLoopCycle() {
        waitForSomeLoopCycles(1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForSomeLoopCycles(int i) {
        log.debug("Waiting for {} more iterations of the control loop.", Integer.valueOf(i));
        blockingLoopLatchTrigger(i);
        log.debug("Completed waiting on {} loop(s)", Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitUntilTrue(Callable<Boolean> callable) {
        Awaitility.waitAtMost(defaultTimeout).until(callable);
    }

    private void blockingLoopLatchTrigger(int i) {
        log.debug("Waiting on {} cycles on loop latch...", Integer.valueOf(i));
        this.loopLatchV = new CountDownLatch(i);
        if (!this.loopLatchV.await((long) defaultTimeoutSeconds, TimeUnit.SECONDS)) {
            throw new TimeoutException(StringUtils.msg("Timeout {} waiting for latch", new Object[]{Integer.valueOf(defaultTimeoutSeconds)}));
        }
    }

    private void waitForLoopCount(int i) {
        log.debug("Waiting on {} cycles on loop latch...", Integer.valueOf(i));
        Awaitility.waitAtMost(defaultTimeout.multipliedBy(100L)).until(() -> {
            return Boolean.valueOf(this.loopCount.get().intValue() > i);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitForCommitExact(int i, int i2) {
        log.debug("Waiting for commit offset {} on partition {}", Integer.valueOf(i2), Integer.valueOf(i));
        Map of = UniMaps.of(new TopicPartition("input", i), new OffsetAndMetadata(i2, ""));
        ((MockProducer) Mockito.verify(this.producerSpy, Mockito.timeout(defaultTimeoutMs).times(1))).sendOffsetsToTransaction((Map) Mockito.argThat(map -> {
            return map.equals(of);
        }), (ConsumerGroupMetadata) Mockito.any(ConsumerGroupMetadata.class));
    }

    public void assertCommits(List<Integer> list, String str) {
        assertCommits(list, Optional.of(str));
    }

    public void assertCommits(List<Integer> list, Optional<String> optional) {
        if (isUsingTransactionalProducer()) {
            KafkaTestUtils.assertCommits(this.producerSpy, list, optional);
            Assertions.assertThat(extractAllPartitionsOffsetsSequentially()).isEmpty();
            return;
        }
        List<Integer> trimAllGeneisOffset = KafkaTestUtils.trimAllGeneisOffset(extractAllPartitionsOffsetsSequentially());
        if (optional.isPresent()) {
            Assertions.assertThat(trimAllGeneisOffset).as(optional.get(), new Object[0]).hasSameElementsAs(list);
        } else {
            try {
                Assertions.assertThat(trimAllGeneisOffset).hasSameElementsAs(list);
            } catch (AssertionError e) {
                log.error("", e);
                throw e;
            }
        }
        KafkaTestUtils.assertCommits(this.producerSpy, UniLists.of(), Optional.of("Empty"));
    }

    private List<Integer> extractAllPartitionsOffsetsSequentially() {
        new ArrayList();
        return (List) new ArrayList(this.consumerSpy.getCommitHistoryInt()).stream().flatMap(map -> {
            return new ArrayList(map.values()).stream().map(offsetAndMetadata -> {
                return Integer.valueOf((int) offsetAndMetadata.offset());
            });
        }).collect(Collectors.toList());
    }

    public void assertCommits(List<Integer> list) {
        assertCommits(list, Optional.empty());
    }

    public void assertCommitLists(List<List<Integer>> list) {
        if (isUsingTransactionalProducer()) {
            KafkaTestUtils.assertCommitLists(this.producerSpy, list, (Optional<String>) Optional.empty());
        } else {
            KafkaTestUtils.assertCommitLists(this.consumerSpy.getCommitHistoryWithGropuId(), list, (Optional<String>) Optional.empty());
        }
    }

    protected List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistory() {
        return isUsingTransactionalProducer() ? this.producerSpy.consumerGroupOffsetsHistory() : this.consumerSpy.getCommitHistoryWithGropuId();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isUsingTransactionalProducer() {
        return this.parallelConsumer.getWm().getOptions().getCommitMode().equals(ParallelConsumerOptions.CommitMode.TRANSACTIONAL_PRODUCER);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isUsingAsyncCommits() {
        return this.parallelConsumer.getWm().getOptions().getCommitMode().equals(ParallelConsumerOptions.CommitMode.CONSUMER_ASYNCHRONOUS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitLatch(List<CountDownLatch> list, int i) {
        log.trace("Waiting on latch {}", Integer.valueOf(i));
        awaitLatch(list.get(i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void awaitLatch(CountDownLatch countDownLatch) {
        log.trace("Waiting on latch with timeout {}", defaultTimeout);
        if (!countDownLatch.await(defaultTimeoutSeconds, TimeUnit.SECONDS)) {
            throw new AssertionError("Latch await timeout - " + countDownLatch.getCount() + " remaining");
        }
        log.trace("Latch released");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void releaseAndWait(List<CountDownLatch> list, List<Integer> list2) {
        for (Integer num : list2) {
            log.debug("Releasing {}...", num);
            list.get(num.intValue()).countDown();
        }
        waitForSomeLoopCycles(1);
    }

    protected void release(List<CountDownLatch> list, int i) {
        log.debug("Releasing {}...", Integer.valueOf(i));
        list.get(i).countDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void releaseAndWait(List<CountDownLatch> list, int i) {
        log.debug("Releasing {}...", Integer.valueOf(i));
        list.get(i).countDown();
        waitForSomeLoopCycles(1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<CountDownLatch> constructLatches(int i) {
        ArrayList arrayList = new ArrayList(i);
        Iterator it = Range.range(i).iterator();
        while (it.hasNext()) {
            arrayList.add(new CountDownLatch(1));
        }
        return arrayList;
    }

    protected void pauseControlToAwaitForLatch(CountDownLatch countDownLatch) {
        pauseControlLoop();
        awaitLatch(countDownLatch);
        resumeControlLoop();
        waitForOneLoopCycle();
    }
}
