/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.BrokerCommitAsserter;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerWrapper;
import io.confluent.parallelconsumer.truth.CommitHistorySubject;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniLists;

/**
 * Broker integration tests for the lock-acquisition timeouts that apply when the parallel consumer
 * commits via {@link ParallelConsumerOptions.CommitMode#PERIODIC_TRANSACTIONAL_PRODUCER}.
 *
 * <p>Each test builds its own {@link PCModule} and passes it to {@link #setup(PCModule)}, which
 * creates the processor, seeds the input topic and subscribes.
 */
@Tag("transactions")
class TransactionTimeoutsTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionTimeoutsTest.class);

    /** Number of records produced to the input topic by {@link #setup(PCModule)}. */
    public static final int NUMBER_TO_SEND = 5;

    /** Produce lock acquisition timeout, in seconds, configured by {@link #createOptions()}. */
    public static final int SMALL_TIMEOUT = 2;

    private ParallelEoSStreamProcessor<String, String> pc;
    private String originalGroupId;
    BrokerCommitAsserter assertConsumer;

    /**
     * Prepares the topic, the processor under test and the asserting consumer.
     *
     * @param module module supplying the options (and any overridden collaborators) for the processor
     */
    void setup(PCModule<String, String> module) {
        setupTopic(TransactionTimeoutsTest.class.getSimpleName());
        pc = new ParallelEoSStreamProcessor<>(module.options(), module);
        getKcu().produceMessages(getTopic(), NUMBER_TO_SEND);
        pc.subscribe(UniLists.of(getTopic()));
        // Remembered so that a test can later inspect what this group committed.
        originalGroupId = getKcu().getConsumer().groupMetadata().groupId();
        // The asserter reads with a consumer in a new group, separate from the one PC uses.
        assertConsumer = new BrokerCommitAsserter(
                getTopic(),
                getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP));
    }

    /**
     * Builds the baseline options shared by the tests: transactional periodic commits, a one second
     * commit interval and commit lock timeout, and a {@link #SMALL_TIMEOUT} second produce lock timeout.
     *
     * @return a builder the caller may tweak further before calling {@code build()}
     */
    private ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> createOptions() {
        return ParallelConsumerOptions.<String, String>builder()
                .consumer(getKcu().createNewConsumer())
                .producer(getKcu().createNewProducer(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER))
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .commitLockAcquisitionTimeout(Duration.ofSeconds(1L))
                .defaultMessageRetryDelay(Duration.ofMillis(100L))
                .produceLockAcquisitionTimeout(Duration.ofSeconds(SMALL_TIMEOUT))
                .commitInterval(Duration.ofSeconds(1L))
                .allowEagerProcessingDuringTransactionCommit(true);
    }

    /**
     * With eager processing during commit disabled, a user function that stalls on one offset is
     * expected to make the processor close or fail with a failure cause mentioning "timeout".
     *
     * <p>Afterwards a fresh consumer in the original group must see the slow offset as the committed
     * offset, with the slow and the failing offsets encoded as incomplete.
     *
     * @param multiple factor applied to the stall duration ({@code 1000 * multiple}, presumably milliseconds)
     */
    @ParameterizedTest
    @ValueSource(ints = {2, 50})
    void commitTimeout(int multiple) {
        ParallelConsumerOptions<String, String> options = createOptions()
                .allowEagerProcessingDuringTransactionCommit(false)
                .build();
        setup(new PCModule<>(options));

        int offsetToGoVerySlow = 8;
        int offsetToError = 12;
        String outputTopic = getTopic() + "-output";

        pc.pollAndProduce(context -> {
            long offset = context.offset();
            log.debug("Processing {}", offset);
            if (offset == offsetToGoVerySlow) {
                log.debug("Processing offset {} - simulating a long processing phase with timeout multiple {}", offsetToGoVerySlow, multiple);
                ThreadUtils.sleepQuietly(1000 * multiple);
                log.debug("Processing offset {} - simulating a long processing phase COMPLETE", offsetToGoVerySlow);
            } else if (offset == offsetToError) {
                throw new FakeRuntimeException("fail");
            }
            return new ProducerRecord<>(outputTopic, "output-value,source-offset: " + offset);
        });

        // Wait for output from the initial batch before adding the records that include the slow one.
        int target = 4;
        assertConsumer.assertConsumedAtLeastOffset(outputTopic, target);

        getKcu().produceMessages(getTopic(), 10L);
        pc.requestCommitAsap();

        Awaitility.await().untilAsserted(() -> ManagedTruth.assertThat(pc).isClosedOrFailed());
        ManagedTruth.assertThat(pc).getFailureCause().hasMessageThat().contains("timeout");

        // Inspect what the original group committed, using a brand new consumer in that group.
        KafkaConsumer<String, String> newConsumer = getKcu().createNewConsumer(originalGroupId);
        CommitHistorySubject assertCommittedToPartition =
                ManagedTruth.assertThat(newConsumer).hasCommittedToPartition(getTopic(), partitionNumber);
        assertCommittedToPartition.offset(offsetToGoVerySlow);
        assertCommittedToPartition.encodedIncomplete(offsetToGoVerySlow, offsetToError);
    }

    /**
     * Stalls the first transactional offset commit that reaches a chosen offset while the user
     * function for that same offset is waiting to produce, then checks that the record was retried
     * at least once and that output eventually progresses.
     */
    @Test
    void produceTimeout() {
        int offsetToProduceSlowly = 7;
        // Counted down once by the stalled commit; a non-zero count means "first cycle".
        final CountDownLatch produceLock = new CountDownLatch(1);

        PCModule<String, String> slowCommitModule = new PCModule<String, String>(createOptions().build()) {

            /** Wraps the real producer wrapper in a spy whose offset commit can be delayed. */
            @Override
            protected ProducerWrapper<String, String> producerWrap() {
                ProducerWrapper<String, String> pw = Mockito.spy(super.producerWrap());
                Mockito.doAnswer(this::maybeSleep)
                        .when(pw)
                        .sendOffsetsToTransaction(ArgumentMatchers.anyMap(), ArgumentMatchers.any(ConsumerGroupMetadata.class));
                return pw;
            }

            /**
             * Delays the first commit whose offset for the test partition equals the slow offset,
             * then delegates to the real method.
             *
             * @throws Throwable whatever the real {@code sendOffsetsToTransaction} throws
             */
            private Object maybeSleep(InvocationOnMock invocation) throws Throwable {
                Map<TopicPartition, OffsetAndMetadata> offsets = invocation.getArgument(0);
                TopicPartition partition = new TopicPartition(
                        TransactionTimeoutsTest.this.getTopic(),
                        TransactionTimeoutsTest.this.partitionNumber);
                long offset = offsets.get(partition).offset();
                boolean firstCycle = produceLock.getCount() > 0;
                if (offset == offsetToProduceSlowly && firstCycle) {
                    log.debug("Causing commit to take too long which will trigger produce lock timeout");
                    // Release the waiting user function first, then hold the commit open.
                    produceLock.countDown();
                    ThreadUtils.sleepQuietly(5000);
                    log.debug("Causing commit to take too long COMPLETE");
                }
                return invocation.callRealMethod();
            }
        };
        setup(slowCommitModule);

        AtomicInteger retryCount = new AtomicInteger();
        String outputTopic = getTopic() + "-output";

        pc.pollAndProduce(context -> {
            long offset = context.offset();
            log.debug("Processing {}", offset);
            if (offset == offsetToProduceSlowly) {
                int numberOfFailedAttempts = context.getSingleRecord().getNumberOfFailedAttempts();
                log.debug("Updating failed attempts to {}", numberOfFailedAttempts);
                retryCount.set(numberOfFailedAttempts);
                boolean firstCycle = produceLock.getCount() > 0;
                if (firstCycle) {
                    log.debug("Waiting for commit to start before trying to acquire produce lock");
                    LatchTestUtils.awaitLatch(produceLock);
                    ThreadUtils.sleepQuietly(1000);
                }
            }
            return new ProducerRecord<>(outputTopic, "random");
        });

        int initialTarget = 4;
        assertConsumer.assertConsumedAtLeastOffset(outputTopic, initialTarget);

        log.debug("Send more records to trigger timeout condition above...");
        int extraToSend = 4;
        getKcu().produceMessages(getTopic(), extraToSend);

        // Output must not advance beyond the initial target at this point.
        assertConsumer.assertConsumedAtMostOffset(outputTopic, initialTarget);

        Awaitility.await().untilAsserted(() -> Truth.assertThat(retryCount.get()).isAtLeast(1));
        assertConsumer.assertConsumedAtLeastOffset(outputTopic, 9);
    }
}

