package io.confluent.parallelconsumer.integrationTests;

import io.confluent.csid.utils.GeneralTestUtils;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
import java.time.Duration;
import java.util.HashMap;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/KafkaSanityTests.class */
public class KafkaSanityTests extends BrokerIntegrationTest<String, String> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSanityTests.class);

    @Timeout(20)
    @Test
    public void pausedConsumerStillLongPollsForNothing() {
        log.info("Setup topic");
        setupTopic();
        KafkaConsumer<String, String> consumer = this.kcu.getConsumer();
        log.info("Subscribe to topic");
        consumer.subscribe(UniLists.of(this.topic));
        Set assignment = consumer.assignment();
        log.info("Pause subscription");
        consumer.pause(assignment);
        log.info("Initial poll that can trigger some actions and take longer than expected");
        consumer.poll(Duration.ofSeconds(0L));
        log.info("Second poll which is measured");
        Duration ofSeconds = Duration.ofSeconds(1L);
        Duration time = GeneralTestUtils.time(() -> {
            consumer.poll(ofSeconds);
        });
        log.info("Poll blocked my thread for {}, hopefully slightly longer than {}", time, ofSeconds);
        Assertions.assertThat(time.plus(Duration.ofMillis(100L))).as("Even though the consumer is paused ALL it's subscribed partitions, it will still perform a long poll against the server", new Object[0]).isGreaterThan(ofSeconds);
    }

    @ResourceLock(value = "Value doesn't matter, just needs a constant", mode = ResourceAccessMode.READ)
    @Test
    void offsetMetadataSpaceAvailable() {
        this.numPartitions = 5;
        setupTopic();
        int i = OffsetMapCodecManager.DefaultMaxMetadataSize;
        Assertions.assertThat(i).as("approximate sanity - ensure start state settings (shared static state :`( )", new Object[0]).isGreaterThan(3000);
        KafkaConsumer<String, String> consumer = this.kcu.getConsumer();
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        TopicPartition topicPartition2 = new TopicPartition(this.topic, 1);
        HashMap hashMap = new HashMap();
        String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(i);
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric));
        consumer.commitSync(hashMap);
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric + "!"));
        Assertions.assertThatThrownBy(() -> {
            consumer.commitSync(hashMap);
        }).isInstanceOf(OffsetMetadataTooLarge.class).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric + randomAlphanumeric));
        Assertions.assertThatThrownBy(() -> {
            consumer.commitSync(hashMap);
        }).isInstanceOf(OffsetMetadataTooLarge.class).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(topicPartition2, new OffsetAndMetadata(0L, randomAlphanumeric));
        consumer.commitSync(hashMap);
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(topicPartition2, new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(new TopicPartition(this.topic, 2), new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(new TopicPartition(this.topic, 3), new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(new TopicPartition(this.topic, 4), new OffsetAndMetadata(0L, randomAlphanumeric));
        consumer.commitSync(hashMap);
        hashMap.put(topicPartition, new OffsetAndMetadata(0L, randomAlphanumeric));
        hashMap.put(topicPartition2, new OffsetAndMetadata(0L, randomAlphanumeric + randomAlphanumeric));
        Assertions.assertThatThrownBy(() -> {
            consumer.commitSync(hashMap);
        }).isInstanceOf(OffsetMetadataTooLarge.class).hasMessageContainingAll(new CharSequence[]{"metadata", "offset request", "too large"});
    }
}
