/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.assertj.core.api.AbstractDurationAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

public class KafkaSanityTests
extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSanityTests.class);

    @Timeout(value=20L)
    @Test
    public void pausedConsumerStillLongPollsForNothing() {
        log.info("Setup topic");
        this.setupTopic();
        KafkaConsumer<String, String> consumer = this.getKcu().getConsumer();
        log.info("Subscribe to topic");
        consumer.subscribe((Collection)UniLists.of((Object)this.topic));
        Set assignment = consumer.assignment();
        log.info("Pause subscription");
        consumer.pause((Collection)assignment);
        log.info("Initial poll that can trigger some actions and take longer than expected");
        consumer.poll(Duration.ofSeconds(0L));
        log.info("Second poll which is measured");
        Duration longPollTime = Duration.ofSeconds(1L);
        Duration time = GeneralTestUtils.time(() -> consumer.poll(longPollTime));
        log.info("Poll blocked my thread for {}, hopefully slightly longer than {}", (Object)time, (Object)longPollTime);
        String desc = "Even though the consumer is paused ALL it's subscribed partitions, it will still perform a long poll against the server";
        Duration timePlusFluctuation = time.plus(Duration.ofMillis(100L));
        ((AbstractDurationAssert)Assertions.assertThat((Duration)timePlusFluctuation).as(desc, new Object[0])).isGreaterThan((Comparable)longPollTime);
    }

    @ResourceLock(value="Value doesn't matter, just needs a constant", mode=ResourceAccessMode.READ)
    @Test
    void offsetMetadataSpaceAvailable() {
        this.numPartitions = 5;
        this.setupTopic();
        int maxCapacity = OffsetMapCodecManager.DefaultMaxMetadataSize;
        ((AbstractIntegerAssert)Assertions.assertThat((int)maxCapacity).as("approximate sanity - ensure start state settings (shared static state :`( )", new Object[0])).isGreaterThan(3000);
        KafkaConsumer<String, String> consumer = this.getKcu().getConsumer();
        TopicPartition tpOne = new TopicPartition(this.topic, 0);
        TopicPartition tpTwo = new TopicPartition(this.topic, 1);
        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
        String payload = RandomStringUtils.randomAlphanumeric((int)maxCapacity);
        map.put(tpOne, new OffsetAndMetadata(0L, payload));
        consumer.commitSync(map);
        map.put(tpOne, new OffsetAndMetadata(0L, payload + "!"));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> consumer.commitSync((Map)map)).isInstanceOf(OffsetMetadataTooLarge.class)).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
        map.put(tpOne, new OffsetAndMetadata(0L, payload + payload));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> consumer.commitSync((Map)map)).isInstanceOf(OffsetMetadataTooLarge.class)).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
        map.put(tpOne, new OffsetAndMetadata(0L, payload));
        map.put(tpTwo, new OffsetAndMetadata(0L, payload));
        consumer.commitSync(map);
        map.put(tpOne, new OffsetAndMetadata(0L, payload));
        map.put(tpTwo, new OffsetAndMetadata(0L, payload));
        map.put(new TopicPartition(this.topic, 2), new OffsetAndMetadata(0L, payload));
        map.put(new TopicPartition(this.topic, 3), new OffsetAndMetadata(0L, payload));
        map.put(new TopicPartition(this.topic, 4), new OffsetAndMetadata(0L, payload));
        consumer.commitSync(map);
        map.put(tpOne, new OffsetAndMetadata(0L, payload));
        map.put(tpTwo, new OffsetAndMetadata(0L, payload + payload));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> consumer.commitSync((Map)map)).isInstanceOf(OffsetMetadataTooLarge.class)).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
    }
}

