package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import one.util.streamex.StreamEx;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiTopicTest.class */
/**
 * Integration test that subscribes one parallel consumer to several topics at once, waits until
 * every produced record has been handed to the user function, and then verifies through a
 * separate, freshly created consumer that offsets were committed for each topic.
 */
class MultiTopicTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiTopicTest.class);

    /** Number of topics created and subscribed to by the test. */
    private static final int NUMBER_OF_TOPICS = 3;

    /** Number of records produced to each topic before the parallel consumer is started. */
    private static final int RECORDS_PER_TOPIC = 1;

    /** Upper bound on how long to keep retrying the committed-offset assertion. */
    private static final Duration COMMIT_ASSERTION_TIMEOUT = Duration.ofSeconds(10L);

    /**
     * Produces {@link #RECORDS_PER_TOPIC} record(s) to each of {@link #NUMBER_OF_TOPICS} topics,
     * consumes them all with a single parallel consumer, and asserts the resulting commits.
     *
     * @param processingOrder the ordering mode the parallel consumer is built with; the test is
     *                        run once per enum constant
     */
    @EnumSource(ParallelConsumerOptions.ProcessingOrder.class)
    @ParameterizedTest
    void multiTopic(ParallelConsumerOptions.ProcessingOrder processingOrder) {
        List<NewTopic> topics = getKcu().createTopics(NUMBER_OF_TOPICS);
        topics.forEach(topic -> sendMessages(topic, RECORDS_PER_TOPIC));

        ParallelEoSStreamProcessor<String, String> parallelConsumer = getKcu().buildPc(processingOrder);
        parallelConsumer.subscribe(StreamEx.of(topics).map(NewTopic::name).toList());

        // Count every invocation of the user function so the test knows when all records were seen.
        AtomicInteger processedCount = new AtomicInteger();
        parallelConsumer.poll(pollContext -> {
            // Parameterized form avoids building the string when debug logging is disabled.
            log.debug("{}", pollContext);
            processedCount.incrementAndGet();
        });

        int expectedRecordCount = RECORDS_PER_TOPIC * NUMBER_OF_TOPICS;
        Awaitility.await().untilAtomic(processedCount, Matchers.equalTo(expectedRecordCount));

        parallelConsumer.requestCommitAsap();
        parallelConsumer.close();

        // Use an independent consumer for the assertion and make sure it is closed afterwards,
        // whether the assertion eventually passes or times out.
        try (KafkaConsumer<?, ?> assertingConsumer = getKcu().createNewConsumer(false)) {
            Set<NewTopic> topicSet = new HashSet<>(topics);
            Awaitility.await()
                    .atMost(COMMIT_ASSERTION_TIMEOUT)
                    .untilAsserted(() -> assertSeparateConsumerCommit(assertingConsumer, topicSet, RECORDS_PER_TOPIC));
        }
    }

    /**
     * Asserts that, as seen by the given consumer, partition 0 of every topic has a committed
     * offset of at least {@code expectedOffset}. Only partition 0 of each topic is checked.
     *
     * @param assertingConsumer the consumer used to look up committed offsets
     * @param topics            the topics whose partition 0 is checked
     * @param expectedOffset    the minimum committed offset required for each partition
     */
    private void assertSeparateConsumerCommit(Consumer<?, ?> assertingConsumer, Set<NewTopic> topics, int expectedOffset) {
        Set<TopicPartition> partitions = topics.stream()
                .map(topic -> new TopicPartition(topic.name(), 0))
                .collect(Collectors.toSet());

        ManagedTruth.assertThat(assertingConsumer)
                .hasCommittedToPartition(partitions)
                .forEach((partition, commitHistory) -> commitHistory.atLeastOffset(expectedOffset));
    }

    /**
     * Produces {@code i} messages to the given topic via the shared Kafka client utilities.
     *
     * @param newTopic the topic to produce to
     * @param i        the number of messages to produce
     */
    private void sendMessages(NewTopic newTopic, int i) {
        getKcu().produceMessages(newTopic.name(), i);
    }
}
