package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.simple.SimpleConfig;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.data.Offset;
import org.awaitility.Awaitility;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/PCMetricsTest.class */
class PCMetricsTest extends ParallelEoSStreamProcessorTestBase {
    private static final Logger log = LoggerFactory.getLogger(PCMetricsTest.class);
    private SimpleMeterRegistry registry;
    private final List<Tag> commonTags = UniLists.of(Tag.of("instance", "pc1"));

    PCMetricsTest() {
    }

    @Test
    void metricsRegisterBinding() {
        AtomicInteger atomicInteger = new AtomicInteger(200);
        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(0, 1000));
        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(1, 500));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        AtomicInteger atomicInteger3 = new AtomicInteger();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.parallelConsumer.poll(pollContext -> {
            pollContext.forEach(recordContext -> {
                AtomicInteger atomicInteger4;
                CountDownLatch countDownLatch3;
                log.trace("Processing: {}", recordContext);
                try {
                    if (recordContext.partition() == 0) {
                        atomicInteger4 = atomicInteger2;
                        countDownLatch3 = countDownLatch;
                    } else {
                        atomicInteger4 = atomicInteger3;
                        countDownLatch3 = countDownLatch2;
                    }
                    if (recordContext.partition() == 0 && atomicInteger4.get() > 700 && !atomicBoolean.getAndSet(true)) {
                        throw new RuntimeException("Failed a record to verify failed meter");
                    }
                    if (atomicInteger4.get() >= atomicInteger.get()) {
                        countDownLatch3.await();
                    } else {
                        Thread.sleep(5L);
                    }
                    atomicInteger4.incrementAndGet();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        });
        log.info(this.registry.getMetersAsString());
        Awaitility.await().atMost(Duration.ofSeconds(300L)).pollInterval(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Assertions.assertFalse(this.registry.getMeters().isEmpty());
            Assertions.assertEquals(State.RUNNING.getValue(), registeredGaugeValueFor(PCMetricsDef.PC_STATUS, new String[0]));
            Assertions.assertEquals(2.0d, registeredGaugeValueFor(PCMetricsDef.NUMBER_OF_SHARDS, new String[0]));
            Assertions.assertEquals(2.0d, registeredGaugeValueFor(PCMetricsDef.NUMBER_OF_PARTITIONS, new String[0]));
        });
        Awaitility.await().untilAsserted(() -> {
            log.info("counterP0: {}, counterP1: {}", Integer.valueOf(atomicInteger2.get()), Integer.valueOf(atomicInteger3.get()));
            log.info(this.registry.getMetersAsString());
            org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.NUM_PAUSED_PARTITIONS, new String[0])).isEqualTo(2.0d);
        });
        Thread.sleep(1000L);
        log.info(this.registry.getMetersAsString());
        int i = 1000 - atomicInteger2.get();
        int i2 = 500 - atomicInteger3.get();
        int i3 = atomicInteger2.get() - 1;
        int i4 = (atomicInteger3.get() + 1000) - 1;
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_COMPLETED_OFFSET, 0)).isEqualTo(i3);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_SEEN_OFFSET, 0)).isEqualTo(999);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_SEQUENTIAL_SUCCEEDED_OFFSET, 0)).isEqualTo(i3);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_INCOMPLETE_OFFSETS, 0)).isEqualTo(i);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_LAST_COMMITTED_OFFSET, 0)).isEqualTo(i3 + 1);
        org.assertj.core.api.Assertions.assertThat(registeredCounterValueFor(PCMetricsDef.PROCESSED_RECORDS, "topic", this.topicPartition.topic(), "partition", String.valueOf(0))).isEqualTo(atomicInteger2.get());
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_COMPLETED_OFFSET, 1)).isEqualTo(i4);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_SEEN_OFFSET, 1)).isEqualTo(1499);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_HIGHEST_SEQUENTIAL_SUCCEEDED_OFFSET, 1)).isEqualTo(i4);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_INCOMPLETE_OFFSETS, 1)).isEqualTo(i2);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PARTITION_LAST_COMMITTED_OFFSET, 1)).isEqualTo(i4 + 1);
        org.assertj.core.api.Assertions.assertThat(registeredCounterValueFor(PCMetricsDef.PROCESSED_RECORDS, "topic", this.topicPartition.topic(), "partition", String.valueOf(1))).isEqualTo(atomicInteger3.get());
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.SHARDS_SIZE, new String[0])).isEqualTo(i + i2);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.INCOMPLETE_OFFSETS_TOTAL, new String[0])).isEqualTo(i + i2);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.INFLIGHT_RECORDS, new String[0])).isGreaterThan(0.0d);
        org.assertj.core.api.Assertions.assertThat(registeredDistributionSummaryFor(PCMetricsDef.METADATA_SPACE_USED, new String[0])).isGreaterThan(0.0d);
        org.assertj.core.api.Assertions.assertThat(registeredTimerFor(PCMetricsDef.OFFSETS_ENCODING_TIME, new String[0])).isGreaterThan(0.0d);
        org.assertj.core.api.Assertions.assertThat(registeredCounterValueFor(PCMetricsDef.OFFSETS_ENCODING_USAGE, new String[0])).isGreaterThan(0.0d);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.NUMBER_OF_PARTITIONS, new String[0])).isEqualTo(2.0d);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.NUM_PAUSED_PARTITIONS, new String[0])).isEqualTo(2.0d);
        org.assertj.core.api.Assertions.assertThat(registeredDistributionSummaryFor(PCMetricsDef.PAYLOAD_RATIO_USED, new String[0])).isGreaterThan(-1.0d);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.NUMBER_OF_SHARDS, new String[0])).isEqualTo(2.0d);
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.PC_STATUS, new String[0])).isEqualTo(State.RUNNING.getValue());
        org.assertj.core.api.Assertions.assertThat(registeredGaugeValueFor(PCMetricsDef.WAITING_RECORDS, new String[0])).isCloseTo(i + i2, Offset.offset(Double.valueOf(100.0d)));
        org.assertj.core.api.Assertions.assertThat(registeredTimerFor(PCMetricsDef.USER_FUNCTION_PROCESSING_TIME, new String[0])).isGreaterThan(0.0d);
        atomicInteger.set(5000);
        countDownLatch.countDown();
        countDownLatch2.countDown();
        Awaitility.await().untilAsserted(() -> {
            org.assertj.core.api.Assertions.assertThat(atomicInteger2.get()).isEqualTo(1000);
        });
        Awaitility.await().atMost(Duration.ofSeconds(120L)).until(() -> {
            return Boolean.valueOf(atomicInteger2.get() == 1000 && registeredGaugeValueFor(PCMetricsDef.WAITING_RECORDS, new String[0]) == 0.0d);
        });
        Awaitility.await().atMost(Duration.ofSeconds(120L)).pollInterval(Duration.ofSeconds(5L)).untilAsserted(() -> {
            log.info(this.registry.getMetersAsString());
            org.assertj.core.api.Assertions.assertThat(registeredCounterValueFor(PCMetricsDef.FAILED_RECORDS, "topic", this.topicPartition.topic(), "partition", String.valueOf(0))).isEqualTo(1.0d);
        });
    }

    @Test
    void pcStatusMetricUpdatesOnChange() {
        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(0, 1000));
        this.parallelConsumer.poll(pollContext -> {
            pollContext.forEach(recordContext -> {
                log.trace("Processing: {}", recordContext);
                try {
                    Thread.sleep(5L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
        });
        log.info(this.registry.getMetersAsString());
        Awaitility.await().atMost(Duration.ofSeconds(20L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
            Assertions.assertFalse(this.registry.getMeters().isEmpty());
            Assertions.assertEquals(State.RUNNING.getValue(), registeredGaugeValueFor(PCMetricsDef.PC_STATUS, new String[0]));
        });
        this.parallelConsumer.pauseIfRunning();
        this.ktu.send(this.consumerSpy, this.ktu.generateRecords(0, 100));
        Awaitility.await().atMost(Duration.ofSeconds(20L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
            Assertions.assertEquals(State.PAUSED.getValue(), registeredGaugeValueFor(PCMetricsDef.PC_STATUS, new String[0]));
        });
        this.parallelConsumer.resumeIfPaused();
        Awaitility.await().atMost(Duration.ofSeconds(20L)).pollInterval(Duration.ofSeconds(1L)).untilAsserted(() -> {
            Assertions.assertEquals(State.RUNNING.getValue(), registeredGaugeValueFor(PCMetricsDef.PC_STATUS, new String[0]));
        });
    }

    private double registeredGaugeValueFor(PCMetricsDef pCMetricsDef, String... strArr) {
        return ((Double) Optional.ofNullable(this.registry.find(pCMetricsDef.getName()).tags(strArr).gauge()).map((v0) -> {
            return v0.value();
        }).orElse(Double.valueOf(-1.0d))).doubleValue();
    }

    private double registeredGaugeValueFor(PCMetricsDef pCMetricsDef, int i) {
        return registeredGaugeValueFor(pCMetricsDef, "topic", this.topicPartition.topic(), "partition", String.valueOf(i));
    }

    private double registeredCounterValueFor(PCMetricsDef pCMetricsDef, String... strArr) {
        return ((Double) Optional.ofNullable(this.registry.find(pCMetricsDef.getName()).tags(strArr).counter()).map((v0) -> {
            return v0.count();
        }).orElse(Double.valueOf(-1.0d))).doubleValue();
    }

    private double registeredTimerFor(PCMetricsDef pCMetricsDef, String... strArr) {
        return ((Double) Optional.ofNullable(this.registry.find(pCMetricsDef.getName()).tags(strArr).timer()).map(timer -> {
            return Double.valueOf(timer.mean(TimeUnit.MILLISECONDS));
        }).orElse(Double.valueOf(-1.0d))).doubleValue();
    }

    private double registeredDistributionSummaryFor(PCMetricsDef pCMetricsDef, String... strArr) {
        return ((Double) Optional.ofNullable(this.registry.find(pCMetricsDef.getName()).tags(strArr).summary()).map((v0) -> {
            return v0.mean();
        }).orElse(Double.valueOf(-1.0d))).doubleValue();
    }

    @Override // io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase
    protected ParallelConsumerOptions<Object, Object> getOptions() {
        this.registry = new SimpleMeterRegistry(new SimpleConfig() { // from class: io.confluent.parallelconsumer.PCMetricsTest.1
            public String get(String str) {
                return null;
            }

            @NotNull
            public Duration step() {
                return Duration.ofSeconds(10L);
            }
        }, Clock.SYSTEM);
        return getDefaultOptions().meterRegistry(this.registry).metricsTags(this.commonTags).build();
    }
}
