/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.truth.CommitHistorySubject;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import one.util.streamex.StreamEx;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MultiTopicTest
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiTopicTest.class);

    MultiTopicTest() {
    }

    @ParameterizedTest
    @EnumSource(value=ParallelConsumerOptions.ProcessingOrder.class)
    void multiTopic(ParallelConsumerOptions.ProcessingOrder order) {
        int numTopics = 3;
        List<NewTopic> multiTopics = this.getKcu().createTopics(numTopics);
        int recordsPerTopic = 1;
        multiTopics.forEach(singleTopic -> this.sendMessages((NewTopic)singleTopic, recordsPerTopic));
        ParallelEoSStreamProcessor<String, String> pc = this.getKcu().buildPc(order);
        pc.subscribe((Collection)StreamEx.of(multiTopics).map(NewTopic::name).toList());
        AtomicInteger messageProcessedCount = new AtomicInteger();
        pc.poll(pollContext -> {
            log.debug(pollContext.toString());
            messageProcessedCount.incrementAndGet();
        });
        int expectedMessagesCount = recordsPerTopic * numTopics;
        Awaitility.await().untilAtomic(messageProcessedCount, Matchers.is((Matcher)Matchers.equalTo((Object)expectedMessagesCount)));
        pc.requestCommitAsap();
        pc.close();
        KafkaConsumer assertingConsumer = this.getKcu().createNewConsumer(false);
        Awaitility.await().atMost(Duration.ofSeconds(10L)).untilAsserted(() -> this.assertSeparateConsumerCommit((Consumer<?, ?>)assertingConsumer, new HashSet<NewTopic>(multiTopics), recordsPerTopic));
    }

    private void assertSeparateConsumerCommit(Consumer<?, ?> assertingConsumer, HashSet<NewTopic> topics, int expectedOffset) {
        Set<TopicPartition> partitions = topics.stream().map(newTopic -> new TopicPartition(newTopic.name(), 0)).collect(Collectors.toSet());
        Map committed = assertingConsumer.committed(partitions);
        Map<TopicPartition, CommitHistorySubject> partitionSubjects = ManagedTruth.assertThat(assertingConsumer).hasCommittedToPartition(partitions);
        partitionSubjects.forEach((topicPartition, commitHistorySubject) -> commitHistorySubject.atLeastOffset(expectedOffset));
    }

    private void sendMessages(NewTopic newTopic, int recordsPerTopic) {
        this.getKcu().produceMessages(newTopic.name(), recordsPerTopic);
    }
}

