package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessorTestBase;
import java.time.Duration;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/OffsetCommittingSanityTest.class */
/**
 * Integration tests that produce records one at a time, run a fresh
 * {@link ParallelEoSStreamProcessor} for each record, and verify that the offsets seen by the
 * processor and the offsets committed for the topic match what was produced.
 */
class OffsetCommittingSanityTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(OffsetCommittingSanityTest.class);

    /** How long a processor is left running when the caller does not wait for consumption. */
    private static final long UNCHECKED_RUN_MILLIS = 2000L;

    /**
     * Produces two records, each handled by its own processor instance, and checks both the
     * committed offset after each run and the full produced/consumed offset sequences.
     */
    @Test
    void shouldNotSkipAnyMessagesOnRestartRoot() throws Exception {
        setupTopic("foo");
        List<Long> producedOffsets = new ArrayList<>();
        // Appended to from the processor's callback while the test reads it, so it must be
        // safe for concurrent use.
        List<Long> consumedOffsets = new CopyOnWriteArrayList<>();
        try (KafkaProducer<Object, Object> producer = this.kcu.createNewProducer(false)) {
            sendCheckClose(producedOffsets, consumedOffsets, producer, "key-0", "value-0", true);
            assertCommittedOffset(1L);
            sendCheckClose(producedOffsets, consumedOffsets, producer, "key-1", "value-1", true);
            assertCommittedOffset(2L);
        }
        Assertions.assertThat(producedOffsets).containsExactly(0L, 1L);
        Assertions.assertThat(consumedOffsets).containsExactly(0L, 1L);
    }

    /**
     * Produces three records; the processor run for the second record is closed without waiting
     * for consumption. The final run waits until the consumed offsets equal the produced offsets.
     */
    @Test
    void shouldNotSkipAnyMessagesOnRestartAsDescribed() throws Exception {
        setupTopic("foo");
        List<Long> producedOffsets = new ArrayList<>();
        // Appended to from the processor's callback while the test reads it, so it must be
        // safe for concurrent use.
        List<Long> consumedOffsets = new CopyOnWriteArrayList<>();
        try (KafkaProducer<Object, Object> producer = this.kcu.createNewProducer(false)) {
            sendCheckClose(producedOffsets, consumedOffsets, producer, "key-0", "value-0", true);
            assertCommittedOffset(1L);
            sendCheckClose(producedOffsets, consumedOffsets, producer, "key-1", "value-1", false);
            sendCheckClose(producedOffsets, consumedOffsets, producer, "key-2", "value-2", true);
        }
    }

    /**
     * Sends one record, runs a new processor against the topic, optionally waits until
     * everything produced so far has been consumed, then closes the processor.
     *
     * @param producedOffsets  receives the offset of the record sent by this call
     * @param consumedOffsets  receives the offset of every record the processor hands to the
     *                         callback; must tolerate concurrent appends and reads
     * @param producer         producer used to send the record
     * @param key              record key
     * @param value            record value
     * @param awaitConsumption when {@code true}, wait until {@code consumedOffsets} equals
     *                         {@code producedOffsets}; when {@code false}, just let the
     *                         processor run for {@link #UNCHECKED_RUN_MILLIS} before closing
     * @throws Exception if sending fails or the wait is interrupted
     */
    private void sendCheckClose(List<Long> producedOffsets, List<Long> consumedOffsets,
                                KafkaProducer<Object, Object> producer, String key, String value,
                                boolean awaitConsumption) throws Exception {
        RecordMetadata metadata = producer.send(new ProducerRecord<>(this.topic, key, value)).get();
        producedOffsets.add(metadata.offset());

        ParallelEoSStreamProcessor<String, String> processor =
                createParallelConsumer(this.topic, this.kcu.createNewConsumer(false));
        try {
            processor.poll(consumerRecord -> consumedOffsets.add(consumerRecord.offset()));
            if (awaitConsumption) {
                Awaitility.waitAtMost(Duration.ofSeconds(ParallelEoSStreamProcessorTestBase.defaultTimeoutSeconds))
                        .alias("all produced messages consumed")
                        .untilAsserted(() -> Assertions.assertThat(consumedOffsets).isEqualTo(producedOffsets));
            } else {
                Thread.sleep(UNCHECKED_RUN_MILLIS);
            }
        } finally {
            // Close even when the wait above fails, so a failing test does not leave a running
            // processor behind.
            processor.closeDrainFirst();
        }
    }

    /**
     * Asserts the committed offset of partition 0 of the test topic, as reported to a newly
     * created consumer.
     *
     * @param expectedOffset the offset expected to be committed
     */
    private void assertCommittedOffset(long expectedOffset) {
        TopicPartition partition = new TopicPartition(this.topic, 0);
        try (KafkaConsumer<?, ?> consumer = this.kcu.createNewConsumer(false)) {
            consumer.subscribe(UniSets.of(this.topic));
            // Poll once so the consumer has a chance to receive its assignment before it is queried.
            consumer.poll(Duration.ofSeconds(1L));
            OffsetAndMetadata committed = consumer.committed(consumer.assignment()).get(partition);
            Assertions.assertThat(committed)
                    .as("committed offset for %s (assignment: %s)", partition, consumer.assignment())
                    .isNotNull();
            Assertions.assertThat(committed.offset()).isEqualTo(expectedOffset);
        }
    }

    /**
     * Builds a processor around the given consumer and subscribes it to a single topic.
     *
     * @param topicName topic to subscribe to
     * @param consumer  consumer the processor is built on
     * @return the subscribed processor; the caller is responsible for closing it
     */
    // NOTE(review): the consumer is left as a raw type, as in the original; parameterizing it
    // depends on the generic signature of ParallelConsumerOptions.builder(), which is not
    // visible from this file - confirm before tightening.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ParallelEoSStreamProcessor<String, String> createParallelConsumer(String topicName, Consumer consumer) {
        ParallelEoSStreamProcessor<String, String> processor =
                new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder().consumer(consumer).build());
        processor.subscribe(UniLists.of(topicName));
        return processor;
    }
}
