/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

public class OffsetCommittingSanityTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(OffsetCommittingSanityTest.class);

    @Test
    void shouldNotSkipAnyMessagesOnRestartRoot() throws Exception {
        String topicNameForTest = this.setupTopic("foo");
        ArrayList<Long> producedOffsets = new ArrayList<Long>();
        ArrayList<Long> consumedOffsets = new ArrayList<Long>();
        KafkaProducer kafkaProducer = this.getKcu().createNewProducer(false);
        this.sendCheckClose(topicNameForTest, producedOffsets, consumedOffsets, kafkaProducer, "key-0", "value-0", true);
        this.assertCommittedOffset(topicNameForTest, 1L);
        this.sendCheckClose(topicNameForTest, producedOffsets, consumedOffsets, kafkaProducer, "key-1", "value-1", true);
        this.assertCommittedOffset(topicNameForTest, 2L);
        Assertions.assertThat(producedOffsets).containsExactly((Object[])new Long[]{0L, 1L});
        Assertions.assertThat(consumedOffsets).containsExactly((Object[])new Long[]{0L, 1L});
    }

    @Test
    void shouldNotSkipAnyMessagesOnRestartAsDescribed() throws Exception {
        String topicNameForTest = this.setupTopic("foo");
        ArrayList<Long> producedOffsets = new ArrayList<Long>();
        ArrayList<Long> consumedOffsets = new ArrayList<Long>();
        KafkaProducer kafkaProducer = this.getKcu().createNewProducer(KafkaClientUtils.ProducerMode.NOT_TRANSACTIONAL);
        this.sendCheckClose(topicNameForTest, producedOffsets, consumedOffsets, kafkaProducer, "key-0", "value-0", CheckMode.CHECK_CONSUMED);
        this.assertCommittedOffset(topicNameForTest, 1L);
        this.sendCheckClose(topicNameForTest, producedOffsets, consumedOffsets, kafkaProducer, "key-1", "value-1", CheckMode.JUST_SLEEP);
        this.sendCheckClose(topicNameForTest, producedOffsets, consumedOffsets, kafkaProducer, "key-2", "value-2", CheckMode.CHECK_CONSUMED);
    }

    private void sendCheckClose(String topic, List<Long> producedOffsets, List<Long> consumedOffsets, KafkaProducer<String, String> kafkaProducer, String key, String val, boolean check) throws Exception {
        this.sendCheckClose(topic, producedOffsets, consumedOffsets, kafkaProducer, key, val, check ? CheckMode.CHECK_CONSUMED : CheckMode.JUST_SLEEP);
    }

    private void sendCheckClose(String topic, List<Long> producedOffsets, List<Long> consumedOffsets, KafkaProducer<String, String> kafkaProducer, String key, String val, CheckMode check) throws Exception {
        ProducerRecord record = new ProducerRecord(topic, (Object)key, (Object)val);
        Future send = kafkaProducer.send(record);
        long offset = ((RecordMetadata)send.get()).offset();
        producedOffsets.add(offset);
        KafkaConsumer newConsumer = this.getKcu().createNewConsumer(false);
        ParallelEoSStreamProcessor<String, String> pc = this.createParallelConsumer(topic, (Consumer)newConsumer);
        pc.poll(consumerRecord -> consumedOffsets.add(consumerRecord.offset()));
        if (check.equals((Object)CheckMode.CHECK_CONSUMED)) {
            Assertions.assertThatCode(() -> Awaitility.waitAtMost((Duration)Duration.ofSeconds(ParallelEoSStreamProcessorTestBase.defaultTimeoutSeconds)).alias("all produced messages consumed").untilAsserted(() -> Assertions.assertThat((List)consumedOffsets).isEqualTo((Object)producedOffsets))).doesNotThrowAnyException();
        } else {
            Thread.sleep(2000L);
        }
        pc.closeDrainFirst();
    }

    private void assertCommittedOffset(String topicNameForTest, long expectedOffset) {
        KafkaConsumer newConsumer = this.getKcu().createNewConsumer(false);
        newConsumer.subscribe((Collection)UniSets.of((Object)topicNameForTest));
        newConsumer.poll(Duration.ofSeconds(5L));
        Set assignment = newConsumer.assignment();
        Truth.assertWithMessage((String)"Should be assigned some partitions").that((Iterable)assignment).isNotEmpty();
        Map committed = newConsumer.committed(assignment);
        Truth.assertThat((Map)committed).isNotEmpty();
        TopicPartition tp = new TopicPartition(topicNameForTest, 0);
        OffsetAndMetadata offsetAndMetadata = (OffsetAndMetadata)committed.get(tp);
        ((ObjectAssert)Assertions.assertThat((Object)offsetAndMetadata).as("Should have commit history for this partition {}", new Object[]{tp})).isNotNull();
        long offset = offsetAndMetadata.offset();
        Assertions.assertThat((long)offset).isEqualTo(expectedOffset);
        newConsumer.close();
    }

    private ParallelEoSStreamProcessor<String, String> createParallelConsumer(String topicName, Consumer consumer) {
        ParallelEoSStreamProcessor pc = new ParallelEoSStreamProcessor(ParallelConsumerOptions.builder().consumer(consumer).build());
        pc.subscribe((Collection)UniLists.of((Object)topicName));
        return pc;
    }

    public static enum CheckMode {
        CHECK_CONSUMED,
        JUST_SLEEP;

    }
}

