package io.confluent.parallelconsumer.integrationTests;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Integration tests that fill a topic with a mix of transactionally and non-transactionally
 * produced records, then run the parallel consumer with a user function that blocks forever once
 * a configurable number of records has been seen, and finally assert that at least the expected
 * number of records reached the user function.
 *
 * <p>NOTE(review): judging by the class name, the scenario under test is presumably that the
 * transaction commit markers the broker writes into the log do not stall record delivery or
 * shutdown - confirm against the project documentation.
 */
@Tag("transactions")
public class TransactionMarkersTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(TransactionMarkersTest.class);

    /** Default number of records the user function lets through before it starts blocking. */
    final int LIMIT = 1;

    /** Number of records handed to the user function so far (incremented from worker threads). */
    AtomicInteger receivedRecordCount = new AtomicInteger();

    Producer<String, String> txProducer;
    Producer<String, String> txProducerTwo;
    Producer<String, String> txProducerThree;
    Producer<String, String> normalProducer;
    Consumer<String, String> consumer;
    protected ParallelEoSStreamProcessor<String, String> pc;

    /**
     * Creates the topic, one consumer, three transactional producers, one non-transactional
     * producer and a parallel consumer (PARTITION ordering) subscribed to the test topic.
     */
    @BeforeEach
    void setup() {
        setupTopic();
        this.consumer = getKcu().getConsumer();
        this.txProducer = getKcu().createAndInitNewTransactionalProducer();
        this.txProducerTwo = getKcu().createAndInitNewTransactionalProducer();
        this.txProducerThree = getKcu().createAndInitNewTransactionalProducer();
        this.normalProducer = getKcu().createNewProducer(KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL);
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .consumer(this.consumer)
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build();
        this.pc = new ParallelEoSStreamProcessor<>(options);
        this.pc.subscribe(UniSets.of(this.topic));
    }

    /**
     * Closes the parallel consumer after every test.
     *
     * <p>NOTE(review): this overrides the base class {@code close()} without delegating to it, and
     * the producers created in {@link #setup()} are not closed here - confirm that is intended.
     */
    @Override // io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest
    @AfterEach
    void close() {
        this.pc.close();
    }

    /** One committed transaction followed by one plain record. */
    @Test
    void single() {
        sendOneTransaction();
        sendRecordsNonTransactionally(1);
        runPcAndBlockRecordsOverLimitIndex();
        waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Two committed transactions back to back, no plain records. */
    @Test
    void doubleTransaction() {
        sendOneTransaction();
        sendOneTransaction();
        runPcAndBlockRecordsOverLimitIndex();
        waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Waits until one record more than the default block limit has been received. */
    private void waitForRecordsToBeReceived() {
        waitForRecordsToBeReceived(LIMIT + 1);
    }

    /**
     * Blocks (using Awaitility's default timeout) until at least {@code expectedCount} records
     * have been passed to the user function.
     *
     * @param expectedCount minimum value {@link #receivedRecordCount} must reach
     */
    private void waitForRecordsToBeReceived(int expectedCount) {
        log.debug("Awaiting {} records to be received...", expectedCount);
        Awaitility.await().untilAsserted(() ->
                Truth.assertThat(this.receivedRecordCount.get()).isAtLeast(expectedCount));
        log.debug("Awaiting {} records to be received - done.", expectedCount);
    }

    /** Starts polling with the default block limit of {@link #LIMIT}. */
    private void runPcAndBlockRecordsOverLimitIndex() {
        runPcAndBlockRecordsOverLimitIndex(LIMIT);
    }

    /**
     * Registers the user function with the parallel consumer. The function counts every record
     * it is given and, once the count exceeds {@code blockLimit}, sleeps indefinitely so that the
     * record never completes.
     *
     * @param blockLimit number of records allowed to complete before workers start blocking
     */
    private void runPcAndBlockRecordsOverLimitIndex(int blockLimit) {
        this.pc.poll(pollContext -> {
            int recordIndex = this.receivedRecordCount.incrementAndGet();
            log.debug("Got record index: {} ...", recordIndex);
            if (recordIndex <= blockLimit) {
                return;
            }
            try {
                log.debug("{} over block limit of {}, blocking...", recordIndex, blockLimit);
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the worker's stack can still
                // observe the interruption after the exception has been wrapped.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }

    /** Begins a transaction on {@link #txProducer}, sends a single record and commits. */
    private void sendOneTransaction() {
        this.txProducer.beginTransaction();
        this.txProducer.send(createRecordToSend());
        this.txProducer.commitTransaction();
    }

    /**
     * Builds the record used by every send in this test.
     *
     * @return a record for the test topic carrying only an empty-string value
     */
    @NotNull
    private ProducerRecord<String, String> createRecordToSend() {
        return new ProducerRecord<>(this.topic, "");
    }

    /**
     * Sends {@code count} records through the non-transactional producer.
     *
     * <p>Uses {@code IntStream.range(0, count)}: {@code IntStream.of(count)} would produce a
     * single-element stream and therefore send exactly one record regardless of {@code count}.
     *
     * @param count number of records to send; zero or negative sends nothing
     * @return the send futures, one per record, in send order (not awaited here)
     */
    protected List<Future<RecordMetadata>> sendRecordsNonTransactionally(int count) {
        return IntStream.range(0, count)
                .mapToObj(index -> this.normalProducer.send(createRecordToSend()))
                .collect(Collectors.toList());
    }

    /** Ten transactions plus ten plain records, default block limit. */
    @Test
    void several() {
        sendSeveralTransaction();
        sendRecordsNonTransactionally(10);
        runPcAndBlockRecordsOverLimitIndex();
        waitForRecordsToBeReceived();
        this.pc.close();
    }

    /**
     * Commits ten single-record transactions in sequence on {@link #txProducer}.
     *
     * <p>Uses {@code IntStream.range(0, 10)} so the body runs ten times; {@code IntStream.of(10)}
     * would run it only once.
     */
    private void sendSeveralTransaction() {
        IntStream.range(0, 10).forEach(index -> sendOneTransaction());
    }

    /** Same data as {@link #several()} but the first three records are allowed to complete. */
    @Test
    void dontBlockFirstRecords() {
        sendSeveralTransaction();
        sendRecordsNonTransactionally(10);
        runPcAndBlockRecordsOverLimitIndex(3);
        waitForRecordsToBeReceived();
        this.pc.close();
    }

    /** Same data as {@link #several()} with a block limit high enough that nothing blocks. */
    @Test
    void dontBlockAnyRecords() {
        sendSeveralTransaction();
        sendRecordsNonTransactionally(10);
        runPcAndBlockRecordsOverLimitIndex(Integer.MAX_VALUE);
        waitForRecordsToBeReceived();
        this.pc.close();
    }

    /**
     * Opens three transactions on three different producers before committing any of them, then
     * commits them in the same order and adds two plain records.
     */
    @Test
    void overLappingTransactions() {
        startAndOneRecord(this.txProducer);
        startAndOneRecord(this.txProducerTwo);
        startAndOneRecord(this.txProducerThree);
        commitTx(this.txProducer);
        commitTx(this.txProducerTwo);
        commitTx(this.txProducerThree);
        sendRecordsNonTransactionally(2);
        runPcAndBlockRecordsOverLimitIndex(3);
        waitForRecordsToBeReceived(3);
        this.pc.close();
    }

    /**
     * Commits the transaction currently open on {@code producer}.
     *
     * @param producer transactional producer with an open transaction
     */
    private void commitTx(Producer<String, String> producer) {
        producer.commitTransaction();
    }

    /**
     * Begins a transaction on {@code producer} and sends one record, leaving the transaction open.
     *
     * @param producer transactional producer to use
     */
    private void startAndOneRecord(Producer<String, String> producer) {
        producer.beginTransaction();
        producer.send(createRecordToSend());
    }
}
