package io.confluent.parallelconsumer;

import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.ProgressBarUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import me.tongfei.progressbar.ProgressBar;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/BatchTestMethods.class */
public abstract class BatchTestMethods<POLL_RETURN> {
    private static final Logger log = LoggerFactory.getLogger(BatchTestMethods.class);
    public static final long FAILURE_TARGET = 5;
    private final ParallelEoSStreamProcessorTestBase baseTest;

    protected abstract KafkaTestUtils getKtu();

    protected void setupParallelConsumer(int i, int i2, ParallelConsumerOptions.ProcessingOrder processingOrder) {
        this.baseTest.setupParallelConsumerInstance(ParallelConsumerOptions.builder().batchSize(Integer.valueOf(i)).ordering(processingOrder).maxConcurrency(i2).build());
        this.baseTest.parentParallelConsumer.setTimeBetweenCommits(Duration.ofSeconds(5L));
    }

    protected abstract AbstractParallelEoSStreamProcessor getPC();

    public void averageBatchSizeTest(int i) {
        ProgressBar newMessagesBar = ProgressBarUtils.getNewMessagesBar(log, i);
        setupParallelConsumer(20, 8, ParallelConsumerOptions.ProcessingOrder.UNORDERED);
        getKtu().sendRecords(i);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        long currentTimeMillis = System.currentTimeMillis();
        averageBatchSizeTestPoll(atomicInteger, atomicInteger2, new RateLimiter(1));
        Awaitility.waitAtMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout).alias("expected number of records").failFast(() -> {
            return Boolean.valueOf(getPC().isClosedOrFailed());
        }).untilAsserted(() -> {
            newMessagesBar.stepTo(atomicInteger2.get());
            Assertions.assertThat(atomicInteger2.get()).isEqualTo(i);
        });
        newMessagesBar.close();
        double calcAverage = calcAverage(atomicInteger2, atomicInteger);
        Assertions.assertThat(calcAverage).isGreaterThan(20.0d * 0.999d);
        this.baseTest.parentParallelConsumer.requestCommitAsap();
        this.baseTest.awaitForCommit(i);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        log.info("Processed {} records in {} ms. Average batch size was: {}. {} records per second.", new Object[]{Integer.valueOf(i), Long.valueOf(currentTimeMillis2), Double.valueOf(calcAverage), Double.valueOf(i / (currentTimeMillis2 / 1000.0d))});
    }

    protected abstract void averageBatchSizeTestPoll(AtomicInteger atomicInteger, AtomicInteger atomicInteger2, RateLimiter rateLimiter);

    /* JADX INFO: Access modifiers changed from: protected */
    public POLL_RETURN averageBatchSizeTestPollInner(AtomicInteger atomicInteger, AtomicInteger atomicInteger2, RateLimiter rateLimiter, PollContext<String, String> pollContext) {
        int size = (int) pollContext.size();
        rateLimiter.performIfNotLimited(() -> {
            try {
                log.debug("Processed {} records in {} batches with average size {}", new Object[]{Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger.get()), Double.valueOf(calcAverage(atomicInteger2, atomicInteger))});
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        });
        try {
            log.trace("Batch size {}", Integer.valueOf(size));
            POLL_RETURN averageBatchSizeTestPollStep = averageBatchSizeTestPollStep(pollContext);
            atomicInteger.getAndIncrement();
            atomicInteger2.addAndGet(size);
            return averageBatchSizeTestPollStep;
        } catch (Throwable th) {
            atomicInteger.getAndIncrement();
            atomicInteger2.addAndGet(size);
            throw th;
        }
    }

    protected abstract POLL_RETURN averageBatchSizeTestPollStep(PollContext<String, String> pollContext);

    private double calcAverage(AtomicInteger atomicInteger, AtomicInteger atomicInteger2) {
        return atomicInteger.get() / (0.0d + atomicInteger2.get());
    }

    public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        int i = 2;
        getPC().setTimeBetweenCommits(Duration.ofSeconds(1L));
        setupParallelConsumer(2, 16, processingOrder);
        List<ConsumerRecord<String, String>> sendRecords = getKtu().sendRecords(5);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        simpleBatchTestPoll(copyOnWriteArrayList);
        int ceil = processingOrder == ParallelConsumerOptions.ProcessingOrder.PARTITION ? 5 : (int) Math.ceil(5 / 2);
        Awaitility.waitAtMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout).alias("expected number of batches").failFast(() -> {
            return Boolean.valueOf(getPC().isClosedOrFailed());
        }).untilAsserted(() -> {
            Assertions.assertThat(copyOnWriteArrayList).hasSize(ceil);
        });
        Assertions.assertThat(copyOnWriteArrayList).as("batch size", new Object[0]).allSatisfy(pollContext -> {
            Assertions.assertThat(pollContext).hasSizeLessThanOrEqualTo(i);
        }).as("all messages processed", new Object[0]).flatExtracting((v0) -> {
            return v0.getConsumerRecordsFlattened();
        }).hasSameElementsAs(sendRecords);
        Assertions.assertThat(getPC().isClosedOrFailed()).isFalse();
        this.baseTest.awaitForCommit(5);
        getPC().closeDrainFirst();
    }

    public abstract void simpleBatchTestPoll(List<PollContext<String, String>> list);

    public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        int i = 5;
        setupParallelConsumer(5, 16, processingOrder);
        List<ConsumerRecord<String, String>> sendRecords = getKtu().sendRecords(20);
        List<PollContext<String, String>> synchronizedList = Collections.synchronizedList(new ArrayList());
        batchFailPoll(synchronizedList);
        int ceil = (int) Math.ceil(20 / 5);
        this.baseTest.awaitForCommit(20);
        Assertions.assertThat(synchronizedList).hasSizeGreaterThanOrEqualTo(ceil);
        Assertions.assertThat(synchronizedList).as("batch size", new Object[0]).allSatisfy(pollContext -> {
            Assertions.assertThat(pollContext).hasSizeLessThanOrEqualTo(i);
        }).as("all messages processed", new Object[0]).flatExtracting((v0) -> {
            return v0.getConsumerRecordsFlattened();
        }).hasSameElementsAs(sendRecords);
        Assertions.assertThat(getPC().isClosedOrFailed()).isFalse();
    }

    protected abstract void batchFailPoll(List<PollContext<String, String>> list);

    /* JADX INFO: Access modifiers changed from: protected */
    public POLL_RETURN batchFailPollInner(PollContext<String, String> pollContext) {
        List offsetsFlattened = pollContext.getOffsetsFlattened();
        log.debug("Got batch {}", offsetsFlattened);
        if (offsetsFlattened.contains(5L)) {
            int numberOfFailedAttempts = ((RecordContext) pollContext.stream().filter(recordContext -> {
                return recordContext.offset() == 5;
            }).findFirst().get()).getNumberOfFailedAttempts();
            if (numberOfFailedAttempts < 3) {
                log.debug("Failing batch containing target offset {}", 5L);
                throw new FakeRuntimeError(StringUtils.msg("Testing failure processing a batch - pretend attempt #{}", new Object[]{Integer.valueOf(numberOfFailedAttempts)}));
            }
            log.debug("Failing target {} now completing as has has reached target attempts {}", offsetsFlattened, 3);
        }
        log.debug("Completing batch {}", offsetsFlattened);
        return null;
    }

    public BatchTestMethods(ParallelEoSStreamProcessorTestBase parallelEoSStreamProcessorTestBase) {
        this.baseTest = parallelEoSStreamProcessorTestBase;
    }
}
