package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/MultiInstanceMetricsTest.class */
/**
 * Integration tests checking how Parallel Consumer (PC) instances record the
 * {@link PCMetricsDef#PROCESSED_RECORDS} counter into a {@link SimpleMeterRegistry} that is shared
 * between the instances created within a single test.
 *
 * <p>A fresh registry is created before every test and closed afterwards. Every processor created by
 * a test is closed in a {@code finally} block so that a failed assertion or an Awaitility timeout
 * does not leave a running processor behind.
 */
class MultiInstanceMetricsTest extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceMetricsTest.class);

    /** Name of the meter tag used to tell PC instances apart when searching the registry. */
    private static final String PC_INSTANCE_TAG = "pcinstance";

    /** Upper bound for waiting until the expected number of records has been processed. */
    private static final Duration PROCESSING_TIMEOUT = Duration.ofSeconds(30L);

    SimpleMeterRegistry simpleMeterRegistry;
    private String outputTopic;

    MultiInstanceMetricsTest() {
        this.numPartitions = 2;
    }

    /** Sets up the topic and creates the registry that the test's PC instances will report to. */
    @BeforeEach
    void setup() {
        setupTopic();
        this.simpleMeterRegistry = new SimpleMeterRegistry();
    }

    /** Closes the registry created in {@link #setup()}. */
    @AfterEach
    void cleanup() {
        // The registry is assigned only after setupTopic() returns; guard so that a failure there
        // is not masked by a NullPointerException raised from this method.
        if (this.simpleMeterRegistry != null) {
            this.simpleMeterRegistry.close();
        }
    }

    /**
     * Runs two PC instances with distinct instance tags against the same topic (the second one
     * created with {@code REUSE_GROUP}) and verifies that each instance's processed-records counter
     * equals the number of records that instance's own user function handled.
     */
    @Test
    void twoInstancePCMetricsRecordedIndependently() {
        final long expectedTotal = 100L;
        getKcu().produceMessages(this.topic, expectedTotal);

        String firstTag = UUID.randomUUID().toString();
        ParallelEoSStreamProcessor<String, String> first =
                newSubscribedProcessor(getOptions(firstTag, KafkaClientUtils.GroupOption.NEW_GROUP));
        try {
            String secondTag = UUID.randomUUID().toString();
            ParallelEoSStreamProcessor<String, String> second =
                    newSubscribedProcessor(getOptions(secondTag, KafkaClientUtils.GroupOption.REUSE_GROUP));
            try {
                AtomicInteger firstCount = new AtomicInteger();
                AtomicInteger secondCount = new AtomicInteger();
                first.poll(pollContext -> firstCount.incrementAndGet());
                second.poll(pollContext -> secondCount.incrementAndGet());

                Awaitility.await()
                        .timeout(PROCESSING_TIMEOUT)
                        .until(() -> (long) (firstCount.get() + secondCount.get()) == expectedTotal);

                Assertions.assertThat(processedRecords().tag(PC_INSTANCE_TAG, firstTag).counter().count())
                        .isEqualTo(firstCount.get());
                Assertions.assertThat(processedRecords().tag(PC_INSTANCE_TAG, secondTag).counter().count())
                        .isEqualTo(secondCount.get());
            } finally {
                second.close();
            }
        } finally {
            first.close();
        }
    }

    /**
     * Runs one PC instance to completion, closes it, then runs a second instance (a new consumer
     * group on the same topic) against the same registry. After each run the sum of all
     * processed-records counters in the registry must equal the count seen by the instance that is
     * currently running.
     */
    @Test
    void sameRegistryCanBeReusedAfterPcInstanceClosed() {
        final int batchSize = 20;

        getKcu().produceMessages(this.topic, batchSize);
        ParallelEoSStreamProcessor<String, String> first =
                newSubscribedProcessor(getOptions(null, KafkaClientUtils.GroupOption.NEW_GROUP));
        try {
            AtomicInteger firstCount = new AtomicInteger();
            first.poll(pollContext -> firstCount.incrementAndGet());
            Awaitility.await().timeout(PROCESSING_TIMEOUT).until(() -> firstCount.get() == batchSize);
            Assertions.assertThat(sumOfCounters(processedRecords())).isEqualTo(firstCount.get());
        } finally {
            first.close();
        }

        getKcu().produceMessages(this.topic, batchSize);
        ParallelEoSStreamProcessor<String, String> second =
                newSubscribedProcessor(getOptions(null, KafkaClientUtils.GroupOption.NEW_GROUP));
        try {
            AtomicInteger secondCount = new AtomicInteger();
            second.poll(pollContext -> secondCount.incrementAndGet());
            // The second instance is awaited for both batches: the one produced before the first
            // instance ran plus the one produced just above.
            Awaitility.await().timeout(PROCESSING_TIMEOUT).until(() -> secondCount.get() == batchSize * 2);
            Assertions.assertThat(sumOfCounters(processedRecords())).isEqualTo(secondCount.get());
        } finally {
            second.close();
        }
    }

    /**
     * Verifies that once a PC instance has been closed, the registry it reported to contains no
     * meters at all.
     */
    @Test
    void allMetersRemovedFromRegistryOnClose() {
        final long expectedTotal = 10L;
        getKcu().produceMessages(this.topic, expectedTotal);

        String instanceTag = UUID.randomUUID().toString();
        ParallelEoSStreamProcessor<String, String> processor =
                newSubscribedProcessor(getOptions(instanceTag, KafkaClientUtils.GroupOption.NEW_GROUP));
        try {
            AtomicInteger processed = new AtomicInteger();
            processor.poll(pollContext -> processed.incrementAndGet());
            Awaitility.await().timeout(PROCESSING_TIMEOUT).until(() -> (long) processed.get() == expectedTotal);
            Assertions.assertThat(sumOfCounters(processedRecords().tag(PC_INSTANCE_TAG, instanceTag)))
                    .isEqualTo(processed.get());
        } finally {
            // Closing is also the action under test; it happens exactly once on every path.
            processor.close();
        }

        Assertions.assertThat(this.simpleMeterRegistry.getMeters()).isEmpty();
    }

    /**
     * Builds PC options that report to this test's registry.
     *
     * @param str         value passed to {@code pcInstanceTag}; some tests pass {@code null}
     * @param groupOption passed to {@code createNewConsumer} when creating the consumer
     * @return options using asynchronous periodic consumer commits and {@code PARTITION} ordering
     */
    ParallelConsumerOptions<String, String> getOptions(String str, KafkaClientUtils.GroupOption groupOption) {
        return ParallelConsumerOptions.<String, String>builder()
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS)
                .consumer(getKcu().createNewConsumer(groupOption))
                .meterRegistry(this.simpleMeterRegistry)
                .pcInstanceTag(str)
                .ordering(ParallelConsumerOptions.ProcessingOrder.PARTITION)
                .build();
    }

    /** Creates a processor for the given options and subscribes it to the test topic. */
    private ParallelEoSStreamProcessor<String, String> newSubscribedProcessor(ParallelConsumerOptions<String, String> options) {
        ParallelEoSStreamProcessor<String, String> processor =
                new ParallelEoSStreamProcessor<>(options, new PCModule<>(options));
        try {
            processor.subscribe(UniSets.of(this.topic));
        } catch (RuntimeException e) {
            // Do not leak a processor that could not be subscribed.
            processor.close();
            throw e;
        }
        return processor;
    }

    /** Starts a registry search restricted to the processed-records meter name. */
    private Search processedRecords() {
        return Search.in(this.simpleMeterRegistry).name(PCMetricsDef.PROCESSED_RECORDS.getName());
    }

    /** Adds up the current count of every counter matched by the given search. */
    private static double sumOfCounters(Search search) {
        return search.counters().stream().mapToDouble(counter -> counter.count()).sum();
    }
}
