package io.confluent.parallelconsumer.internal;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.BlockedThreadAsserter;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.ProducerWrapper;
import io.confluent.parallelconsumer.state.ModelUtils;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

@Tags({@Tag("transactions"), @Tag("#355")})
@Timeout(60)
/* loaded from: input_file:io/confluent/parallelconsumer/internal/ProducerManagerTest.class */
class ProducerManagerTest {
    private static final Logger log = LoggerFactory.getLogger(ProducerManagerTest.class);
    ParallelConsumerOptions<String, String> opts;
    PCModuleTestEnv module;
    ModelUtils mu;
    ProducerManager<String, String> producerManager;

    ProducerManagerTest() {
    }

    @BeforeEach
    void setup() {
        setup(ParallelConsumerOptions.builder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).commitLockAcquisitionTimeout(Duration.ofSeconds(2L)));
    }

    private void setup(ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> parallelConsumerOptionsBuilder) {
        this.opts = parallelConsumerOptionsBuilder.build();
        buildModule(this.opts);
        this.module = buildModule(this.opts);
        this.mu = new ModelUtils(this.module);
        this.producerManager = this.module.producerManager();
    }

    private PCModuleTestEnv buildModule(ParallelConsumerOptions<String, String> parallelConsumerOptions) {
        return new PCModuleTestEnv(parallelConsumerOptions) { // from class: io.confluent.parallelconsumer.internal.ProducerManagerTest.1
            protected AbstractParallelEoSStreamProcessor<String, String> pc() {
                if (this.parallelEoSStreamProcessor == null) {
                    this.parallelEoSStreamProcessor = (AbstractParallelEoSStreamProcessor) Mockito.spy(super.pc());
                    this.parallelEoSStreamProcessor = new ParallelEoSStreamProcessor<String, String>(options(), this) { // from class: io.confluent.parallelconsumer.internal.ProducerManagerTest.1.1
                        protected boolean isTimeToCommitNow() {
                            return true;
                        }

                        public void close(Duration duration, DrainingCloseable.DrainingMode drainingMode) {
                        }
                    };
                }
                return this.parallelEoSStreamProcessor;
            }
        };
    }

    @Test
    void sendingGetsLockedInTx() {
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ProducerManager.ProducingLock beginProducing = this.producerManager.beginProducing((PollContextInternal) Mockito.mock(PollContextInternal.class));
        produceOneRecord();
        BlockedThreadAsserter blockedThreadAsserter = new BlockedThreadAsserter();
        blockedThreadAsserter.assertFunctionBlocks(() -> {
            try {
                this.producerManager.preAcquireOffsetsToCommit();
                this.producerManager.postCommit();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        log.debug("Unlocking produce lock...");
        this.producerManager.finishProducing(beginProducing);
        log.debug("Waiting for commit lock to release...");
        blockedThreadAsserter.awaitReturnFully();
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        BlockedThreadAsserter blockedThreadAsserter2 = new BlockedThreadAsserter();
        blockedThreadAsserter2.assertFunctionBlocks(() -> {
            log.debug("Starting sending records - will block due to open commit");
            try {
                ProducerManager.ProducingLock beginProducing2 = this.producerManager.beginProducing((PollContextInternal) Mockito.mock(PollContextInternal.class));
                log.debug("Then after released by finishing tx, complete the producing");
                this.producerManager.finishProducing(beginProducing2);
            } catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
        });
        this.producerManager.postCommit();
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ConditionFactory await = Awaitility.await("blocked sends should only now complete");
        Objects.requireNonNull(blockedThreadAsserter2);
        await.until(blockedThreadAsserter2::functionHasCompleted);
    }

    private List<ParallelConsumer.Tuple<ProducerRecord<String, String>, Future<RecordMetadata>>> produceOneRecord() {
        return this.producerManager.produceMessages(makeRecord());
    }

    private List<ProducerRecord<String, String>> makeRecord() {
        return this.mu.createProducerRecords("topic", 1L);
    }

    @Test
    void txOnlyStartedUponMessageSend() {
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.INIT);
        ManagedTruth.assertWithMessage("Transaction is started as not open").that(this.producerManager).transactionNotOpen();
        ProducerManager.ProducingLock beginProducing = this.producerManager.beginProducing((PollContextInternal) Mockito.mock(PollContextInternal.class));
        produceOneRecord();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.BEGIN);
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        produceOneRecord();
        this.producerManager.finishProducing(beginProducing);
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        this.producerManager.commitOffsets(UniMaps.of(), new ConsumerGroupMetadata(""));
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        this.producerManager.postCommit();
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ManagedTruth.assertWithMessage("A new transaction hasn't been opened").that(this.producerManager).transactionNotOpen();
        ProducerManager.ProducingLock beginProducing2 = this.producerManager.beginProducing((PollContextInternal) Mockito.mock(PollContextInternal.class));
        ManagedTruth.assertThat(this.producerManager).transactionNotOpen();
        produceOneRecord();
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.finishProducing(beginProducing2);
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.commitOffsets(UniMaps.of(), new ConsumerGroupMetadata(""));
        ManagedTruth.assertThat(this.producerManager).transactionNotOpen();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.COMMIT);
    }

    @Test
    void producedRecordsCantBeInTransactionWithoutItsOffsetDirect() {
        setup(ParallelConsumerOptions.builder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER));
        AbstractParallelEoSStreamProcessor pc = this.module.pc();
        try {
            pc.subscribe(UniLists.of(this.mu.getTopic()));
            pc.onPartitionsAssigned(this.mu.getPartitions());
            pc.setState(State.running);
            pc.registerWork(this.mu.createFreshWork());
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            AtomicReference atomicReference = new AtomicReference();
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Function function = pollContextInternal -> {
                try {
                    ProducerManager.ProducingLock beginProducing = this.producerManager.beginProducing((PollContextInternal) Mockito.mock(PollContextInternal.class));
                    try {
                        atomicReference.set(beginProducing);
                        log.info(pollContextInternal.toString());
                        if (pollContextInternal.offset() == 1) {
                            log.debug("Blocking on {}", 1);
                            atomicBoolean.set(true);
                            LatchTestUtils.awaitLatch(countDownLatch);
                        }
                        this.module.producerWrap().send((ProducerRecord) Mockito.mock(ProducerRecord.class), (recordMetadata, exc) -> {
                        });
                        List of = UniLists.of();
                        beginProducing.unlock();
                        return of;
                    } catch (Throwable th) {
                        beginProducing.unlock();
                        throw th;
                    }
                } catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
            };
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            pc.controlLoop(function, obj -> {
            });
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            log.debug("wait for first record to finish");
            Awaitility.await("wait for first record to finish").untilAsserted(() -> {
                ManagedTruth.assertThat(pc.getWorkMailBox()).hasSize(1);
            });
            pc.registerWork(this.mu.createFreshWork());
            pc.controlLoop(function, obj2 -> {
            });
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            log.debug("Ensure expected produce lock is now held by blocked worker thread");
            Awaitility.await("Ensure expected produce lock is now held by blocked worker thread").untilTrue(atomicBoolean);
            BlockedThreadAsserter blockedThreadAsserter = new BlockedThreadAsserter();
            blockedThreadAsserter.assertUnblocksAfter(() -> {
                log.debug("Running control loop which should block until offset 1 is released by finishing produce");
                try {
                    pc.controlLoop(function, obj3 -> {
                    });
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, () -> {
                log.debug("Unblocking offset processing offset1Mutex...");
                countDownLatch.countDown();
            }, Duration.ofSeconds(10L));
            Awaitility.await().untilAsserted(() -> {
                Truth.assertWithMessage("commit should now have unlocked and returned").that(Boolean.valueOf(blockedThreadAsserter.functionHasCompleted())).isTrue();
            });
            ProducerWrapper<String, String> producerWrap = this.module.producerWrap();
            ((ProducerWrapper) Mockito.verify(producerWrap, Mockito.description("Both offsets are represented in base commit"))).sendOffsetsToTransaction(UniMaps.of(this.mu.getPartition(), new OffsetAndMetadata(2L, "")), this.mu.consumerGroupMeta());
            ((ProducerWrapper) Mockito.verify(producerWrap, Mockito.times(2).description("Should send twice, as it blocks the commit lock until it finishes, so offsets get taken only after"))).send((ProducerRecord) ArgumentMatchers.any(), (Callback) ArgumentMatchers.any());
            if (pc != null) {
                pc.close();
            }
        } finally {
        }
    }

    @Test
    void testOptions() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ParallelConsumerOptions.builder().consumer((Consumer) Mockito.mock(Consumer.class)).commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER).build().validate();
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ParallelConsumerOptions.builder().consumer((Consumer) Mockito.mock(Consumer.class)).allowEagerProcessingDuringTransactionCommit(true).build().validate();
        });
    }
}
