package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

@Timeout(60)
/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.class */
class CloseAndOpenOffsetTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(CloseAndOpenOffsetTest.class);
    Duration normalTimeout = Duration.ofSeconds(5);
    Duration debugTimeout = Duration.ofMinutes(1);
    Duration timeoutToUse = this.normalTimeout;
    String rebalanceTopic;

    CloseAndOpenOffsetTest() {
    }

    @BeforeEach
    void setup() {
        this.rebalanceTopic = "close-and-open-" + RandomUtils.nextInt();
    }

    @Timeout(60)
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ)
    @EnumSource
    @ParameterizedTest
    void offsetsOpenClose(OffsetEncoding offsetEncoding) {
        Assumptions.assumeFalse(UniLists.of(OffsetEncoding.ByteArray, OffsetEncoding.ByteArrayCompressed, OffsetEncoding.KafkaStreams, OffsetEncoding.KafkaStreamsV2).contains(offsetEncoding));
        OffsetMapCodecManager.forcedCodec = Optional.of(offsetEncoding);
        OffsetSimultaneousEncoder.compressionForced = true;
        try {
            ensureTopic(this.rebalanceTopic, 1);
        } catch (Exception e) {
            log.warn(e.getMessage(), e);
        }
        KafkaConsumer createNewConsumer = getKcu().createNewConsumer();
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).consumer(createNewConsumer).producer(getKcu().createNewProducer(true)).build();
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(build);
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.rebalanceTopic));
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        parallelEoSStreamProcessor.poll(pollContext -> {
            log.info("Read by consumer ONE: {}", pollContext);
            if (((String) pollContext.value()).equals("4")) {
                log.info("Throwing fake error for message 4");
                throw new FakeRuntimeException("Fake error - Message 4");
            }
            if (((String) pollContext.value()).equals("2")) {
                log.info("Throwing fake error for message 2");
                throw new FakeRuntimeException("Fake error - Message 2");
            }
            concurrentLinkedQueue.add(pollContext.getSingleConsumerRecord());
        });
        Thread.sleep(500L);
        send(this.rebalanceTopic, 0, (Integer) 0);
        send(this.rebalanceTopic, 0, (Integer) 1);
        send(this.rebalanceTopic, 0, (Integer) 2);
        send(this.rebalanceTopic, 0, (Integer) 3);
        send(this.rebalanceTopic, 0, (Integer) 4);
        send(this.rebalanceTopic, 0, (Integer) 5);
        Awaitility.await().alias("check all except 2 and 4 are processed").atMost(this.normalTimeout).untilAsserted(() -> {
            Assertions.assertThat((List) new ArrayList(concurrentLinkedQueue).stream().map((v0) -> {
                return v0.value();
            }).collect(Collectors.toList())).containsOnly(new String[]{"0", "1", "3", "5"});
        });
        Thread.sleep(500L);
        log.info("Closing consumer, committing offset map");
        parallelEoSStreamProcessor.closeDontDrainFirst();
        Awaitility.await().alias("check all except 2 and 4 are processed").atMost(this.normalTimeout).untilAsserted(() -> {
            Assertions.assertThat((List) concurrentLinkedQueue.stream().map(consumerRecord -> {
                return (String) consumerRecord.value();
            }).collect(Collectors.toList())).containsOnly(new String[]{"0", "1", "3", "5"});
        });
        Assertions.assertThat(parallelEoSStreamProcessor.getFailureCause()).isNull();
        KafkaConsumer createNewConsumer2 = getKcu().createNewConsumer(customClientId("THREE-my-client"));
        ParallelEoSStreamProcessor parallelEoSStreamProcessor2 = new ParallelEoSStreamProcessor(build.toBuilder().consumer(createNewConsumer2).producer(getKcu().createNewProducer(true)).build());
        try {
            parallelEoSStreamProcessor2.subscribe(UniLists.of(this.rebalanceTopic));
            ConcurrentLinkedQueue concurrentLinkedQueue2 = new ConcurrentLinkedQueue();
            parallelEoSStreamProcessor2.poll(pollContext2 -> {
                log.info("Read by consumer THREE: {}", pollContext2.value());
                concurrentLinkedQueue2.add(pollContext2.getSingleConsumerRecord());
            });
            Awaitility.await().alias("only 2 and 4 should be delivered again, as everything else was processed successfully").atMost(this.timeoutToUse).untilAsserted(() -> {
                Assertions.assertThat(concurrentLinkedQueue2).extracting((v0) -> {
                    return v0.value();
                }).containsExactlyInAnyOrder(new String[]{"2", "4"});
            });
            parallelEoSStreamProcessor2.close();
            OffsetMapCodecManager.forcedCodec = Optional.empty();
            OffsetSimultaneousEncoder.compressionForced = false;
        } finally {
        }
    }

    private Properties customClientId(String str) {
        Properties properties = new Properties();
        properties.put("client.id", str);
        return properties;
    }

    private void send(String str, int i, Integer num) throws InterruptedException, ExecutionException {
    }

    private void send(int i, String str, int i2) throws InterruptedException, ExecutionException {
        log.debug("Sending {} messages to {}", Integer.valueOf(i), str);
        ArrayList arrayList = new ArrayList();
        Iterator it = Range.range(i).iterator();
        while (it.hasNext()) {
            Long l = (Long) it.next();
            arrayList.add(getKcu().getProducer().send(new ProducerRecord(str, Integer.valueOf(i2), l.toString(), l.toString())));
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Future) it2.next()).get();
        }
        log.debug("Finished sending {} messages", Integer.valueOf(i));
    }

    @Test
    void correctOffsetVerySimple() {
        setupTopic();
        String str = "0";
        getKcu().getProducer().send(new ProducerRecord(this.topic, "0", "0"));
        KafkaConsumer createNewConsumer = getKcu().createNewConsumer();
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).consumer(createNewConsumer).producer(getKcu().createNewProducer(true)).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).build();
        try {
            ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(build);
            try {
                parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
                ArrayList arrayList = new ArrayList();
                parallelEoSStreamProcessor.poll(pollContext -> {
                    log.debug("Reading {}", pollContext);
                    arrayList.add(pollContext.getSingleConsumerRecord());
                });
                Awaitility.await().untilAsserted(() -> {
                    Assertions.assertThat(arrayList).extracting((v0) -> {
                        return v0.value();
                    }).containsExactly(new String[]{str});
                });
                parallelEoSStreamProcessor.close();
                log.debug("asyncOne closed");
                log.debug("Starting up new client");
                KafkaConsumer createNewConsumer2 = getKcu().createNewConsumer(customClientId("THREE-my-client"));
                ParallelEoSStreamProcessor parallelEoSStreamProcessor2 = new ParallelEoSStreamProcessor(build.toBuilder().consumer(createNewConsumer2).producer(getKcu().createNewProducer(true)).build());
                try {
                    parallelEoSStreamProcessor2.subscribe(UniLists.of(this.topic));
                    ArrayList arrayList2 = new ArrayList();
                    parallelEoSStreamProcessor2.poll(pollContext2 -> {
                        log.info("Three read: {}", pollContext2.value());
                        arrayList2.add(pollContext2.getSingleConsumerRecord());
                    });
                    Awaitility.await().alias("nothing should be read back (will be long enough to be sure it never is)").pollDelay(Duration.ofSeconds(1L)).atMost(Duration.ofSeconds(2L)).atLeast(Duration.ofSeconds(1L)).untilAsserted(() -> {
                        Assertions.assertThat(arrayList2).as("Nothing should be read into the collection", new Object[0]).extracting((v0) -> {
                            return v0.value();
                        }).isEmpty();
                    });
                    parallelEoSStreamProcessor2.close();
                } catch (Throwable th) {
                    try {
                        parallelEoSStreamProcessor2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Throwable th3) {
            log.debug("asyncOne closed");
            throw th3;
        }
    }

    @Test
    void largeNumberOfMessagesSmallOffsetBitmap() {
        setupTopic();
        int i = KafkaClientUtils.MAX_POLL_RECORDS;
        Assertions.assertThat(KafkaClientUtils.MAX_POLL_RECORDS).as("Test expects to process all the produced messages in a single poll", new Object[0]).isLessThanOrEqualTo(KafkaClientUtils.MAX_POLL_RECORDS);
        send(KafkaClientUtils.MAX_POLL_RECORDS, this.topic, 0);
        ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).build();
        Set of = UniSets.of("123", "2345", "8765");
        int size = of.size();
        KafkaConsumer createNewConsumer = getKcu().createNewConsumer();
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(build.toBuilder().consumer(createNewConsumer).producer(getKcu().createNewProducer(true)).build());
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
        ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        parallelEoSStreamProcessor.poll(pollContext -> {
            String str = (String) pollContext.value();
            if (of.contains(str)) {
                throw new FakeRuntimeException("Fake error for message " + str);
            }
            concurrentSkipListSet.add(str);
        });
        Awaitility.await().atMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout).untilAsserted(() -> {
            Assertions.assertThat(concurrentSkipListSet.size()).isEqualTo(i - size);
        });
        parallelEoSStreamProcessor.closeDontDrainFirst();
        Assertions.assertThat(concurrentSkipListSet.size()).isEqualTo(KafkaClientUtils.MAX_POLL_RECORDS - size);
        KafkaConsumer createNewConsumer2 = getKcu().createNewConsumer(customClientId("THREE-my-client"));
        ParallelEoSStreamProcessor parallelEoSStreamProcessor2 = new ParallelEoSStreamProcessor(build.toBuilder().consumer(createNewConsumer2).producer(getKcu().createNewProducer(true)).build());
        try {
            parallelEoSStreamProcessor2.subscribe(UniLists.of(this.topic));
            ConcurrentSkipListSet concurrentSkipListSet2 = new ConcurrentSkipListSet();
            parallelEoSStreamProcessor2.poll(pollContext2 -> {
                log.info("Three read: {}", pollContext2.value());
                concurrentSkipListSet2.add((String) pollContext2.value());
            });
            Awaitility.await().alias("Only the one remaining failing message should be submitted for processing").pollDelay(Duration.ofMillis(1000L)).atLeast(Duration.ofMillis(500L)).untilAsserted(() -> {
                Assertions.assertThat(concurrentSkipListSet2).as("Contains only previously failed messages", new Object[0]).hasSize(size);
            });
            Assertions.assertThat(concurrentSkipListSet2).hasSize(size);
            parallelEoSStreamProcessor2.close();
        } finally {
        }
    }
}
