package io.confluent.parallelconsumer;

import com.google.common.truth.Truth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import pl.tlinkowski.unij.api.UniLists;

@Timeout(60000)
/* loaded from: input_file:io/confluent/parallelconsumer/MockConsumerTestWithCommitTimeoutException.class */
class MockConsumerTestWithCommitTimeoutException {
    private static final Logger log = LoggerFactory.getLogger(MockConsumerTestWithCommitTimeoutException.class);
    private final String topic = MockConsumerTestWithCommitTimeoutException.class.getSimpleName();

    MockConsumerTestWithCommitTimeoutException() {
    }

    /* JADX WARN: Type inference failed for: r0v17, types: [io.confluent.parallelconsumer.MockConsumerTestWithCommitTimeoutException$2] */
    @Test
    void mockConsumer() {
        final AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis() + 20000);
        final Consumer consumer = new MockConsumer<String, String>(OffsetResetStrategy.EARLIEST) { // from class: io.confluent.parallelconsumer.MockConsumerTestWithCommitTimeoutException.1
            public synchronized ConsumerRecords<String, String> poll(Duration duration) {
                return super.poll(duration);
            }

            public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
                if (System.currentTimeMillis() >= atomicLong.get()) {
                    super.commitSync(map);
                    return;
                }
                try {
                    Thread.sleep(5000L);
                    throw new TimeoutException("Timeout after 5 seconds (mocking)");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        };
        HashMap hashMap = new HashMap();
        TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        hashMap.put(topicPartition, 0L);
        ParallelEoSStreamProcessor parallelEoSStreamProcessor = new ParallelEoSStreamProcessor(ParallelConsumerOptions.builder().consumer(consumer).offsetCommitTimeout(Duration.ofSeconds(25L)).commitInterval(Duration.ofSeconds(1L)).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_CONSUMER_SYNC).build());
        parallelEoSStreamProcessor.subscribe(UniLists.of(this.topic));
        consumer.rebalance(Collections.singletonList(topicPartition));
        parallelEoSStreamProcessor.onPartitionsAssigned(UniLists.of(topicPartition));
        consumer.updateBeginningOffsets(hashMap);
        new Thread() { // from class: io.confluent.parallelconsumer.MockConsumerTestWithCommitTimeoutException.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                MockConsumerTestWithCommitTimeoutException.this.addRecords(consumer);
            }
        }.start();
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        parallelEoSStreamProcessor.poll(pollContext -> {
            pollContext.forEach(recordContext -> {
                log.warn("Processing: {}", recordContext);
                concurrentLinkedQueue.add(recordContext);
            });
        });
        Awaitility.setDefaultTimeout(Duration.ofSeconds(50L));
        Awaitility.await().untilAsserted(() -> {
            Truth.assertThat(concurrentLinkedQueue).hasSize(10);
        });
        Awaitility.reset();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addRecords(MockConsumer<String, String> mockConsumer) {
        for (int i = 0; i < 10; i++) {
            mockConsumer.addRecord(new ConsumerRecord(this.topic, 0, i, "key", "value"));
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
