package io.confluent.parallelconsumer.integrationTests;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ProgressTracker;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.BrokerCommitAsserter;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerWrapper;
import io.confluent.parallelconsumer.truth.CommitHistorySubject;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniLists;

@Tag("transactions")
/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/TransactionTimeoutsTest.class */
class TransactionTimeoutsTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionTimeoutsTest.class);
    public static final int NUMBER_TO_SEND = 5;
    public static final int SMALL_TIMEOUT = 2;
    private ParallelEoSStreamProcessor<String, String> pc;
    private String originalGroupId;
    BrokerCommitAsserter assertConsumer;

    TransactionTimeoutsTest() {
    }

    void setup(PCModule<String, String> pCModule) {
        setupTopic(TransactionTimeoutsTest.class.getSimpleName());
        this.pc = new ParallelEoSStreamProcessor<>(pCModule.options(), pCModule);
        getKcu().produceMessages(getTopic(), 5L);
        this.pc.subscribe(UniLists.of(getTopic()));
        this.originalGroupId = getKcu().getConsumer().groupMetadata().groupId();
        this.assertConsumer = new BrokerCommitAsserter(getTopic(), getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP));
    }

    private ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> createOptions() {
        return ParallelConsumerOptions.builder().consumer(getKcu().createNewConsumer()).producer(getKcu().createNewProducer(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).commitLockAcquisitionTimeout(Duration.ofSeconds(1L)).defaultMessageRetryDelay(Duration.ofMillis(100L)).produceLockAcquisitionTimeout(Duration.ofSeconds(2L)).commitInterval(Duration.ofSeconds(1L)).allowEagerProcessingDuringTransactionCommit(true);
    }

    @ValueSource(ints = {SMALL_TIMEOUT, ProgressTracker.WARMED_UP_AFTER_X_MESSAGES})
    @ParameterizedTest
    void commitTimeout(int i) {
        setup(new PCModule<>(createOptions().allowEagerProcessingDuringTransactionCommit(false).build()));
        String str = getTopic() + "-output";
        int i2 = 12;
        this.pc.pollAndProduce(pollContext -> {
            log.debug("Processing {}", Long.valueOf(pollContext.offset()));
            long offset = pollContext.offset();
            if (offset == 8) {
                log.debug("Processing offset {} - simulating a long processing phase with timeout multiple {}", 8, Integer.valueOf(i));
                ThreadUtils.sleepQuietly(1000 * i);
                log.debug("Processing offset {} - simulating a long processing phase COMPLETE", 8);
            } else if (offset == i2) {
                throw new FakeRuntimeException("fail");
            }
            return new ProducerRecord(str, "output-value,source-offset: " + offset);
        });
        this.assertConsumer.assertConsumedAtLeastOffset(str, 4);
        getKcu().produceMessages(getTopic(), 10L);
        this.pc.requestCommitAsap();
        Awaitility.await().untilAsserted(() -> {
            ManagedTruth.assertThat(this.pc).isClosedOrFailed();
        });
        ManagedTruth.assertThat(this.pc).getFailureCause().hasMessageThat().contains("timeout");
        CommitHistorySubject hasCommittedToPartition = ManagedTruth.assertThat((Consumer) getKcu().createNewConsumer(this.originalGroupId)).hasCommittedToPartition(getTopic(), this.partitionNumber);
        hasCommittedToPartition.offset(8L);
        hasCommittedToPartition.encodedIncomplete(8, 12);
    }

    @Test
    void produceTimeout() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        setup(new PCModule<String, String>(createOptions().build()) { // from class: io.confluent.parallelconsumer.integrationTests.TransactionTimeoutsTest.1
            protected ProducerWrapper<String, String> producerWrap() {
                ProducerWrapper<String, String> producerWrapper = (ProducerWrapper) Mockito.spy(super.producerWrap());
                ((ProducerWrapper) Mockito.doAnswer(this::maybeSleep).when(producerWrapper)).sendOffsetsToTransaction(ArgumentMatchers.anyMap(), (ConsumerGroupMetadata) ArgumentMatchers.any(ConsumerGroupMetadata.class));
                return producerWrapper;
            }

            private Object maybeSleep(InvocationOnMock invocationOnMock) {
                long offset = ((OffsetAndMetadata) ((Map) invocationOnMock.getArguments()[0]).get(new TopicPartition(TransactionTimeoutsTest.this.getTopic(), 0))).offset();
                boolean z = countDownLatch.getCount() > 0;
                if (offset == 7 && z) {
                    TransactionTimeoutsTest.log.debug("Causing commit to take too long which will trigger produce lock timeout");
                    countDownLatch.countDown();
                    ThreadUtils.sleepQuietly(5000);
                    TransactionTimeoutsTest.log.debug("Causing commit to take too long COMPLETE");
                }
                return invocationOnMock.callRealMethod();
            }
        });
        AtomicInteger atomicInteger = new AtomicInteger();
        String str = getTopic() + "-output";
        this.pc.pollAndProduce(pollContext -> {
            long offset = pollContext.offset();
            log.debug("Processing {}", Long.valueOf(pollContext.offset()));
            if (offset == 7) {
                int numberOfFailedAttempts = pollContext.getSingleRecord().getNumberOfFailedAttempts();
                log.debug("Updating failed attempts to {}", Integer.valueOf(numberOfFailedAttempts));
                atomicInteger.set(numberOfFailedAttempts);
                if (countDownLatch.getCount() > 0) {
                    log.debug("Waiting for commit to start before trying to acquire produce lock");
                    LatchTestUtils.awaitLatch(countDownLatch);
                    ThreadUtils.sleepQuietly(1000);
                }
            }
            return new ProducerRecord(str, "random");
        });
        this.assertConsumer.assertConsumedAtLeastOffset(str, 4);
        log.debug("Send more records to trigger timeout condition above...");
        getKcu().produceMessages(getTopic(), 4L);
        this.assertConsumer.assertConsumedAtMostOffset(str, 4);
        Awaitility.await().untilAsserted(() -> {
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isAtLeast(1);
        });
        this.assertConsumer.assertConsumedAtLeastOffset(str, 9);
    }
}
