/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.RecordContext;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reusable batch-processing test scenarios that are shared between different flavours of the
 * parallel consumer. Concrete subclasses supply the Kafka test utilities, the processor under
 * test, and the code that actually registers the poll callbacks; this class drives the scenario
 * and performs the assertions.
 *
 * @param <POLL_RETURN> the type returned by the user function registered with the processor under
 *                      test (the "inner" poll helpers in this class return this type)
 */
public abstract class BatchTestMethods<POLL_RETURN> {
    private static final Logger log = LoggerFactory.getLogger(BatchTestMethods.class);

    /** Offset of the record whose batch is deliberately failed by {@link #batchFailPollInner}. */
    public static final long FAILURE_TARGET = 5L;

    /** Test base that owns the parallel consumer instance and the commit-await helpers. */
    private final ParallelEoSStreamProcessorTestBase baseTest;

    /**
     * @param baseTest the test base used to create the parallel consumer and to await commits
     */
    public BatchTestMethods(ParallelEoSStreamProcessorTestBase baseTest) {
        this.baseTest = baseTest;
    }

    /**
     * @return the Kafka test utilities used to send the test records
     */
    protected abstract KafkaTestUtils getKtu();

    /**
     * Builds the consumer options from the given settings, asks the test base to create the
     * parallel consumer instance, and sets that instance's time between commits to five seconds.
     *
     * @param targetBatchSize the batch size to configure
     * @param maxConcurrency  the maximum concurrency to configure
     * @param ordering        the processing order to configure
     */
    protected void setupParallelConsumer(int targetBatchSize, int maxConcurrency, ParallelConsumerOptions.ProcessingOrder ordering) {
        // Raw type kept on purpose: the parameter type of setupParallelConsumerInstance is not
        // visible from this file, so the declaration is left exactly as permissive as before.
        ParallelConsumerOptions options = ParallelConsumerOptions.builder()
                .batchSize(targetBatchSize)
                .ordering(ordering)
                .maxConcurrency(maxConcurrency)
                .build();
        this.baseTest.setupParallelConsumerInstance(options);
        this.baseTest.parentParallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(5L));
    }

    /**
     * @return the processor under test
     */
    protected abstract AbstractParallelEoSStreamProcessor getPC();

    /**
     * Sends {@code numRecsExpected} records, waits until all of them have been processed, and
     * asserts that the average batch size was greater than 99.9% of the target batch size (20).
     * Afterwards requests a commit, waits for it, and logs throughput figures.
     *
     * @param numRecsExpected the number of records to send and to expect to be processed
     */
    public void averageBatchSizeTest(int numRecsExpected) {
        final int targetBatchSize = 20;
        final int maxConcurrency = 8;

        AtomicInteger numBatches = new AtomicInteger(0);
        AtomicInteger numRecordsProcessed = new AtomicInteger(0);
        long start;

        ProgressBar bar = ProgressBarUtils.getNewMessagesBar(log, numRecsExpected);
        try {
            this.setupParallelConsumer(targetBatchSize, maxConcurrency, ParallelConsumerOptions.ProcessingOrder.UNORDERED);
            this.getKtu().sendRecords(numRecsExpected);

            start = System.currentTimeMillis();
            RateLimiter statusLogger = new RateLimiter(1);
            this.averageBatchSizeTestPoll(numBatches, numRecordsProcessed, statusLogger);

            Awaitility.waitAtMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout)
                    .alias("expected number of records")
                    .failFast(() -> this.getPC().isClosedOrFailed())
                    .untilAsserted(() -> {
                        bar.stepTo(numRecordsProcessed.get());
                        Assertions.assertThat(numRecordsProcessed.get()).isEqualTo(numRecsExpected);
                    });
        } finally {
            // Close the progress bar even when setup or the wait above fails, so that a failing
            // test does not leave the bar open.
            bar.close();
        }

        double targetMetThreshold = 0.999;
        double acceptableAttainedBatchSize = targetBatchSize * targetMetThreshold;
        double averageBatchSize = this.calcAverage(numRecordsProcessed, numBatches);
        Assertions.assertThat(averageBatchSize).isGreaterThan(acceptableAttainedBatchSize);

        this.baseTest.parentParallelConsumer.requestCommitAsap();
        this.baseTest.awaitForCommit(numRecsExpected);

        long duration = System.currentTimeMillis() - start;
        double recordsPerSecond = (double) numRecsExpected / ((double) duration / 1000.0);
        log.info("Processed {} records in {} ms. Average batch size was: {}. {} records per second.",
                numRecsExpected, duration, averageBatchSize, recordsPerSecond);
    }

    /**
     * Registers the poll callback for {@link #averageBatchSizeTest(int)}. Implementations are
     * expected to route each received batch through {@link #averageBatchSizeTestPollInner}
     * (NOTE(review): inferred from the matching signatures - confirm against the subclasses).
     *
     * @param numBatches   counter of batches seen so far
     * @param numRecords   counter of records seen so far
     * @param statusLogger rate limiter used to throttle status logging
     */
    protected abstract void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger);

    /**
     * Handles a single batch for the average batch size test: logs a rate-limited status line,
     * delegates to {@link #averageBatchSizeTestPollStep(PollContext)}, and - whether or not that
     * step throws - increments the batch counter and adds the batch's size to the record counter.
     *
     * @param numBatches   counter of batches seen so far; incremented by one
     * @param numRecords   counter of records seen so far; incremented by the size of the batch
     * @param statusLogger rate limiter guarding the status log line
     * @param pollBatch    the batch being processed
     * @return whatever {@link #averageBatchSizeTestPollStep(PollContext)} returns
     */
    protected POLL_RETURN averageBatchSizeTestPollInner(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger, PollContext<String, String> pollBatch) {
        int size = (int) pollBatch.size();
        statusLogger.performIfNotLimited(() -> {
            try {
                log.debug("Processed {} records in {} batches with average size {}",
                        numRecords.get(), numBatches.get(), this.calcAverage(numRecords, numBatches));
            } catch (Exception e) {
                // Status logging is best effort only; report the problem and carry on.
                log.error(e.getMessage(), e);
            }
        });
        try {
            log.trace("Batch size {}", size);
            return this.averageBatchSizeTestPollStep(pollBatch);
        } finally {
            // Counters are updated in a finally block so the batch is counted even if the step throws.
            numBatches.getAndIncrement();
            numRecords.addAndGet(size);
        }
    }

    /**
     * Performs the flavour-specific work for one batch of the average batch size test.
     *
     * @param pollBatch the batch being processed
     * @return the value to hand back from the poll callback
     */
    protected abstract POLL_RETURN averageBatchSizeTestPollStep(PollContext<String, String> pollBatch);

    /**
     * Computes records per batch using floating point division. When no batches have been counted
     * yet the result is {@code NaN} (zero records) or infinite (non-zero records) rather than an
     * exception.
     */
    private double calcAverage(AtomicInteger numRecords, AtomicInteger numBatches) {
        return (double) numRecords.get() / (double) numBatches.get();
    }

    /**
     * Sends five records with a batch size of two, waits for the expected number of batches,
     * and asserts that no batch exceeds the batch size and that the batches together contain
     * exactly the sent records. Finally awaits the commit and closes the processor after draining.
     * <p>
     * The expected number of batches is the number of records when {@code order} is
     * {@code PARTITION}, and {@code ceil(records / batchSize)} otherwise.
     *
     * @param order the processing order to configure the consumer with
     */
    public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) {
        final int batchSizeSetting = 2;
        final int numRecsExpected = 5;

        // NOTE(review): setupParallelConsumer below sets the time between commits to 5s on
        // baseTest.parentParallelConsumer. If getPC() refers to that same (or a replaced)
        // instance, this 1s setting has no lasting effect - confirm the intended order.
        this.getPC().setTimeBetweenCommits(Duration.ofSeconds(1L));
        this.setupParallelConsumer(batchSizeSetting, 16, order);

        List<ConsumerRecord<String, String>> recs = this.getKtu().sendRecords(numRecsExpected);
        List<PollContext<String, String>> batchesReceived = new CopyOnWriteArrayList<>();
        this.simpleBatchTestPoll(batchesReceived);

        final int expectedNumOfBatches = order == ParallelConsumerOptions.ProcessingOrder.PARTITION
                ? numRecsExpected
                : (int) Math.ceil((double) numRecsExpected / (double) batchSizeSetting);

        Awaitility.waitAtMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout)
                .alias("expected number of batches")
                .failFast(() -> this.getPC().isClosedOrFailed())
                .untilAsserted(() -> Assertions.assertThat(batchesReceived).hasSize(expectedNumOfBatches));

        Assertions.assertThat(batchesReceived)
                .as("batch size")
                .allSatisfy(receivedBatchEntry -> Assertions.assertThat(receivedBatchEntry).hasSizeLessThanOrEqualTo(batchSizeSetting))
                .as("all messages processed")
                .flatExtracting(PollContext::getConsumerRecordsFlattened)
                .hasSameElementsAs(recs);

        Assertions.assertThat(this.getPC().isClosedOrFailed()).isFalse();
        this.baseTest.awaitForCommit(numRecsExpected);
        this.getPC().closeDrainFirst();
    }

    /**
     * Registers the poll callback for {@link #simpleBatchTest}.
     *
     * @param batchesReceived list handed to the implementation; the test asserts on its contents
     *                        afterwards, so each received batch is expected to be added to it
     */
    public abstract void simpleBatchTestPoll(List<PollContext<String, String>> batchesReceived);

    /**
     * Sends twenty records with a batch size of five, registers a poll callback via
     * {@link #batchFailPoll(List)}, awaits the commit of all records, and then asserts that at
     * least {@code ceil(20 / 5)} batches were received, that no batch exceeds the batch size,
     * that the batches together contain exactly the sent records, and that the processor has
     * neither closed nor failed.
     *
     * @param order the processing order to configure the consumer with
     */
    public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) {
        final int batchSize = 5;
        final int expectedNumOfMessages = 20;

        this.setupParallelConsumer(batchSize, 16, order);
        List<ConsumerRecord<String, String>> recs = this.getKtu().sendRecords(expectedNumOfMessages);
        List<PollContext<String, String>> receivedBatches = Collections.synchronizedList(new ArrayList<>());
        this.batchFailPoll(receivedBatches);

        this.baseTest.awaitForCommit(expectedNumOfMessages);

        int expectedNumOfBatches = (int) Math.ceil((double) expectedNumOfMessages / (double) batchSize);
        Assertions.assertThat(receivedBatches).hasSizeGreaterThanOrEqualTo(expectedNumOfBatches);

        Assertions.assertThat(receivedBatches)
                .as("batch size")
                .allSatisfy(receivedBatch -> Assertions.assertThat(receivedBatch).hasSizeLessThanOrEqualTo(batchSize))
                .as("all messages processed")
                .flatExtracting(PollContext::getConsumerRecordsFlattened)
                .hasSameElementsAs(recs);

        Assertions.assertThat(this.getPC().isClosedOrFailed()).isFalse();
    }

    /**
     * Registers the poll callback for {@link #batchFailureTest}.
     *
     * @param receivedBatches list handed to the implementation; the test asserts on its contents
     *                        afterwards, so each received batch is expected to be added to it
     */
    protected abstract void batchFailPoll(List<PollContext<String, String>> receivedBatches);

    /**
     * Handles a single batch for the batch failure test. If the batch contains the record at
     * offset {@link #FAILURE_TARGET} and that record has fewer than three failed attempts, the
     * whole batch is failed by throwing; otherwise the batch completes.
     *
     * @param batchPollContext the batch being processed
     * @return always {@code null} when the batch completes
     * @throws FakeRuntimeException when the batch contains the target offset and the target
     *                              record has fewer than three failed attempts
     */
    protected POLL_RETURN batchFailPollInner(PollContext<String, String> batchPollContext) {
        List<Long> offsets = batchPollContext.getOffsetsFlattened();
        if (offsets.contains(FAILURE_TARGET)) {
            final int targetAttempts = 3;
            RecordContext<String, String> target = batchPollContext.stream()
                    .filter(recordContext -> recordContext.offset() == FAILURE_TARGET)
                    .findFirst()
                    .orElseThrow(() -> new IllegalStateException(
                            "Offset " + FAILURE_TARGET + " is listed in the batch offsets but no matching record context was found"));
            int numberOfFailedAttempts = target.getNumberOfFailedAttempts();
            if (numberOfFailedAttempts < targetAttempts) {
                log.debug("Failing batch containing target offset {}", FAILURE_TARGET);
                throw new FakeRuntimeException(StringUtils.msg("Testing failure processing a batch - pretend attempt #{}", numberOfFailedAttempts));
            }
            log.debug("Failing target {} now completing as has has reached target attempts {}", offsets, targetAttempts);
        }
        log.debug("Completing batch {}", offsets);
        return null;
    }
}

