/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests.utils;

import io.confluent.parallelconsumer.ManagedTruth;
import java.time.Duration;
import java.util.Collection;
import java.util.Set;
import lombok.NonNull;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniSets;

/**
 * Test helper that subscribes a dedicated {@link KafkaConsumer} to a topic, polls it, and asserts
 * on the offsets of the polled records via {@link ManagedTruth}.
 *
 * <p>Each assertion subscribes the consumer before polling and always unsubscribes it afterwards,
 * including when the assertion fails, so one instance can be reused for several assertions.
 *
 * <p>Instances drive a single {@link KafkaConsumer}; no synchronisation is performed here.
 */
public class BrokerCommitAsserter {
    private static final Logger log = LoggerFactory.getLogger(BrokerCommitAsserter.class);

    /** Topic used by the overloads that do not take an explicit topic. */
    @NonNull
    private final String defaultTopic;

    /** Consumer used exclusively for polling records to assert on. */
    @NonNull
    private final KafkaConsumer<?, ?> assertConsumer;

    /**
     * Asserts against the default topic; see {@link #assertConsumedAtLeastOffset(String, int)}.
     *
     * @param target the minimum offset expected to be observed
     */
    public void assertConsumedAtLeastOffset(int target) {
        this.assertConsumedAtLeastOffset(this.getDefaultTopic(), target);
    }

    /**
     * Repeatedly polls {@code topic} (using Awaitility's default timeout) until a polled batch
     * satisfies {@code hasHeadOffsetAtLeastInAnyTopicPartition(target)}.
     *
     * @param topic  the topic to subscribe to
     * @param target the minimum offset expected to be observed
     */
    public void assertConsumedAtLeastOffset(String topic, int target) {
        try {
            this.setup(topic, target);
            Awaitility.await().untilAsserted(() -> {
                ConsumerRecords<?, ?> poll = this.assertConsumer.poll(Duration.ofSeconds(1L));
                log.debug("Polled {} records, looking for at least offset {}", poll.count(), target);
                ManagedTruth.assertThat(poll).hasHeadOffsetAtLeastInAnyTopicPartition(target);
            });
        } finally {
            // Unsubscribe even when the assertion fails, so the consumer is not left subscribed.
            this.post();
        }
    }

    /** Releases the subscription taken by {@link #setup(String, int)}. */
    private void post() {
        this.assertConsumer.unsubscribe();
    }

    /**
     * Subscribes the assert consumer to {@code topic} and requests a seek to the beginning.
     *
     * @param topic  the topic to subscribe to
     * @param target the offset being asserted on; only used for logging here
     */
    private void setup(String topic, int target) {
        log.debug("Asserting against topic: {}, expecting to consume at LEAST offset {}", topic, target);
        Set<String> topicSet = UniSets.of(topic);
        this.assertConsumer.subscribe(topicSet);
        // NOTE(review): an empty collection is passed here; presumably this targets "all currently
        // assigned partitions", and none may be assigned before the first poll - confirm that this
        // call has the intended effect.
        this.assertConsumer.seekToBeginning(UniSets.of());
    }

    /**
     * Waits a fixed delay of five seconds, then polls {@code topic} and asserts that the polled
     * batch satisfies {@code hasHeadOffsetAtMostInAnyTopicPartition(atMost)}. The overall timeout
     * is the delay plus one second.
     *
     * @param topic  the topic to subscribe to
     * @param atMost the maximum offset expected to be observed
     */
    public void assertConsumedAtMostOffset(String topic, int atMost) {
        try {
            this.setup(topic, atMost);
            Duration delay = Duration.ofSeconds(5L);
            log.debug("Delaying by {} to check consumption from topic {} by at most {}", delay, topic, atMost);
            Awaitility.await().pollDelay(delay).timeout(delay.plusSeconds(1L)).untilAsserted(() -> {
                ConsumerRecords<?, ?> poll = this.assertConsumer.poll(Duration.ofSeconds(1L));
                log.debug("Polled {} records, looking for at MOST offset {}", poll.count(), atMost);
                ManagedTruth.assertThat(poll).hasHeadOffsetAtMostInAnyTopicPartition(atMost);
            });
        } finally {
            // Unsubscribe even when the assertion fails, so the consumer is not left subscribed.
            this.post();
        }
    }

    /**
     * Creates an asserter bound to a default topic and a consumer.
     *
     * @param defaultTopic   topic used when no explicit topic is supplied
     * @param assertConsumer consumer used to poll records for the assertions
     * @throws NullPointerException if either argument is {@code null}
     */
    public BrokerCommitAsserter(@NonNull String defaultTopic, @NonNull KafkaConsumer<?, ?> assertConsumer) {
        if (defaultTopic == null) {
            throw new NullPointerException("defaultTopic is marked non-null but is null");
        }
        if (assertConsumer == null) {
            throw new NullPointerException("assertConsumer is marked non-null but is null");
        }
        this.defaultTopic = defaultTopic;
        this.assertConsumer = assertConsumer;
    }

    /** Returns the topic supplied at construction time. */
    @NonNull
    private String getDefaultTopic() {
        return this.defaultTopic;
    }
}

