package io.confluent.parallelconsumer.integrationTests.state;

import com.google.common.truth.StringSubject;
import com.google.common.truth.Truth;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.ThreadUtils;
import io.confluent.parallelconsumer.FakeRuntimeException;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest;
import io.confluent.parallelconsumer.integrationTests.utils.KafkaClientUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.awaitility.core.TerminalFailureException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;
import pl.tlinkowski.unij.api.UniSets;

/* loaded from: input_file:io/confluent/parallelconsumer/integrationTests/state/PartitionStateCommittedOffsetIT.class */
class PartitionStateCommittedOffsetIT extends BrokerIntegrationTest<String, String> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(PartitionStateCommittedOffsetIT.class);
    /** Reset policy used by the {@code runPcUntilOffset} overloads that do not take a policy argument. */
    public static final OffsetResetStrategy DEFAULT_OFFSET_RESET_POLICY = OffsetResetStrategy.EARLIEST;
    /** Partition 0 of the test topic; assigned in {@link #setup()}. */
    TopicPartition tp;
    /** Base number of records the tests produce. NOTE(review): an instance field despite the constant-style name. */
    int TO_PRODUCE = 200;
    /** Reset policy of the running test; {@code runPcCheckStartIs} branches on it being {@code NONE}. */
    private OffsetResetStrategy offsetResetStrategy = DEFAULT_OFFSET_RESET_POLICY;
    /** Processor most recently built by the five-argument {@code runPcUntilOffset}; read by {@code noOffsetPolicyOnStartup}. */
    private ParallelEoSStreamProcessor<String, String> activePc;

    /*
     * Decompiler rendering of the compiler-generated class PartitionStateCommittedOffsetIT$1 (renamed because "$1"
     * is not a valid source-level class name).
     *
     * It holds a lookup table indexed by OffsetResetStrategy.ordinal() whose entries are small case labels:
     * EARLIEST -> 1, LATEST -> 2, NONE -> 3. Any ordinal that is not registered keeps the array default of 0, so a
     * switch over the table value falls through to its default branch for such constants.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per enum constant present at runtime; slots start at 0 (= "no case label").
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$clients$consumer$OffsetResetStrategy = new int[OffsetResetStrategy.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing from the runtime enum
            // (NoSuchFieldError on the field access) only skips that one entry instead of aborting initialisation.
            try {
                $SwitchMap$org$apache$kafka$clients$consumer$OffsetResetStrategy[OffsetResetStrategy.EARLIEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$kafka$clients$consumer$OffsetResetStrategy[OffsetResetStrategy.LATEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$kafka$clients$consumer$OffsetResetStrategy[OffsetResetStrategy.NONE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /** Package-private no-argument constructor; performs no work beyond the superclass initialisation. */
    PartitionStateCommittedOffsetIT() {
        super();
    }

    /**
     * Prepares the topic for a test and points {@link #tp} at its partition 0.
     *
     * <p>Also invoked directly by {@code setupCompactingKafkaBroker()} after a new broker has been started.
     */
    @BeforeEach
    void setup() {
        setupTopic();
        String topic = getTopic();
        this.tp = new TopicPartition(topic, 0);
    }

    /**
     * Checks that offsets whose keys were re-sent are no longer read back on a second run.
     *
     * <p>Flow: produce a tenth of {@code TO_PRODUCE} records; run a processor that fails every offset from the
     * halfway point on (except one exceptional offset); re-send the keys chosen by
     * {@code sendRandomCompactionRecords}; wait for compaction via {@code triggerCompactionProcessing}; run a
     * processor again with {@code REUSE_GROUP} and assert that none of the first-run offsets belonging to re-sent
     * keys appear.
     */
    @Test
    void compactedTopic() {
        // The broker handle is held only so it is closed when the test ends, whether it passes or fails.
        try (KafkaContainer compactingBroker = setupCompactingKafkaBroker()) {
            int toProduce = this.TO_PRODUCE / 10;
            List<String> keys = produceMessages(toProduce);

            int untilOffset = toProduce / 2;
            List<PollContext<String, String>> firstRun =
                    runPcUntilOffset(untilOffset, toProduce, UniSets.of(toProduce - 3L));
            long lastOffsetOfFirstRun = JavaUtils.getLast(firstRun).get().offset();
            Truth.assertWithMessage("Last processed should be at least half of the total sent, so that there is incomplete data to track")
                    .that(lastOffsetOfFirstRun)
                    .isGreaterThan(toProduce / 2);

            Set<String> compactedKeys = new HashSet<>(sendRandomCompactionRecords(keys, toProduce));

            List<String> firstRunKeysNotCompacted = firstRun.stream()
                    .filter(ctx -> !compactedKeys.contains(ctx.key()))
                    .map(PollContext::key)
                    .collect(Collectors.toList());

            // FALSE -> records whose key was not re-sent, TRUE -> records whose key was re-sent.
            Map<Boolean, List<PollContext<String, String>>> byCompaction = firstRun.stream()
                    .collect(Collectors.partitioningBy(ctx -> compactedKeys.contains(ctx.key())));
            List<PollContext<String, String>> kept = byCompaction.get(Boolean.FALSE);
            List<PollContext<String, String>> compacted = byCompaction.get(Boolean.TRUE);

            log.debug("kept offsets: {}", kept.stream().map(PollContext::offset).collect(Collectors.toList()));
            log.debug("kept keys: {}", kept.stream().map(PollContext::key).collect(Collectors.toList()));
            // Each label now prints the matching attribute (they were crossed over before).
            log.debug("compacted offsets: {}", compacted.stream().map(PollContext::offset).collect(Collectors.toList()));
            log.debug("compacted keys: {}", compacted.stream().map(PollContext::key).collect(Collectors.toList()));

            // Every element of 'compacted' already has a re-sent key, so no further filtering is needed.
            List<Long> compactedOffsets = compacted.stream()
                    .map(PollContext::offset)
                    .collect(Collectors.toList());
            log.debug("First run produced, with compaction targets removed: {}", firstRunKeysNotCompacted);

            triggerCompactionProcessing();

            // Ignore the filler records that triggerCompactionProcessing() produced.
            List<PollContext<String, String>> secondRun =
                    runPcUntilOffset(toProduce + compactedKeys.size(), KafkaClientUtils.GroupOption.REUSE_GROUP).stream()
                            .filter(ctx -> !ctx.key().contains("compaction-trigger"))
                            .collect(Collectors.toList());
            List<Long> secondRunOffsets = secondRun.stream()
                    .map(PollContext::offset)
                    .collect(Collectors.toList());

            Truth.assertWithMessage("Finish reading rest of records from %s to %s", untilOffset, toProduce)
                    .that(secondRun.size())
                    .isGreaterThan(toProduce - untilOffset);
            Truth.assertWithMessage("Off the offsets read on the second run, offsets that were compacted (below the initial produce target) should now be removed, as they were replaced with newer ones.")
                    .that(secondRunOffsets)
                    .containsNoneIn(compactedOffsets);
        }
    }

    /**
     * Starts a new Kafka container, re-runs {@link #setup()} and switches the topic to aggressive compaction.
     *
     * <p>If any step after container creation fails, the container is closed before the failure propagates, so a
     * broken setup does not leave a broker running.
     *
     * @return the started container; the caller is responsible for closing it
     */
    @NonNull
    private KafkaContainer setupCompactingKafkaBroker() {
        // NOTE(review): the meaning of the "40000" argument is defined by BrokerIntegrationTest - confirm there.
        KafkaContainer compactingBroker = BrokerIntegrationTest.createKafkaContainer("40000");
        boolean ready = false;
        try {
            compactingBroker.start();
            setup();
            setupCompactedEnvironment();
            ready = true;
            return compactingBroker;
        } finally {
            if (!ready) {
                compactingBroker.close();
            }
        }
    }

    /**
     * Runs a processor up to offset {@code i} using {@link #DEFAULT_OFFSET_RESET_POLICY}.
     *
     * @param i offset passed on as the target of the two-argument (policy, offset) overload
     * @return the records seen by the processor, as returned by the delegate
     */
    private List<PollContext<String, String>> runPcUntilOffset(int i) {
        OffsetResetStrategy policy = DEFAULT_OFFSET_RESET_POLICY;
        return runPcUntilOffset(policy, i);
    }

    /**
     * Runs a processor with the given reset policy in a new group, using {@code i} both as the offset from which
     * records start failing and as the offset the run is expected to reach, with no exceptional offsets.
     *
     * @param offsetResetStrategy reset policy applied before the processor is built
     * @param i                   offset used for both the fail-from and the expected-to-reach arguments
     * @return the records seen by the processor, as returned by the five-argument overload
     */
    private List<PollContext<String, String>> runPcUntilOffset(OffsetResetStrategy offsetResetStrategy, int i) {
        Set<Long> noExceptions = UniSets.of();
        KafkaClientUtils.GroupOption group = KafkaClientUtils.GroupOption.NEW_GROUP;
        return runPcUntilOffset(offsetResetStrategy, i, i, noExceptions, group);
    }

    /**
     * Runs a processor that never fails a record ({@code Long.MAX_VALUE} as the fail-from offset) until it is
     * expected to have reached offset {@code i}, using {@link #DEFAULT_OFFSET_RESET_POLICY}.
     *
     * @param i           offset the run is expected to reach
     * @param groupOption group option handed to the processor builder
     * @return the records seen by the processor, as returned by the five-argument overload
     */
    private List<PollContext<String, String>> runPcUntilOffset(int i, KafkaClientUtils.GroupOption groupOption) {
        final long neverFail = Long.MAX_VALUE;
        Set<Long> noExceptions = UniSets.of();
        return runPcUntilOffset(DEFAULT_OFFSET_RESET_POLICY, neverFail, i, noExceptions, groupOption);
    }

    /**
     * Parses the number that follows the first {@code '-'} in a key such as {@code "key-50"}.
     *
     * <p>When the key contains no dash the whole string is parsed.
     *
     * @param str key to parse
     * @return the numeric part after the first dash
     * @throws NumberFormatException if the text after the first dash is not a valid {@code long}
     */
    private static long getOffsetFromKey(String str) {
        int firstDash = str.indexOf('-');
        String numericPart = str.substring(firstDash + 1);
        return Long.parseLong(numericPart);
    }

    /**
     * Reconfigures the test topic for aggressive compaction and waits up to five seconds for the broker to accept
     * the change.
     *
     * <p>Sets {@code cleanup.policy=compact}, {@code max.compaction.lag.ms=1} and
     * {@code min.cleanable.dirty.ratio=0}.
     *
     * @throws IllegalStateException if the configuration change fails, times out, or the wait is interrupted
     */
    private void setupCompactedEnvironment() {
        log.debug("Setting up aggressive compaction...");
        ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, getTopic());
        List<AlterConfigOp> compactionSettings = new ArrayList<>();
        compactionSettings.add(new AlterConfigOp(new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET));
        compactionSettings.add(new AlterConfigOp(new ConfigEntry("max.compaction.lag.ms", "1"), AlterConfigOp.OpType.SET));
        compactionSettings.add(new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0"), AlterConfigOp.OpType.SET));
        try {
            getKcu().getAdmin()
                    .incrementalAlterConfigs(UniMaps.of(topicResource, compactionSettings))
                    .all()
                    .get(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs the test thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while applying compaction settings to " + getTopic(), e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not apply compaction settings to " + getTopic(), e);
        }
        log.debug("Compaction setup complete");
    }

    /**
     * Produces {@code TO_PRODUCE * 2} records with the key prefix {@code "log-compaction-trigger-"} and then
     * sleeps for 20 seconds so the broker has time to compact.
     *
     * @return whatever {@code produceMessages} returns for the filler records (callers use its size)
     */
    private List<String> triggerCompactionProcessing() {
        final int pauseSeconds = 20;
        int fillerCount = this.TO_PRODUCE * 2;
        List<String> fillerKeys = produceMessages(fillerCount, "log-compaction-trigger-");
        log.info("Pausing for {} seconds to allow for compaction", pauseSeconds);
        ThreadUtils.sleepSecondsLog(pauseSeconds);
        return fillerKeys;
    }

    /**
     * Re-sends a selection of already-produced keys with the value {@code "compactor"} so the earlier records for
     * those keys become eligible for compaction.
     *
     * <p>The keys are chosen by {@code JavaUtils.getRandom(list, i)}. All sends are issued first and then each is
     * awaited for up to five seconds.
     *
     * @param list keys to choose from
     * @param i    quantity handed to {@code JavaUtils.getRandom}
     * @return the chosen keys, sorted by the number after their first dash
     * @throws IllegalStateException if a send fails, times out, or the wait is interrupted
     */
    private ArrayList<String> sendRandomCompactionRecords(List<String> list, int i) {
        ArrayList<String> compactionKeys = new ArrayList<>();
        List<Future<RecordMetadata>> pendingSends = new ArrayList<>();
        // Plain loop instead of a stream: the work here is all side effects (recording the key, sending).
        for (String key : JavaUtils.getRandom(list, i)) {
            compactionKeys.add(key);
            pendingSends.add(getKcu().getProducer().send(new ProducerRecord<>(getTopic(), key, "compactor")));
        }

        List<Long> compactionOffsets = new ArrayList<>(pendingSends.size());
        try {
            for (Future<RecordMetadata> pending : pendingSends) {
                compactionOffsets.add(pending.get(5L, TimeUnit.SECONDS).offset());
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs the test thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for compaction records to be sent", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not send compaction records", e);
        }

        compactionKeys.sort(Comparator.comparingLong(PartitionStateCommittedOffsetIT::getOffsetFromKey));
        log.debug("Keys to tombstone: {}\nOffsets of the generated tombstone: {}", compactionKeys, compactionOffsets);
        return compactionKeys;
    }

    /**
     * Moves the committed offset backwards (to 25) after a run that failed from offset 50 on, then checks that a
     * new processor in the same group starts at the moved offset and reads up to {@code TO_PRODUCE}.
     */
    @Test
    void committedOffsetLower() {
        final int movedTo = 25;
        produceMessages(this.TO_PRODUCE);
        runPcUntilOffset(50);
        String groupId = getKcu().getGroupId();
        moveCommittedOffset(groupId, movedTo);
        runPcCheckStartIs(movedTo, this.TO_PRODUCE);
    }

    /**
     * Runs a {@code PARTITION}-ordered processor and verifies the first offset it consumes.
     *
     * <p>When the current test's reset policy is {@code NONE}, the processor is expected to come up and then fail
     * with a root cause mentioning {@code NoOffsetForPartitionException}; the client utilities are closed
     * afterwards. Otherwise a "poll-bumper" record is produced on every await iteration until the highest consumed
     * offset reaches {@code j2 - 1}, after which the lowest consumed offset must equal {@code j}.
     *
     * @param j           offset the processor is expected to start from
     * @param j2          offset (exclusive) the processor must have read up to before the start offset is checked
     * @param groupOption group option handed to the processor builder
     */
    private void runPcCheckStartIs(long j, long j2, KafkaClientUtils.GroupOption groupOption) {
        ParallelEoSStreamProcessor<String, String> pc =
                super.getKcu().buildPc(ParallelConsumerOptions.ProcessingOrder.PARTITION, groupOption);
        pc.subscribe(UniLists.of(getTopic()));

        AtomicLong lowest = new AtomicLong(Long.MAX_VALUE);
        AtomicLong highest = new AtomicLong(Long.MIN_VALUE);
        AtomicLong bumpersSent = new AtomicLong();

        pc.poll(pollContext -> {
            long offset = pollContext.offset();
            log.error("Consumed: {} Bumpers sent {}", offset, bumpersSent);
            // Track minimum and maximum independently and atomically: a record that sets a new minimum
            // (e.g. the very first one) must also count towards the maximum.
            long previousLowest = lowest.getAndAccumulate(offset, Math::min);
            if (offset < previousLowest) {
                log.error("Found lowest offset {}", offset);
            }
            highest.accumulateAndGet(offset, Math::max);
        });

        if (this.offsetResetStrategy.equals(OffsetResetStrategy.NONE)) {
            // First wait for the processor to be up, then for it to fail.
            Awaitility.await().untilAsserted(() -> Truth.assertThat(pc.isClosedOrFailed()).isFalse());
            Awaitility.await().untilAsserted(() -> Truth.assertThat(pc.isClosedOrFailed()).isTrue());
            StringSubject rootCause = Truth.assertThat(ExceptionUtils.getRootCauseMessage(pc.getFailureCause()));
            rootCause.contains("NoOffsetForPartitionException");
            rootCause.contains("Undefined offset with no reset policy");
            getKcu().close();
        } else {
            Awaitility.await()
                    .pollInterval(5L, TimeUnit.SECONDS)
                    .atMost(30L, TimeUnit.SECONDS)
                    .failFast(pc::isClosedOrFailed)
                    .untilAsserted(() -> {
                        // Produce before asserting so every retry feeds the processor at least one more record.
                        getKcu().getProducer().send(new ProducerRecord<>(getTopic(), "key-bumper", "poll-bumper"));
                        bumpersSent.incrementAndGet();
                        long start = getKcu().getAdmin()
                                .listOffsets(UniMaps.of(this.tp, OffsetSpec.earliest()))
                                .partitionResult(this.tp).get().offset();
                        long end = getKcu().getAdmin()
                                .listOffsets(UniMaps.of(this.tp, OffsetSpec.latest()))
                                .partitionResult(this.tp).get().offset();
                        // "start" is the earliest offset of the partition and "end" the latest.
                        log.error("start await loop: {}, end: {}, bumpersSent: {}", start, end, bumpersSent);
                        Truth.assertWithMessage("Highest seen offset to read up to")
                                .that(highest.get())
                                .isAtLeast(j2 - 1);
                    });

            long diff = lowest.get() - j;
            log.warn("Offset started at should equal the target {}, lowest {}, sent {}, diff is {})", j, lowest, bumpersSent, diff);
            Truth.assertWithMessage("Offset started at should equal the target (sent %s , diff is %s)", bumpersSent, diff)
                    .that(lowest.get())
                    .isEqualTo(j);
            pc.close();
        }
    }

    /**
     * Overwrites the committed offset of {@link #tp} for a consumer group and waits up to five seconds for the
     * admin request to complete.
     *
     * @param str consumer group id
     * @param j   offset to commit for the partition
     * @throws IllegalStateException if the request fails, times out, or the wait is interrupted
     */
    private void moveCommittedOffset(String str, long j) {
        log.debug("Moving offset of {} to {}", str, j);
        try {
            getKcu().getAdmin()
                    .alterConsumerGroupOffsets(str, UniMaps.of(this.tp, new OffsetAndMetadata(j)))
                    .all()
                    .get(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs the test thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while moving committed offset of group " + str + " to " + j, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not move committed offset of group " + str + " to " + j, e);
        }
        log.debug("Moved offset to {}", j);
    }

    /**
     * Runs a processor in a new group with {@link #DEFAULT_OFFSET_RESET_POLICY}.
     *
     * @param j   offset from which records start failing
     * @param j2  offset the run is expected to reach
     * @param set offsets that are allowed through even though they are at or beyond {@code j}
     * @return the records seen by the processor, as returned by the five-argument overload
     */
    private List<PollContext<String, String>> runPcUntilOffset(long j, long j2, Set<Long> set) {
        KafkaClientUtils.GroupOption group = KafkaClientUtils.GroupOption.NEW_GROUP;
        return runPcUntilOffset(DEFAULT_OFFSET_RESET_POLICY, j, j2, set, group);
    }

    /**
     * Runs an {@code UNORDERED} processor whose user function fails every record at or beyond offset {@code j},
     * waits until a record above {@code j2 - 2} has been seen, and returns everything seen.
     *
     * <p>The processor is stored in {@link #activePc} and is closed on the way out unless it already reports
     * closed-or-failed; any exception from that close is only logged.
     *
     * @param offsetResetStrategy reset policy applied to the client utilities before the processor is built
     * @param j                   first offset whose processing throws {@code FakeRuntimeException}
     * @param j2                  the highest seen offset must exceed {@code j2 - 2} before the method returns
     * @param set                 offsets that are let through without failing, regardless of {@code j}
     * @param groupOption         group option handed to the processor builder
     * @return every record handed to the user function, ordered by offset
     */
    private List<PollContext<String, String>> runPcUntilOffset(OffsetResetStrategy offsetResetStrategy, long j, long j2, Set<Long> set, KafkaClientUtils.GroupOption groupOption) {
        log.debug("Running PC until at least offset {}", j);
        super.getKcu().setOffsetResetPolicy(offsetResetStrategy);
        ParallelEoSStreamProcessor<String, String> pc =
                super.getKcu().buildPc(ParallelConsumerOptions.ProcessingOrder.UNORDERED, groupOption);
        this.activePc = pc;
        try {
            Comparator<PollContext<String, String>> byOffset = Comparator.comparingLong(PollContext::offset);
            SortedSet<PollContext<String, String>> seen = Collections.synchronizedSortedSet(new TreeSet<>(byOffset));
            SortedSet<PollContext<String, String>> succeeded = Collections.synchronizedSortedSet(new TreeSet<>(byOffset));

            pc.subscribe(UniLists.of(getTopic()));
            pc.poll(pollContext -> {
                seen.add(pollContext);
                long offset = pollContext.offset();
                if (set.contains(offset)) {
                    // Exceptional offsets pass but are not recorded in 'succeeded'.
                    log.debug("Exceptional offset {} succeeded", offset);
                    return;
                }
                if (offset >= j) {
                    log.debug("Failing on {}", offset);
                    throw new FakeRuntimeException("Failing on " + offset);
                }
                log.debug("Succeeded {}: {}", offset, pollContext.getSingleRecord());
                succeeded.add(pollContext);
            });

            // Pause briefly, then produce one extra record before waiting on the seen offsets.
            ThreadUtils.sleepSecondsLog(1);
            getKcu().produceMessages(getTopic(), 1L, "poll-bumper");

            Awaitility.await()
                    .failFast(pc::isClosedOrFailed)
                    .untilAsserted(() -> {
                        Truth.assertThat(seen).isNotEmpty();
                        Truth.assertThat(seen.last().offset()).isGreaterThan(j2 - 2);
                    });

            if (!succeeded.isEmpty()) {
                log.debug("Succeeded up to: {}", succeeded.last().offset());
            }
            log.debug("Consumed up to {}", seen.last().offset());

            List<PollContext<String, String>> ordered = new ArrayList<>(seen);
            ordered.sort(byOffset);
            return ordered;
        } finally {
            try {
                boolean stillRunning = !pc.isClosedOrFailed();
                if (stillRunning) {
                    pc.close();
                }
            } catch (Exception e) {
                log.debug("Cause will get rethrown close on the NONE parameter branch", e);
            }
        }
    }

    /**
     * Moves the committed offset forwards (to 75) after a run that failed from offset 50 on, then checks that a
     * new processor in the same group starts at the moved offset and reads up to 100.
     */
    @Test
    void committedOffsetHigher() {
        final int produced = 100;
        final int movedTo = 75;
        produceMessages(produced);
        runPcUntilOffset(50);
        String groupId = getKcu().getGroupId();
        moveCommittedOffset(groupId, movedTo);
        runPcCheckStartIs(movedTo, produced);
    }

    /**
     * Convenience overload of the start-offset check that always uses {@code REUSE_GROUP}.
     *
     * @param i  offset the processor is expected to start from
     * @param i2 offset (exclusive) the processor must have read up to
     */
    private void runPcCheckStartIs(int i, int i2) {
        long expectedStart = i;
        long readUpTo = i2;
        runPcCheckStartIs(expectedStart, readUpTo, KafkaClientUtils.GroupOption.REUSE_GROUP);
    }

    /**
     * Checks where a processor starts, for each reset policy, once the record at its committed offset (50) has
     * been replaced by newer records for the same key and compaction has been given time to run.
     *
     * <p>Expected start: {@code EARLIEST} -> 0, {@code LATEST} -> the computed end offset, {@code NONE} -> -1
     * (for {@code NONE}, {@code runPcCheckStartIs} asserts that the processor fails instead of comparing offsets).
     *
     * <p>Both the broker and the client utilities are closed when the test ends, whether it passes or fails.
     *
     * @param offsetResetStrategy reset policy under test
     */
    @EnumSource(OffsetResetStrategy.class)
    @ParameterizedTest
    void committedOffsetRemoved(OffsetResetStrategy offsetResetStrategy) {
        this.offsetResetStrategy = offsetResetStrategy;
        // Resources are closed in reverse order: client utilities first, then the broker.
        try (KafkaContainer compactingBroker = setupCompactingKafkaBroker();
             KafkaClientUtils clientUtils = new KafkaClientUtils(compactingBroker)) {
            log.debug("Compacting broker started {}", compactingBroker.getBootstrapServers());
            clientUtils.setOffsetResetPolicy(offsetResetStrategy);
            clientUtils.open();

            if (offsetResetStrategy.equals(OffsetResetStrategy.NONE)) {
                // With no reset policy, commit offset 0 up front so the first run below has a committed offset.
                KafkaConsumer<String, String> consumer = getKcu().getConsumer();
                consumer.subscribe(UniLists.of(getTopic()));
                consumer.poll(Duration.ofSeconds(1L));
                consumer.commitSync(UniMaps.of(this.tp, new OffsetAndMetadata(0L)));
                consumer.close();
            }

            int producedCount = produceMessages(this.TO_PRODUCE).size();
            String groupId = clientUtils.getGroupId();
            runPcUntilOffset(offsetResetStrategy, 50L, 50L, UniSets.of(), KafkaClientUtils.GroupOption.REUSE_GROUP);
            // +1: presumably for the single "poll-bumper" record that the run above produces - confirm.
            int producedPlusBumper = producedCount + 1;

            checkHowManyRecordsWithKeyPresent("key-50", 1, this.TO_PRODUCE);
            int compactionMessages = causeCommittedOffsetToBeRemoved(50L);
            checkHowManyRecordsWithKeyPresent("key-50", 1, this.TO_PRODUCE + 2);
            int endOffset = producedPlusBumper + compactionMessages;

            int expectedStart;
            switch (offsetResetStrategy) {
                case EARLIEST:
                    expectedStart = 0;
                    break;
                case LATEST:
                    expectedStart = endOffset;
                    break;
                case NONE:
                    expectedStart = -1;
                    break;
                default:
                    throw new IllegalStateException("Unexpected offset reset strategy: " + offsetResetStrategy);
            }

            clientUtils.setGroupId(groupId);
            runPcCheckStartIs(expectedStart, endOffset);
        }
    }

    /**
     * Reads {@link #tp} from the beginning with a new consumer until a record at offset {@code j - 1} or later has
     * been read, then asserts how many of the records read carry the given key.
     *
     * <p>The read loop has no timeout of its own: it keeps polling until the target offset is reached.
     *
     * @param str key to count
     * @param i   expected number of records with that key
     * @param j   reading stops once the last record read has offset {@code >= j - 1}
     */
    private void checkHowManyRecordsWithKeyPresent(String str, int i, long j) {
        log.debug("Looking for {} records with key {} up to offset {}", i, str, j);
        try (KafkaConsumer<String, String> consumer = getKcu().createNewConsumer(KafkaClientUtils.GroupOption.NEW_GROUP)) {
            consumer.assign(UniLists.of(this.tp));
            consumer.seekToBeginning(UniSets.of(this.tp));
            Truth.assertThat(consumer.position(this.tp)).isEqualTo(0);

            List<ConsumerRecord<String, String>> records = new ArrayList<>();
            long highestRead = -1;
            while (highestRead < j - 1) {
                records.addAll(consumer.poll(Duration.ofSeconds(1L)).records(this.tp));
                // Stays unchanged while nothing has been read yet.
                highestRead = JavaUtils.getLast(records)
                        .map(ConsumerRecord::offset)
                        .orElse(highestRead);
            }

            List<ConsumerRecord<String, String>> matching = records.stream()
                    .filter(record -> record.key().equals(str))
                    .collect(Collectors.toList());
            ManagedTruth.assertThat(matching).hasSize(i);
        }
    }

    /**
     * Sends replacement records for keys {@code "key-" + j} and {@code "key-" + (j + 1)}, verifies that two records
     * with the first key are now present, and then triggers compaction.
     *
     * @param j offset whose key is to be replaced
     * @return the number of extra records produced: the two replacements plus the compaction fillers
     */
    private int causeCommittedOffsetToBeRemoved(long j) {
        final int replacementsSent = 2;
        sendCompactionKeyForOffset(j);
        sendCompactionKeyForOffset(j + 1);
        String replacedKey = "key-" + j;
        checkHowManyRecordsWithKeyPresent(replacedKey, 2, this.TO_PRODUCE + 2);
        List<String> fillerKeys = triggerCompactionProcessing();
        return replacementsSent + fillerKeys.size();
    }

    /**
     * Sends one record with key {@code "key-" + j} and value {@code "compactor"} to partition 0 of the test topic
     * and waits up to one second for the send to complete.
     *
     * <p>Failures are reported as unchecked exceptions, so callers need no checked-exception handling.
     *
     * @param j offset whose key should receive a newer record
     * @throws IllegalStateException if the send fails, times out, or the wait is interrupted
     */
    private void sendCompactionKeyForOffset(long j) {
        String key = "key-" + j;
        try {
            getKcu().getProducer()
                    .send(new ProducerRecord<>(getTopic(), 0, key, "compactor"))
                    .get(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever runs the test thread.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while sending compaction record for " + key, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Could not send compaction record for " + key, e);
        }
    }

    /**
     * Runs a processor with reset policy {@code NONE}; if the wait inside {@code runPcUntilOffset} ends with a
     * {@code TerminalFailureException}, asserts that the processor's root failure cause is the missing-offset
     * error.
     *
     * <p>The client utilities created here are closed when the test ends, whether it passes or fails.
     */
    @Test
    void noOffsetPolicyOnStartup() {
        this.offsetResetStrategy = OffsetResetStrategy.NONE;
        try (KafkaClientUtils clientUtils = new KafkaClientUtils(kafkaContainer)) {
            clientUtils.setOffsetResetPolicy(this.offsetResetStrategy);
            clientUtils.open();
            int producedCount = produceMessages(this.TO_PRODUCE).size();
            try {
                runPcUntilOffset(this.offsetResetStrategy, producedCount, producedCount, UniSets.of(), KafkaClientUtils.GroupOption.REUSE_GROUP);
            } catch (TerminalFailureException expected) {
                String rootCauseMessage = ExceptionUtils.getRootCauseMessage(this.activePc.getFailureCause());
                StringSubject rootCause = Truth.assertThat(rootCauseMessage);
                rootCause.contains("NoOffsetForPartitionException");
                rootCause.contains("Undefined offset");
                rootCause.contains("no reset policy");
            }
        }
    }
}
