/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.model.CommitHistory;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.confluent.parallelconsumer.truth.CommitHistorySubject;
import io.confluent.parallelconsumer.truth.LongPollingMockConsumerSubject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

public abstract class AbstractParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(AbstractParallelEoSStreamProcessorTestBase.class);
    // Topic / group identifiers; (re)assigned with a random suffix by setupTopicNames().
    public String INPUT_TOPIC;
    public String OUTPUT_TOPIC;
    public String CONSUMER_GROUP_ID;
    // Group metadata built from CONSUMER_GROUP_ID; returned by the consumer spy's groupMetadata().
    public ConsumerGroupMetadata DEFAULT_GROUP_METADATA;
    // NOTE(review): these values match the 500 ms long-poll timeout and 100 ms commit interval
    // applied in setupParallelConsumerInstance - presumably that method is their intended consumer.
    public static final int DEFAULT_BROKER_POLL_FREQUENCY_MS = 500;
    public static final int DEFAULT_COMMIT_INTERVAL_MAX_MS = 100;
    // Mockito spies wrapping the mock Kafka clients; created in instantiateConsumerProducer().
    protected LongPollingMockConsumer<String, String> consumerSpy;
    protected MockProducer<String, String> producerSpy;
    // The system under test, produced by the subclass through initAsyncConsumer(...).
    protected AbstractParallelEoSStreamProcessor<String, String> parentParallelConsumer;
    // Default wait budget used by the await* helpers, expressed in three units.
    public static int defaultTimeoutSeconds = 30;
    public static Duration defaultTimeout = Duration.ofSeconds(defaultTimeoutSeconds);
    protected static long defaultTimeoutMs = defaultTimeout.toMillis();
    // 20 minutes; not referenced elsewhere in this class.
    protected static Duration effectivelyInfiniteTimeout = Duration.ofMinutes(20L);
    // Spy of the record processing action; created in instantiateConsumerProducer().
    ParallelEoSStreamProcessorTest.MyAction myRecordProcessingAction;
    // Records created by primeFirstRecord() and sendSecondRecord(...) respectively.
    ConsumerRecord<String, String> firstRecord;
    ConsumerRecord<String, String> secondRecord;
    // Test utility bound to the input topic, group id and consumer spy; created in setupClients().
    protected KafkaTestUtils ktu;
    // Counter returned by attachLoopCounter(...); incremented by the loop-end callback.
    protected AtomicReference<Integer> loopCountRef;
    // Counted down once per loop-end callback; replaced with a fresh latch by blockingLoopLatchTrigger(...).
    volatile CountDownLatch loopLatchV = new CountDownLatch(0);
    // While its count is above zero the loop-end callback blocks on it (see pauseControlLoop / resumeControlLoop).
    volatile CountDownLatch controlLoopPauseLatch = new CountDownLatch(0);
    // NOTE(review): never assigned in this class; only read by awaitForLoopCount(...) - subclasses may set it.
    protected AtomicReference<Integer> loopCount;
    // Twice the configured time between commits, in milliseconds; set in setupParallelConsumerInstance(...).
    long verificationWaitDelay;
    // Partition 0 of INPUT_TOPIC; assigned by setupTopicNames().
    protected TopicPartition topicPartition;

    /**
     * Assigns fresh, randomly suffixed names to the input topic, output topic and consumer group,
     * then derives the partition-0 {@link TopicPartition} and the {@link ConsumerGroupMetadata} from them.
     */
    public void setupTopicNames() {
        String inputTopic = "input-" + Math.random();
        String outputTopic = "output-" + Math.random();
        String groupId = "my-group" + Math.random();

        INPUT_TOPIC = inputTopic;
        OUTPUT_TOPIC = outputTopic;
        CONSUMER_GROUP_ID = groupId;
        topicPartition = new TopicPartition(inputTopic, 0);
        DEFAULT_GROUP_METADATA = new ConsumerGroupMetadata(groupId);
    }

    /**
     * Per-test setup: generates the topic names and builds the parallel consumer
     * from the options supplied by {@link #getOptions()}.
     */
    @BeforeEach
    public void setupAsyncConsumerTestBase() {
        setupTopicNames();
        ParallelConsumerOptions<Object, Object> testOptions = getOptions();
        setupParallelConsumerInstance(testOptions);
    }

    /**
     * Builds the options used to construct the parallel consumer under test.
     *
     * @return the result of building {@link #getDefaultOptions()}
     */
    protected ParallelConsumerOptions<Object, Object> getOptions() {
        return getDefaultOptions().build();
    }

    /**
     * Creates the default options builder: periodic synchronous consumer commits with unordered processing.
     *
     * @return a builder pre-configured with the default commit mode and ordering
     */
    protected ParallelConsumerOptions.ParallelConsumerOptionsBuilder<Object, Object> getDefaultOptions() {
        ParallelConsumerOptions.ParallelConsumerOptionsBuilder<Object, Object> builder = ParallelConsumerOptions.builder();
        return builder
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC)
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED);
    }

    /**
     * Per-test teardown: closes the parallel consumer unless it is already closed or failed.
     * <p>
     * If setup failed before the consumer was created there is nothing to close; returning early
     * avoids a {@link NullPointerException} here that would mask the original setup failure.
     */
    @AfterEach
    public void close() {
        if (parentParallelConsumer == null) {
            log.debug("Test finished, pc was never created - nothing to close.");
            return;
        }
        if (parentParallelConsumer.isClosedOrFailed()) {
            log.debug("Test finished, pc already closed.");
            return;
        }
        if (parentParallelConsumer.getFailureCause() != null) {
            log.error("PC has error - test failed");
        }
        log.debug("Test ended (maybe a failure), closing pc...");
        parentParallelConsumer.close();
    }

    /**
     * Registers a listener on the work manager that appends every successful work container
     * to the supplied list.
     *
     * @param wm                    work manager whose successful-work listeners are extended
     * @param customSuccessfulWork  list receiving the successful work; additions synchronize on this list
     */
    protected void injectWorkSuccessListener(WorkManager<String, String> wm, List<WorkContainer<String, String>> customSuccessfulWork) {
        wm.getSuccessfulWorkListeners().add(successfulWork -> {
            log.debug("Test work listener heard some successful work: {}", successfulWork);
            // The listener may fire on another thread, so guard the caller's list.
            synchronized (customSuccessfulWork) {
                customSuccessfulWork.add(successfulWork);
            }
        });
    }

    /**
     * Creates the first test record (key {@code key-0}), remembers it in {@code firstRecord}
     * and adds it to the consumer spy.
     */
    protected void primeFirstRecord() {
        ConsumerRecord<String, String> primedRecord = ktu.makeRecord("key-0", "v0-first-primed-record");
        firstRecord = primedRecord;
        consumerSpy.addRecord(primedRecord);
    }

    /**
     * Instantiates the consumer/producer spies and the {@link KafkaTestUtils} helper bound to them.
     *
     * @return the consumer spy
     */
    protected MockConsumer<String, String> setupClients() {
        instantiateConsumerProducer();
        KafkaTestUtils testUtils = new KafkaTestUtils(INPUT_TOPIC, CONSUMER_GROUP_ID, consumerSpy);
        ktu = testUtils;
        return consumerSpy;
    }

    /**
     * Creates the mock consumer and producer, wraps them (and the record processing action) in
     * Mockito spies, and stubs the consumer spy to report {@code DEFAULT_GROUP_METADATA}.
     */
    protected void instantiateConsumerProducer() {
        LongPollingMockConsumer<String, String> consumer = new LongPollingMockConsumer<>(OffsetResetStrategy.EARLIEST);
        MockProducer<String, String> producer = new MockProducer<>(true, Serdes.String().serializer(), Serdes.String().serializer());

        producerSpy = Mockito.spy(producer);
        consumerSpy = Mockito.spy(consumer);
        myRecordProcessingAction = Mockito.spy(ParallelEoSStreamProcessorTest.MyAction.class);

        Mockito.when(consumerSpy.groupMetadata()).thenReturn(DEFAULT_GROUP_METADATA);
    }

    /**
     * Subscribes both the parallel consumer and the underlying consumer spy to a single topic.
     *
     * @param topic the topic to subscribe to
     */
    protected void subscribeParallelConsumerAndMockConsumerTo(String topic) {
        List<String> topics = UniLists.of(topic);
        parentParallelConsumer.subscribe(topics);
        // NOTE(review): the second argument is presumably the number of partitions to assign - confirm.
        consumerSpy.subscribeWithRebalanceAndAssignment(topics, 2);
    }

    /**
     * Convenience overload: sets up the parallel consumer with options that only specify the ordering.
     *
     * @param order the processing order to configure
     */
    protected void setupParallelConsumerInstance(ParallelConsumerOptions.ProcessingOrder order) {
        ParallelConsumerOptions<Object, Object> orderedOptions = ParallelConsumerOptions.builder()
                .ordering(order)
                .build();
        setupParallelConsumerInstance(orderedOptions);
    }

    /**
     * Builds the parallel consumer under test from the given options.
     * <p>
     * Creates the client spies, injects them into a copy of the options, lets the subclass create
     * the consumer, subscribes it to the input topic, applies the default poll/commit timings and
     * attaches the loop counter.
     *
     * @param parallelConsumerOptions base options; the consumer and producer are overridden with the spies
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected void setupParallelConsumerInstance(ParallelConsumerOptions parallelConsumerOptions) {
        setupClients();

        ParallelConsumerOptions<String, String> optionsWithClients = (ParallelConsumerOptions<String, String>) parallelConsumerOptions.toBuilder()
                .consumer(consumerSpy)
                .producer(producerSpy)
                .build();
        parentParallelConsumer = initAsyncConsumer(optionsWithClients);

        subscribeParallelConsumerAndMockConsumerTo(INPUT_TOPIC);

        parentParallelConsumer.setLongPollTimeout(Duration.ofMillis(DEFAULT_BROKER_POLL_FREQUENCY_MS));
        parentParallelConsumer.setTimeBetweenCommits(Duration.ofMillis(DEFAULT_COMMIT_INTERVAL_MAX_MS));

        // Allow two commit intervals when verifying.
        Duration twoCommitIntervals = parentParallelConsumer.getTimeBetweenCommits().multipliedBy(2L);
        verificationWaitDelay = twoCommitIntervals.toMillis();

        loopCountRef = attachLoopCounter(parentParallelConsumer);
    }

    /**
     * Factory hook implemented by subclasses to create the concrete parallel consumer under test.
     *
     * @param var1 the options, already carrying the consumer and producer spies
     * @return the consumer instance that is stored in {@code parentParallelConsumer}
     */
    protected abstract AbstractParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions<String, String> var1);

    /**
     * Creates the second test record (key {@code key-0}, value {@code v1}), remembers it in
     * {@code secondRecord} and adds it to the given consumer.
     *
     * @param consumer the mock consumer receiving the record
     */
    protected void sendSecondRecord(MockConsumer<String, String> consumer) {
        ConsumerRecord<String, String> nextRecord = ktu.makeRecord("key-0", "v1");
        secondRecord = nextRecord;
        consumer.addRecord(nextRecord);
    }

    /**
     * Registers a loop-end callback that counts control loop iterations, counts down the current
     * loop latch and, while the pause latch is armed, blocks the control loop until it is released.
     * <p>
     * The callback is attached to {@code parallelConsumer}; if that is {@code null} it falls back to
     * {@code parentParallelConsumer}, which is what this method previously always used.
     *
     * @param parallelConsumer the consumer to attach the callback to
     * @return the counter incremented once per completed loop
     */
    @SuppressWarnings("rawtypes")
    protected AtomicReference<Integer> attachLoopCounter(AbstractParallelEoSStreamProcessor parallelConsumer) {
        AtomicReference<Integer> currentLoop = new AtomicReference<>(0);
        AbstractParallelEoSStreamProcessor target = parallelConsumer != null ? parallelConsumer : parentParallelConsumer;
        target.addLoopEndCallBack(() -> {
            // Atomic increment (retries on contention instead of silently dropping the update).
            currentLoop.updateAndGet(count -> count + 1);

            // Read the volatile field once so the logs and the count-down refer to the same latch.
            CountDownLatch loopLatch = loopLatchV;
            log.trace("Counting down latch from {}", loopLatch.getCount());
            loopLatch.countDown();
            log.trace("Loop latch remaining: {}", loopLatch.getCount());

            CountDownLatch pauseLatch = controlLoopPauseLatch;
            if (pauseLatch.getCount() > 0L) {
                log.debug("Waiting on pause latch ({})...", pauseLatch.getCount());
                try {
                    pauseLatch.await();
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    // Preserve the interrupt so the control loop thread can still observe it.
                    Thread.currentThread().interrupt();
                }
                log.trace("Completed waiting on pause latch");
            }
            log.trace("Loop count {}", currentLoop.get());
        });
        return currentLoop;
    }

    /**
     * Arms the pause latch with a count of one, so the loop-end callback blocks at the end of the
     * next control loop iteration until {@link #resumeControlLoop()} is called.
     */
    protected void pauseControlLoop() {
        log.trace("Pause loop");
        CountDownLatch armedLatch = new CountDownLatch(1);
        controlLoopPauseLatch = armedLatch;
    }

    /**
     * Counts down the pause latch, releasing a control loop blocked in the loop-end callback.
     */
    protected void resumeControlLoop() {
        log.trace("Resume loop");
        CountDownLatch pauseLatch = controlLoopPauseLatch;
        pauseLatch.countDown();
    }

    /**
     * Blocks until the control loop has completed one more iteration.
     */
    protected void awaitForOneLoopCycle() {
        final int oneCycle = 1;
        awaitForSomeLoopCycles(oneCycle);
    }

    /**
     * Blocks until the control loop has completed the given number of additional iterations.
     *
     * @param thisManyMore number of further loop iterations to wait for
     */
    protected void awaitForSomeLoopCycles(int thisManyMore) {
        log.debug("Waiting for {} more iterations of the control loop.", thisManyMore);
        blockingLoopLatchTrigger(thisManyMore);
        log.debug("Completed waiting on {} loop(s)", thisManyMore);
    }

    /**
     * Polls the given condition until it returns {@code true}, waiting at most {@code defaultTimeout}.
     *
     * @param booleanCallable the condition to poll
     */
    protected void awaitUntilTrue(Callable<Boolean> booleanCallable) {
        Awaitility.waitAtMost(defaultTimeout)
                .until(booleanCallable);
    }

    /**
     * Installs a fresh loop latch with the given count and blocks until the loop-end callback has
     * counted it down, or {@code defaultTimeoutSeconds} elapse.
     * <p>
     * Failures are rethrown with their original types ({@link TimeoutException},
     * {@link InterruptedException}) without being declared, so callers keep their current signatures.
     *
     * @param waitForCount number of loop iterations to wait for
     */
    private void blockingLoopLatchTrigger(int waitForCount) {
        log.debug("Waiting on {} cycles on loop latch for {}...", waitForCount, defaultTimeout);
        // Keep a local reference so the wait and the error message refer to the same latch.
        CountDownLatch latch = new CountDownLatch(waitForCount);
        loopLatchV = latch;
        try {
            boolean timedOut = !latch.await(defaultTimeoutSeconds, TimeUnit.SECONDS);
            if (timedOut || parentParallelConsumer.isClosedOrFailed()) {
                throw new TimeoutException(StringUtils.msg("Timeout of {}, waiting for {} counts, on latch with {} left", defaultTimeout, waitForCount, latch.getCount()));
            }
        } catch (InterruptedException e) {
            log.error("Interrupted while waiting for loop latch - timeout was {}", defaultTimeout);
            throw AbstractParallelEoSStreamProcessorTestBase.<RuntimeException>sneakyThrow(e);
        } catch (TimeoutException e) {
            throw AbstractParallelEoSStreamProcessorTestBase.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Rethrows any throwable as-is, bypassing checked-exception checking via generic type erasure.
     * Never returns normally; the return type only lets callers write {@code throw sneakyThrow(e)}.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> E sneakyThrow(Throwable throwable) throws E {
        throw (E) throwable;
    }

    /**
     * Waits (up to 100x {@code defaultTimeout}) until the loop counter exceeds the given value.
     * <p>
     * Uses {@code loopCount} when it has been set; otherwise falls back to {@code loopCountRef},
     * the counter this class actually attaches, instead of failing with a {@link NullPointerException}.
     *
     * @param waitForCount the loop count that must be exceeded
     */
    private void awaitForLoopCount(int waitForCount) {
        log.debug("Waiting on {} cycles on loop latch...", waitForCount);
        AtomicReference<Integer> counter = loopCount != null ? loopCount : loopCountRef;
        Awaitility.waitAtMost(defaultTimeout.multipliedBy(100L)).until(() -> counter.get() > waitForCount);
    }

    /**
     * Waits up to {@code defaultTimeout} until the commit history contains the given offset.
     *
     * @param offset the offset expected among the commits
     */
    protected void awaitForCommit(int offset) {
        log.debug("Waiting for commit offset {}", offset);
        Awaitility.await()
                .timeout(defaultTimeout)
                .untilAsserted(() -> assertCommitsContains(UniLists.of(offset)));
    }

    /**
     * Waits up to {@code defaultTimeout} until the commits are exactly the given offset.
     * Fails fast once more than one offset has been committed and none of them is the expected one.
     *
     * @param offset the only offset expected to be committed
     */
    protected void awaitForCommitExact(int offset) {
        log.debug("Waiting for EXACTLY commit offset {}", offset);
        String failFastReason = StringUtils.msg("Commit was not exact - contained offsets that weren't '{}'", offset);
        Awaitility.await()
                .timeout(defaultTimeout)
                .failFast(failFastReason, () -> {
                    List<Integer> committed = extractAllPartitionsOffsetsSequentially(false);
                    boolean severalCommits = committed.size() > 1;
                    return severalCommits && !committed.contains(offset);
                })
                .untilAsserted(() -> assertCommits(UniLists.of(offset)));
    }

    /**
     * Verifies, within {@code defaultTimeoutMs}, that the producer spy sent exactly one
     * offsets-to-transaction call whose map equals {@code {INPUT_TOPIC-partition -> offset}}.
     *
     * @param partition partition number of the input topic
     * @param offset    the offset expected to be committed for that partition
     */
    protected void awaitForCommitExact(int partition, int offset) {
        log.debug("Waiting for EXACTLY commit offset {} on partition {}", offset, partition);

        TopicPartition expectedPartition = new TopicPartition(INPUT_TOPIC, partition);
        OffsetAndMetadata expectedOffset = new OffsetAndMetadata((long) offset, "");
        Map<TopicPartition, OffsetAndMetadata> expectedOffsetMap = UniMaps.of(expectedPartition, expectedOffset);

        VerificationMode onceWithinTimeout = Mockito.timeout(defaultTimeoutMs).times(1);
        Mockito.verify(producerSpy, onceWithinTimeout).sendOffsetsToTransaction(
                Mockito.argThat(offsetMap -> offsetMap.equals(expectedOffsetMap)),
                Mockito.any(ConsumerGroupMetadata.class));
    }

    /**
     * Asserts that the flattened commit history contains all of the given offsets.
     *
     * @param offsets offsets that must be present among the commits
     */
    public void assertCommitsContains(List<Integer> offsets) {
        Assertions.assertThat(getCommitHistoryFlattened()).containsAll(offsets);
    }

    /**
     * Returns all committed offsets as a flat list, read from the producer spy in transactional
     * mode and from the consumer spy (genesis commits included) otherwise.
     *
     * @return committed offsets in commit order
     */
    protected List<Integer> getCommitHistoryFlattened() {
        if (isUsingTransactionalProducer()) {
            return ktu.getProducerCommitsFlattened(producerSpy);
        }
        return extractAllPartitionsOffsetsSequentially(false);
    }

    /**
     * Returns all commits as a flat list of {@link OffsetAndMetadata}, read from the producer spy
     * in transactional mode and from the consumer spy (offset-0 commits removed) otherwise.
     *
     * @return committed offsets with metadata in commit order
     */
    private List<OffsetAndMetadata> getCommitHistoryFlattenedMeta() {
        if (isUsingTransactionalProducer()) {
            return ktu.getProducerCommitsMeta(producerSpy);
        }
        return extractAllPartitionsOffsetsSequentiallyMeta(true);
    }

    /**
     * Asserts the committed offsets, labelling the assertion with a description.
     *
     * @param offsets     the expected committed offsets
     * @param description non-null description attached to the assertion
     */
    public void assertCommits(List<Integer> offsets, String description) {
        Optional<String> wrappedDescription = Optional.of(description);
        assertCommits(offsets, wrappedDescription);
    }

    /**
     * Asserts that the commits made through the active commit channel match the expected offsets,
     * and that nothing was committed through the other channel.
     * <p>
     * Offset-0 ("genesis") commits are ignored on the consumer side unless {@code 0} is itself expected.
     *
     * @param offsets     the expected committed offsets
     * @param description optional description attached to the consumer-side assertion
     */
    public void assertCommits(List<Integer> offsets, Optional<String> description) {
        boolean trimGenesis = !offsets.contains(0);

        if (isUsingTransactionalProducer()) {
            // Commits go through the producer; the consumer must not have committed anything.
            ktu.assertCommits(producerSpy, offsets, description);
            Assertions.assertThat(extractAllPartitionsOffsetsSequentially(trimGenesis)).isEmpty();
            return;
        }

        // Commits go through the consumer; the producer must not have committed anything.
        List<Integer> consumerCommits = extractAllPartitionsOffsetsSequentially(trimGenesis);
        ListAssert<Integer> commitsAssert = Assertions.assertThat(consumerCommits);
        if (description.isPresent()) {
            commitsAssert = commitsAssert.as(description.get(), new Object[0]);
        }
        commitsAssert.hasSameElementsAs(offsets);
        ktu.assertCommits(producerSpy, UniLists.of(), Optional.of("Empty"));
    }

    /**
     * Flattens the consumer spy's commit history into a list of offsets, narrowed to {@code int}.
     *
     * @param trimGenesis when {@code true}, commits with offset 0 are dropped
     * @return the committed offsets in commit order
     */
    protected List<Integer> extractAllPartitionsOffsetsSequentially(boolean trimGenesis) {
        return extractAllPartitionsOffsetsSequentiallyMeta(trimGenesis).stream()
                .map(OffsetAndMetadata::offset)
                .map(Long::intValue)
                .collect(Collectors.toList());
    }

    /**
     * Flattens a snapshot of the consumer spy's commit history into a list of {@link OffsetAndMetadata}.
     *
     * @param trimGenesis when {@code true}, commits with offset 0 are dropped
     * @return the committed offsets with metadata in commit order
     */
    protected List<OffsetAndMetadata> extractAllPartitionsOffsetsSequentiallyMeta(boolean trimGenesis) {
        // Copy first so the history can keep growing while we stream over it.
        List<Map<TopicPartition, OffsetAndMetadata>> history = new ArrayList<>(consumerSpy.getCommitHistoryInt());
        return history.stream()
                .flatMap(commits -> new ArrayList<OffsetAndMetadata>(commits.values()).stream())
                .filter(committed -> !trimGenesis || committed.offset() != 0L)
                .collect(Collectors.toList());
    }

    /**
     * Flattens a snapshot of the consumer spy's commit history into a list of
     * {@link OffsetAndMetadata}, without filtering anything out.
     *
     * @return every committed offset with metadata in commit order
     */
    protected List<OffsetAndMetadata> extractAllPartitionsOffsetsAndMetadataSequentially() {
        List<Map<TopicPartition, OffsetAndMetadata>> history = new ArrayList<>(consumerSpy.getCommitHistoryInt());
        List<OffsetAndMetadata> flattened = new ArrayList<>();
        for (Map<TopicPartition, OffsetAndMetadata> commits : history) {
            flattened.addAll(commits.values());
        }
        return flattened;
    }

    /**
     * Asserts the committed offsets without attaching a description.
     *
     * @param offsets the expected committed offsets
     */
    public void assertCommits(List<Integer> offsets) {
        Optional<String> noDescription = Optional.empty();
        assertCommits(offsets, noDescription);
    }

    /**
     * Starts a Truth assertion chain over the flattened commit history.
     *
     * @return a subject wrapping a {@link CommitHistory} built from the commits made so far
     */
    public CommitHistorySubject assertCommits() {
        CommitHistory history = new CommitHistory(getCommitHistoryFlattenedMeta());
        return CommitHistorySubject.assertThat(history);
    }

    /**
     * Asserts the commit history against the expected lists of offsets, using the
     * producer spy in transactional mode and the consumer spy's history otherwise.
     *
     * @param offsets the expected offsets, as a list of lists
     */
    public void assertCommitLists(List<List<Integer>> offsets) {
        if (isUsingTransactionalProducer()) {
            ktu.assertCommitLists(producerSpy, offsets, Optional.empty());
            return;
        }
        List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> consumerHistory = consumerSpy.getCommitHistoryWithGroupId();
        ktu.assertCommitLists(consumerHistory, offsets, Optional.empty());
    }

    /**
     * Returns the raw commit history keyed by consumer group id, taken from the producer spy
     * in transactional mode and from the consumer spy otherwise.
     *
     * @return the commit history of the active commit channel
     */
    protected List<Map<String, Map<TopicPartition, OffsetAndMetadata>>> getCommitHistory() {
        return isUsingTransactionalProducer()
                ? producerSpy.consumerGroupOffsetsHistory()
                : consumerSpy.getCommitHistoryWithGroupId();
    }

    /**
     * Reports whether the consumer under test commits through the transactional producer.
     *
     * @return {@code true} if the configured commit mode is {@code PERIODIC_TRANSACTIONAL_PRODUCER}
     */
    protected boolean isUsingTransactionalProducer() {
        return parentParallelConsumer.getWm()
                .getOptions()
                .getCommitMode()
                .equals(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER);
    }

    /**
     * Reports whether the consumer under test uses asynchronous consumer commits.
     *
     * @return {@code true} if the configured commit mode is {@code PERIODIC_CONSUMER_ASYNCHRONOUS}
     */
    protected boolean isUsingAsyncCommits() {
        return parentParallelConsumer.getWm()
                .getOptions()
                .getCommitMode()
                .equals(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS);
    }

    /**
     * Counts down each of the selected latches, then waits for one control loop iteration.
     *
     * @param locks       the latches available for release
     * @param lockIndexes indexes into {@code locks} of the latches to count down
     */
    protected void releaseAndWait(List<CountDownLatch> locks, List<Integer> lockIndexes) {
        for (Integer lockIndex : lockIndexes) {
            log.debug("Releasing {}...", lockIndex);
            CountDownLatch lock = locks.get(lockIndex);
            lock.countDown();
        }
        awaitForSomeLoopCycles(1);
    }

    /**
     * Counts down a single latch, then waits for one control loop iteration.
     *
     * @param locks     the latches available for release
     * @param lockIndex index into {@code locks} of the latch to count down
     */
    protected void releaseAndWait(List<CountDownLatch> locks, int lockIndex) {
        log.debug("Releasing {}...", lockIndex);
        CountDownLatch lock = locks.get(lockIndex);
        lock.countDown();
        awaitForSomeLoopCycles(1);
    }

    /**
     * Pauses the control loop while waiting for the given latch, then resumes the loop and waits
     * for it to complete one more iteration.
     *
     * @param latch the latch to wait on while the control loop is paused
     */
    protected void pauseControlToAwaitForLatch(CountDownLatch latch) {
        // Order matters: arm the pause before waiting, and release it before awaiting a loop cycle.
        pauseControlLoop();
        LatchTestUtils.awaitLatch(latch);
        resumeControlLoop();
        awaitForOneLoopCycle();
    }

    /**
     * Starts a Truth assertion chain over the consumer spy, prefixed with a failure message.
     *
     * @param msg message reported when an assertion on the returned subject fails
     * @return a subject wrapping {@code consumerSpy}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected LongPollingMockConsumerSubject<String, String> assertThatConsumer(String msg) {
        return (LongPollingMockConsumerSubject) Truth.assertWithMessage(msg)
                .about(LongPollingMockConsumerSubject.mockConsumers())
                .that(consumerSpy);
    }
}

