/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.Range;
import io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import io.confluent.parallelconsumer.offsets.OffsetSimultaneousEncoder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Broker-backed tests that run one {@link ParallelEoSStreamProcessor}, close it, and then start a
 * second processor on the same topic to check which records the second one is handed.
 *
 * <p>Each test lets the first processor succeed on some records and fail on others, and then
 * asserts that only the records that were not successfully processed are seen again.
 */
@Timeout(value = 60L)
class CloseAndOpenOffsetTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(CloseAndOpenOffsetTest.class);

    /** Upper bound used by the awaits on the first processor in {@link #offsetsOpenClose}. */
    Duration normalTimeout = Duration.ofSeconds(5L);

    /** Longer alternative timeout; not referenced by default, assign it to {@link #timeoutToUse} when debugging. */
    Duration debugTimeout = Duration.ofMinutes(1L);

    /** Timeout applied to the await on the second processor in {@link #offsetsOpenClose}. */
    Duration timeoutToUse = this.normalTimeout;

    /** Topic used by {@link #offsetsOpenClose}; regenerated before every test. */
    String rebalanceTopic;

    /** Picks a fresh, randomly suffixed topic name so runs do not share a topic. */
    @BeforeEach
    void setup() {
        this.rebalanceTopic = "close-and-open-" + RandomUtils.nextInt();
    }

    /**
     * Forces the given offset encoding, then checks that records which failed in a first processor
     * are the only ones delivered to a second processor started after the first was closed.
     *
     * <p>The static encoder overrides are reset in a {@code finally} block so that a failing run
     * cannot leak them into other tests.
     *
     * @param encoding the encoding to force; a fixed list of encodings is skipped via an assumption
     * @throws Exception if sending records or waiting is interrupted or fails
     */
    // NOTE(review): this test writes static fields but only takes a READ resource lock - confirm
    // that this is the intended lock mode.
    @Timeout(value = 60L)
    @ParameterizedTest
    @EnumSource
    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ)
    void offsetsOpenClose(OffsetEncoding encoding) throws Exception {
        List<OffsetEncoding> skip = UniLists.of(
                OffsetEncoding.ByteArray,
                OffsetEncoding.ByteArrayCompressed,
                OffsetEncoding.KafkaStreams,
                OffsetEncoding.KafkaStreamsV2);
        Assumptions.assumeFalse(skip.contains(encoding));

        OffsetMapCodecManager.forcedCodec = Optional.of(encoding);
        OffsetSimultaneousEncoder.compressionForced = true;
        try {
            closeThenReopenAndExpectOnlyFailedRecords();
        } finally {
            // Always undo the static overrides, also when an assertion above failed.
            OffsetMapCodecManager.forcedCodec = Optional.empty();
            OffsetSimultaneousEncoder.compressionForced = false;
        }
    }

    /**
     * Body of {@link #offsetsOpenClose}: processes records "0".."5" with "2" and "4" failing,
     * closes the first processor without draining, and expects a second processor to receive
     * exactly "2" and "4".
     */
    private void closeThenReopenAndExpectOnlyFailedRecords() throws Exception {
        try {
            this.ensureTopic(this.rebalanceTopic, 1);
        } catch (Exception e) {
            // Deliberately best-effort, as in the original: log and carry on.
            log.warn(e.getMessage(), e);
        }

        KafkaConsumer<String, String> newConsumerOne = this.getKcu().createNewConsumer();
        KafkaProducer<String, String> producerOne = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .consumer(newConsumerOne)
                .producer(producerOne)
                .build();

        Collection<ConsumerRecord<String, String>> successfullInOne = new ConcurrentLinkedQueue<>();
        ParallelEoSStreamProcessor<String, String> asyncOne = new ParallelEoSStreamProcessor<>(options);
        boolean closeAttempted = false;
        try {
            asyncOne.subscribe(UniLists.of(this.rebalanceTopic));
            asyncOne.poll(x -> {
                log.info("Read by consumer ONE: {}", x);
                if (x.value().equals("4")) {
                    log.info("Throwing fake error for message 4");
                    throw new FakeRuntimeException("Fake error - Message 4");
                }
                if (x.value().equals("2")) {
                    log.info("Throwing fake error for message 2");
                    throw new FakeRuntimeException("Fake error - Message 2");
                }
                successfullInOne.add(x.getSingleConsumerRecord());
            });

            Thread.sleep(500L);
            for (int value = 0; value <= 5; value++) {
                this.send(this.rebalanceTopic, 0, value);
            }

            Awaitility.await()
                    .alias("check all except 2 and 4 are processed")
                    .atMost(this.normalTimeout)
                    .untilAsserted(() -> Assertions.assertThat(valuesOf(successfullInOne)).containsOnly("0", "1", "3", "5"));

            Thread.sleep(500L);
            log.info("Closing consumer, committing offset map");
            closeAttempted = true;
            asyncOne.closeDontDrainFirst();
        } finally {
            // Only reached with closeAttempted == false when something above failed.
            if (!closeAttempted) {
                closeAfterFailure(asyncOne);
            }
        }

        Awaitility.await()
                .alias("check all except 2 and 4 are processed")
                .atMost(this.normalTimeout)
                .untilAsserted(() -> Assertions.assertThat(valuesOf(successfullInOne)).containsOnly("0", "1", "3", "5"));
        Assertions.assertThat(asyncOne.getFailureCause()).isNull();

        KafkaConsumer<String, String> newConsumerThree = this.getKcu().createNewConsumer(this.customClientId("THREE-my-client"));
        KafkaProducer<String, String> producerThree = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> optionsThree = options.toBuilder()
                .consumer(newConsumerThree)
                .producer(producerThree)
                .build();
        try (ParallelEoSStreamProcessor<String, String> asyncThree = new ParallelEoSStreamProcessor<>(optionsThree)) {
            asyncThree.subscribe(UniLists.of(this.rebalanceTopic));
            Collection<ConsumerRecord<String, String>> processedByThree = new ConcurrentLinkedQueue<>();
            asyncThree.poll(x -> {
                log.info("Read by consumer THREE: {}", x.value());
                processedByThree.add(x.getSingleConsumerRecord());
            });
            Awaitility.await()
                    .alias("only 2 and 4 should be delivered again, as everything else was processed successfully")
                    .atMost(this.timeoutToUse)
                    .untilAsserted(() -> Assertions.assertThat(processedByThree)
                            .extracting(ConsumerRecord::value)
                            .containsExactlyInAnyOrder("2", "4"));
        }
    }

    /** Returns the values of a snapshot of {@code records}, so callers can assert on a stable list. */
    private static List<String> valuesOf(Collection<ConsumerRecord<String, String>> records) {
        List<ConsumerRecord<String, String>> snapshot = new ArrayList<>(records);
        return snapshot.stream().map(ConsumerRecord::value).collect(Collectors.toList());
    }

    /**
     * Closes a processor that a failing test left running. Any exception from closing is logged
     * rather than thrown so that it cannot replace the failure that triggered the cleanup.
     */
    private void closeAfterFailure(ParallelEoSStreamProcessor<String, String> processor) {
        try {
            processor.closeDontDrainFirst();
        } catch (Exception e) {
            log.warn("Could not close processor while cleaning up after a test failure", e);
        }
    }

    /** Builds consumer properties that only set {@code client.id} to the given value. */
    private Properties customClientId(String id) {
        Properties properties = new Properties();
        properties.put("client.id", id);
        return properties;
    }

    /**
     * Sends one record whose key and value are both {@code value.toString()} and blocks until the
     * send future completes.
     */
    private void send(String topic, int partition, Integer value) throws InterruptedException, ExecutionException {
        String payload = value.toString();
        this.getKcu().getProducer().send(new ProducerRecord<>(topic, partition, payload, payload)).get();
    }

    /**
     * Sends {@code quantity} records (key and value are the record's index as a string) and then
     * waits for every send future to complete.
     */
    private void send(int quantity, String topic, int partition) throws InterruptedException, ExecutionException {
        log.debug("Sending {} messages to {}", quantity, topic);
        List<Future<RecordMetadata>> futures = new ArrayList<>();
        for (Long index : Range.range((long) quantity)) {
            String payload = index.toString();
            futures.add(this.getKcu().getProducer().send(new ProducerRecord<>(topic, partition, payload, payload)));
        }
        // All sends are issued first and awaited afterwards.
        for (Future<RecordMetadata> future : futures) {
            future.get();
        }
        log.debug("Finished sending {} messages", quantity);
    }

    /**
     * Processes a single record with one processor, closes it, and checks that a second processor
     * on the same topic is handed nothing for at least a second.
     *
     * @throws Exception if producing the record fails or waiting is interrupted
     */
    @Test
    void correctOffsetVerySimple() throws Exception {
        this.setupTopic();
        String expectedPayload = "0";
        // Wait for the produce so a send failure surfaces here instead of as an await timeout.
        this.getKcu().getProducer().send(new ProducerRecord<>(this.topic, expectedPayload, expectedPayload)).get();

        KafkaConsumer<String, String> consumer = this.getKcu().createNewConsumer();
        KafkaProducer<String, String> producerOne = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .consumer(consumer)
                .producer(producerOne)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .build();
        try (ParallelEoSStreamProcessor<String, String> asyncOne = new ParallelEoSStreamProcessor<>(options)) {
            asyncOne.subscribe(UniLists.of(this.topic));
            // Written by the processing callback and read by the test thread, so it must be thread-safe.
            Collection<ConsumerRecord<String, String>> readByOne = new ConcurrentLinkedQueue<>();
            asyncOne.poll(msg -> {
                log.debug("Reading {}", msg);
                readByOne.add(msg.getSingleConsumerRecord());
            });
            Awaitility.await().untilAsserted(() -> Assertions.assertThat(readByOne)
                    .extracting(ConsumerRecord::value)
                    .containsExactly(expectedPayload));
        } finally {
            log.debug("asyncOne closed");
        }

        log.debug("Starting up new client");
        KafkaConsumer<String, String> newConsumerThree = this.getKcu().createNewConsumer(this.customClientId("THREE-my-client"));
        KafkaProducer<String, String> producerThree = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> optionsThree = options.toBuilder()
                .consumer(newConsumerThree)
                .producer(producerThree)
                .build();
        try (ParallelEoSStreamProcessor<String, String> asyncThree = new ParallelEoSStreamProcessor<>(optionsThree)) {
            asyncThree.subscribe(UniLists.of(this.topic));
            Collection<ConsumerRecord<String, String>> readByThree = new ConcurrentLinkedQueue<>();
            asyncThree.poll(x -> {
                log.info("Three read: {}", x.value());
                readByThree.add(x.getSingleConsumerRecord());
            });
            Awaitility.await()
                    .alias("nothing should be read back (will be long enough to be sure it never is)")
                    .pollDelay(Duration.ofSeconds(1L))
                    .atMost(Duration.ofSeconds(2L))
                    .atLeast(Duration.ofSeconds(1L))
                    .untilAsserted(() -> Assertions.assertThat(readByThree)
                            .as("Nothing should be read into the collection")
                            .extracting(ConsumerRecord::value)
                            .isEmpty());
        }
    }

    /**
     * Produces 10000 records, lets a first processor fail on three of them, closes it without
     * draining, and checks that a second processor is handed exactly as many records as failed.
     *
     * @throws Exception if producing the records fails or waiting is interrupted
     */
    @Test
    void largeNumberOfMessagesSmallOffsetBitmap() throws Exception {
        this.setupTopic();
        int quantity = 10000;
        // NOTE(review): the bound below equals quantity, so this check cannot fail as written;
        // presumably it was a named constant before decompilation - confirm.
        Assertions.assertThat(quantity)
                .as("Test expects to process all the produced messages in a single poll")
                .isLessThanOrEqualTo(10000);
        this.send(quantity, this.topic, 0);

        ParallelConsumerOptions<String, String> baseOptions = ParallelConsumerOptions.<String, String>builder()
                .ordering(ParallelConsumerOptions.ProcessingOrder.UNORDERED)
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .build();
        Set<String> failingMessages = UniSets.of("123", "2345", "8765");
        int numberOfFailingMessages = failingMessages.size();

        KafkaConsumer<String, String> consumer = this.getKcu().createNewConsumer();
        KafkaProducer<String, String> producerOne = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> options = baseOptions.toBuilder()
                .consumer(consumer)
                .producer(producerOne)
                .build();

        Set<String> readByOne = new ConcurrentSkipListSet<>();
        ParallelEoSStreamProcessor<String, String> asyncOne = new ParallelEoSStreamProcessor<>(options);
        boolean closeAttempted = false;
        try {
            asyncOne.subscribe(UniLists.of(this.topic));
            asyncOne.poll(x -> {
                String value = x.value();
                if (failingMessages.contains(value)) {
                    throw new FakeRuntimeException("Fake error for message " + value);
                }
                readByOne.add(value);
            });
            Awaitility.await()
                    .atMost(AbstractParallelEoSStreamProcessorTestBase.defaultTimeout)
                    .untilAsserted(() -> Assertions.assertThat(readByOne.size()).isEqualTo(quantity - numberOfFailingMessages));
            closeAttempted = true;
            asyncOne.closeDontDrainFirst();
        } finally {
            // Only reached with closeAttempted == false when something above failed.
            if (!closeAttempted) {
                closeAfterFailure(asyncOne);
            }
        }
        Assertions.assertThat(readByOne.size()).isEqualTo(quantity - numberOfFailingMessages);

        KafkaConsumer<String, String> newConsumerThree = this.getKcu().createNewConsumer(this.customClientId("THREE-my-client"));
        KafkaProducer<String, String> producerThree = this.getKcu().createNewProducer(true);
        ParallelConsumerOptions<String, String> optionsThree = baseOptions.toBuilder()
                .consumer(newConsumerThree)
                .producer(producerThree)
                .build();
        try (ParallelEoSStreamProcessor<String, String> asyncThree = new ParallelEoSStreamProcessor<>(optionsThree)) {
            asyncThree.subscribe(UniLists.of(this.topic));
            Set<String> readByThree = new ConcurrentSkipListSet<>();
            asyncThree.poll(x -> {
                log.info("Three read: {}", x.value());
                readByThree.add(x.value());
            });
            // The alias text mentions one message, but the assertion expects every failed message
            // (numberOfFailingMessages) to be seen again.
            Awaitility.await()
                    .alias("Only the one remaining failing message should be submitted for processing")
                    .pollDelay(Duration.ofMillis(1000L))
                    .atLeast(Duration.ofMillis(500L))
                    .untilAsserted(() -> Assertions.assertThat(readByThree)
                            .as("Contains only previously failed messages")
                            .hasSize(numberOfFailingMessages));
            Assertions.assertThat(readByThree).hasSize(numberOfFailingMessages);
        }
    }
}

