package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.lang.math.RandomUtils;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

@Timeout(60)
/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.class */
public class CloseAndOpenOffsetTest extends KafkaTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(CloseAndOpenOffsetTest.class);
    Duration normalTimeout = Duration.ofSeconds(5);
    Duration debugTimeout = Duration.ofMinutes(10);
    Duration timeoutToUse = this.normalTimeout;
    String rebalanceTopic;

    @BeforeEach
    void setup() {
        this.rebalanceTopic = "close-and-open-" + RandomUtils.nextInt();
    }

    @Test
    void offsetsOpenClose() {
        try {
            ensureTopic(this.rebalanceTopic, 1);
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
        }
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build();
        this.kcu.props.put("client.id", "ONE-my-client");
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(this.kcu.createNewConsumer(), this.kcu.createNewProducer(true), build);
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.rebalanceTopic));
        ArrayList arrayList = new ArrayList();
        parallelEoSStreamProcessor.poll(consumerRecord -> {
            log.info("Read by consumer ONE: {}", consumerRecord);
            if (((String) consumerRecord.value()).equals("4")) {
                log.info("Throwing fake error for message 4");
                throw new RuntimeException("Message 4");
            }
            if (((String) consumerRecord.value()).equals("2")) {
                log.info("Throwing fake error for message 2");
                throw new RuntimeException("Message 2");
            }
            arrayList.add(consumerRecord);
        });
        send(this.rebalanceTopic, 0, (Integer) 0);
        send(this.rebalanceTopic, 0, (Integer) 1);
        send(this.rebalanceTopic, 0, (Integer) 2);
        send(this.rebalanceTopic, 0, (Integer) 3);
        send(this.rebalanceTopic, 0, (Integer) 4);
        send(this.rebalanceTopic, 0, (Integer) 5);
        Awaitility.await().atMost(this.debugTimeout).untilAsserted(() -> {
            Assertions.assertThat(arrayList).hasSize(4);
        });
        log.info("Closing consumer, committing offset map");
        parallelEoSStreamProcessor.closeDontDrainFirst();
        this.kcu.props.put("client.id", "THREE-my-client");
        ParallelEoSStreamProcessor parallelEoSStreamProcessor2 = new ParallelEoSStreamProcessor(this.kcu.createNewConsumer(), this.kcu.createNewProducer(true), build);
        try {
            parallelEoSStreamProcessor2.subscribe(UniLists.of(this.rebalanceTopic));
            ArrayList arrayList2 = new ArrayList();
            parallelEoSStreamProcessor2.poll(consumerRecord2 -> {
                log.info("Read by consumer THREE: {}", consumerRecord2.value());
                arrayList2.add(consumerRecord2);
            });
            Awaitility.await().atMost(this.timeoutToUse).untilAsserted(() -> {
                Assertions.assertThat(arrayList2).extracting((v0) -> {
                    return v0.value();
                }).containsExactlyInAnyOrder(new String[]{"2", "4"});
            });
            parallelEoSStreamProcessor2.close();
        } finally {
        }
    }

    private void send(String str, int i, Integer num) throws InterruptedException, ExecutionException {
    }

    private void send(int i, String str, int i2) throws InterruptedException, ExecutionException {
        log.debug("Sending {} messages to {}", Integer.valueOf(i), str);
        ArrayList arrayList = new ArrayList();
        Iterator it = Range.range(i).iterator();
        while (it.hasNext()) {
            Integer num = (Integer) it.next();
            arrayList.add(this.kcu.producer.send(new ProducerRecord(str, Integer.valueOf(i2), num.toString(), num.toString())));
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Future) it2.next()).get();
        }
        log.debug("Finished sending {} messages", Integer.valueOf(i));
    }

    @Test
    void correctOffsetVerySimple() {
        setupTopic();
        this.kcu.producer.send(new ProducerRecord(this.topic, "0", "0"));
        KafkaConsumer createNewConsumer = this.kcu.createNewConsumer();
        KafkaProducer createNewProducer = this.kcu.createNewProducer(true);
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build();
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(createNewConsumer, createNewProducer, build);
        try {
            parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            parallelEoSStreamProcessor.poll((v1) -> {
                r1.add(v1);
            });
            Awaitility.await().untilAsserted(() -> {
                Assertions.assertThat(arrayList).extracting((v0) -> {
                    return v0.value();
                }).containsExactly(new String[]{"0"});
            });
            parallelEoSStreamProcessor.close();
            this.kcu.props.put("client.id", "THREE-my-client");
            parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(this.kcu.createNewConsumer(), this.kcu.createNewProducer(true), build);
            try {
                parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
                ArrayList arrayList2 = new ArrayList();
                parallelEoSStreamProcessor.poll(consumerRecord -> {
                    log.info("Three read: {}", consumerRecord.value());
                    arrayList2.add(consumerRecord);
                });
                Awaitility.await().alias("nothing should be read back (will be long enough to be sure it never is)").pollDelay(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(2L)).atLeast(Duration.ofSeconds(1L)).untilAsserted(() -> {
                    Assertions.assertThat(arrayList2).as("Nothing should be read into the collection", new Object[0]).extracting((v0) -> {
                        return v0.value();
                    }).isEmpty();
                });
                parallelEoSStreamProcessor.close();
            } finally {
            }
        } finally {
        }
    }

    @Test
    void largeNumberOfMessagesSmallOffsetBitmap() {
        setupTopic();
        int i = KafkaClientUtils.MAX_POLL_RECORDS;
        Assertions.assertThat(KafkaClientUtils.MAX_POLL_RECORDS).as("Test expects to process all the produced messages in a single poll", new Object[0]).isLessThanOrEqualTo(KafkaClientUtils.MAX_POLL_RECORDS);
        send(KafkaClientUtils.MAX_POLL_RECORDS, this.topic, 0);
        KafkaConsumer createNewConsumer = this.kcu.createNewConsumer();
        KafkaProducer createNewProducer = this.kcu.createNewProducer(true);
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).build();
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(createNewConsumer, createNewProducer, build);
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
        Set of = UniSets.of("123", "2345", "8765");
        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        parallelEoSStreamProcessor.poll(consumerRecord -> {
            String str = (String) consumerRecord.value();
            if (of.contains(str)) {
                log.info("Throwing fake error for message {}", str);
                throw new RuntimeException("Message " + str);
            }
            concurrentSkipListSet.add(str);
        });
        int size = of.size();
        Awaitility.await().atMost(Duration.ofSeconds(10L)).untilAsserted(() -> {
            Assertions.assertThat(concurrentSkipListSet.size()).isEqualTo(i - size);
        });
        parallelEoSStreamProcessor.closeDontDrainFirst();
        this.kcu.props.put("client.id", "THREE-my-client");
        ParallelEoSStreamProcessor parallelEoSStreamProcessor2 = new ParallelEoSStreamProcessor(this.kcu.createNewConsumer(), this.kcu.createNewProducer(true), build);
        try {
            parallelEoSStreamProcessor2.subscribe(UniLists.of(this.topic));
            ConcurrentSkipListSet concurrentSkipListSet2 = new ConcurrentSkipListSet();
            parallelEoSStreamProcessor2.poll(consumerRecord2 -> {
                log.trace("Three read: {}", consumerRecord2.value());
                concurrentSkipListSet2.add((String) consumerRecord2.value());
            });
            Awaitility.await().alias("Only the one remaining failing message should be submitted for processing").pollDelay(Duration.ofSeconds(1L)).atLeast(Duration.ofSeconds(1L)).untilAsserted(() -> {
                Assertions.assertThat(concurrentSkipListSet2.size()).as("Contains only previously failed messages", new Object[0]).isEqualTo(size);
            });
            Assertions.assertThat(concurrentSkipListSet2).hasSize(size);
            parallelEoSStreamProcessor2.close();
        } finally {
        }
    }
}
