/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Integration tests that run a {@link ParallelEoSStreamProcessor} against a topic populated by
 * transactional producers (optionally mixed with records from a non-transactional producer) and
 * assert that the expected number of records reaches the user function.
 *
 * <p>Each test produces its records first, then starts polling with a user function that counts
 * every invocation and blocks once the count exceeds a configurable threshold, and finally waits
 * until the counter reaches the expected value.
 */
@Tag(value="transactions")
public class TransactionMarkersTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionMarkersTest.class);

    /** Default number of records the user function lets through before it starts blocking. */
    final int LIMIT = 1;

    /** Number of times the user function has been invoked so far. */
    AtomicInteger receivedRecordCount = new AtomicInteger();

    Producer<String, String> txProducer;
    Producer<String, String> txProducerTwo;
    Producer<String, String> txProducerThree;
    Producer<String, String> normalProducer;
    Consumer<String, String> consumer;
    protected ParallelEoSStreamProcessor<String, String> pc;

    /**
     * Creates the topic, the consumer, three transactional producers and one non-transactional
     * producer, then builds the processor with {@code PARTITION} ordering and subscribes it to
     * the test topic.
     */
    @BeforeEach
    void setup() {
        this.setupTopic();
        this.consumer = this.getKcu().getConsumer();
        this.txProducer = this.getKcu().createAndInitNewTransactionalProducer();
        this.txProducerTwo = this.getKcu().createAndInitNewTransactionalProducer();
        this.txProducerThree = this.getKcu().createAndInitNewTransactionalProducer();
        this.normalProducer = this.getKcu().createNewProducer(KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL);
        this.pc = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.<String, String>builder()
                .consumer(this.consumer)
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build());
        this.pc.subscribe(UniSets.of(this.topic));
    }

    /**
     * Closes the processor after every test. Guards against {@code pc} being unset so that a
     * failure early in {@link #setup()} is not masked by a {@link NullPointerException} here.
     */
    @Override
    @AfterEach
    void close() {
        if (this.pc != null) {
            // NOTE(review): the tests also call pc.close() themselves, so this is usually a
            // second close - presumably harmless; confirm against the processor's contract.
            this.pc.close();
        }
    }

    /** One committed transaction followed by one non-transactional record. */
    @Test
    void single() {
        this.sendOneTransaction();
        this.sendRecordsNonTransactionally(1);
        this.runPcAndBlockRecordsOverLimitIndex();
        this.waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Two committed transactions back to back, no non-transactional records. */
    @Test
    void doubleTransaction() {
        this.sendOneTransaction();
        this.sendOneTransaction();
        this.runPcAndBlockRecordsOverLimitIndex();
        this.waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Waits until at least two records have been received. */
    private void waitForRecordsToBeReceived() {
        int expected = 2;
        this.waitForRecordsToBeReceived(expected);
    }

    /**
     * Polls (via Awaitility, using its default timeout) until the user function has been invoked
     * at least {@code expected} times.
     *
     * @param expected minimum number of received records to wait for
     */
    private void waitForRecordsToBeReceived(int expected) {
        log.debug("Awaiting {} records to be received...", expected);
        Awaitility.await().untilAsserted(() -> Truth.assertThat(this.receivedRecordCount.get()).isAtLeast(expected));
        log.debug("Awaiting {} records to be received - done.", expected);
    }

    /** Starts polling, blocking every record after the first {@link #LIMIT} records. */
    private void runPcAndBlockRecordsOverLimitIndex() {
        this.runPcAndBlockRecordsOverLimitIndex(LIMIT);
    }

    /**
     * Registers the user function with the processor. Every invocation increments
     * {@link #receivedRecordCount}; once the resulting count is greater than {@code blockOver}
     * the invocation sleeps indefinitely.
     *
     * @param blockOver number of invocations allowed to complete before blocking starts
     */
    private void runPcAndBlockRecordsOverLimitIndex(int blockOver) {
        this.pc.poll(recordContexts -> {
            // Count first, so a record that is about to block is still seen by the waiting test.
            int index = this.receivedRecordCount.incrementAndGet();
            log.debug("Got record index: {} ...", index);
            if (index > blockOver) {
                try {
                    log.debug(StringUtils.msg("{} over block limit of {}, blocking...", index, blockOver));
                    Thread.sleep(Long.MAX_VALUE);
                }
                catch (InterruptedException e) {
                    // Being interrupted out of the sleep is surfaced as a FakeRuntimeException.
                    throw new FakeRuntimeException(e);
                }
            }
        });
    }

    /** Sends a single record inside its own transaction on {@link #txProducer} and commits it. */
    private void sendOneTransaction() {
        this.txProducer.beginTransaction();
        this.txProducer.send(this.createRecordToSend());
        this.txProducer.commitTransaction();
    }

    /**
     * Builds a key-less record with an empty string value addressed to the test topic.
     *
     * @return a new record for the test topic
     */
    @NotNull
    private ProducerRecord<String, String> createRecordToSend() {
        return new ProducerRecord<>(this.topic, "");
    }

    /**
     * Sends {@code count} records through the non-transactional producer.
     *
     * @param count number of records to send; zero or a negative value sends nothing
     * @return the send futures, one per record, in send order
     */
    protected List<Future<RecordMetadata>> sendRecordsNonTransactionally(int count) {
        // IntStream.range yields `count` elements; IntStream.of(count) would yield exactly one.
        return IntStream.range(0, count)
                .mapToObj(ignored -> this.normalProducer.send(this.createRecordToSend()))
                .collect(Collectors.toList());
    }

    /** Ten committed transactions followed by ten non-transactional records. */
    @Test
    void several() {
        this.sendSeveralTransaction();
        this.sendRecordsNonTransactionally(10);
        this.runPcAndBlockRecordsOverLimitIndex();
        this.waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Sends ten single-record transactions, one after another. */
    private void sendSeveralTransaction() {
        // range(0, 10) iterates ten times; IntStream.of(10) would run the body only once.
        IntStream.range(0, 10).forEach(ignored -> this.sendOneTransaction());
    }

    /** Same data as {@link #several()}, but the first three records are allowed to complete. */
    @Test
    void dontBlockFirstRecords() {
        this.sendSeveralTransaction();
        this.sendRecordsNonTransactionally(10);
        this.runPcAndBlockRecordsOverLimitIndex(3);
        this.waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Same data as {@link #several()}, with a threshold so high that no record ever blocks. */
    @Test
    void dontBlockAnyRecords() {
        this.sendSeveralTransaction();
        this.sendRecordsNonTransactionally(10);
        this.runPcAndBlockRecordsOverLimitIndex(Integer.MAX_VALUE);
        this.waitForRecordsToBeReceived();
        this.pc.close();
    }

    /**
     * Opens a transaction on each of the three transactional producers and sends one record in
     * each before committing any of them, then commits all three and adds two non-transactional
     * records. Expects at least the three transactional records to be received.
     */
    @Test
    void overLappingTransactions() {
        int numberOfBaseRecords = 3;
        this.startAndOneRecord(this.txProducer);
        this.startAndOneRecord(this.txProducerTwo);
        this.startAndOneRecord(this.txProducerThree);
        this.commitTx(this.txProducer);
        this.commitTx(this.txProducerTwo);
        this.commitTx(this.txProducerThree);
        this.sendRecordsNonTransactionally(2);
        this.runPcAndBlockRecordsOverLimitIndex(numberOfBaseRecords);
        this.waitForRecordsToBeReceived(numberOfBaseRecords);
        this.pc.close();
    }

    /**
     * Commits the transaction currently open on the given producer.
     *
     * @param txProducer transactional producer whose open transaction is committed
     */
    private void commitTx(Producer<String, String> txProducer) {
        txProducer.commitTransaction();
    }

    /**
     * Begins a transaction on the given producer and sends one record in it, leaving the
     * transaction open.
     *
     * @param txProducer transactional producer to start the transaction on
     */
    private void startAndOneRecord(Producer<String, String> txProducer) {
        txProducer.beginTransaction();
        txProducer.send(this.createRecordToSend());
    }
}

