/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.parallelconsumer.internal;

import com.google.common.truth.Truth;
import io.confluent.csid.utils.BlockedThreadAsserter;
import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.parallelconsumer.ManagedTruth;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.PollContextInternal;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.PCModuleTestEnv;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.confluent.parallelconsumer.internal.ProducerWrapper;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.state.ModelUtils;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniMaps;

@Tags(value={@Tag(value="transactions"), @Tag(value="#355")})
@Timeout(value=60L)
class ProducerManagerTest {
    /** Logger for the progress messages the tests emit while stepping through the locking scenarios. */
    private static final Logger log = LoggerFactory.getLogger(ProducerManagerTest.class);
    /** Options the fixture is built from; assigned in {@code setup(ParallelConsumerOptionsBuilder)}. */
    ParallelConsumerOptions<String, String> opts;
    /** Test module created by {@code buildModule(opts)}; the other fixture objects are derived from it. */
    PCModuleTestEnv module;
    /** Model helper constructed from {@link #module}; used to create topics, partitions, records and work. */
    ModelUtils mu;
    /** The object under test, obtained from {@code module.producerManager()}. */
    ProducerManager<String, String> producerManager;

    /** Does nothing itself; the fixture fields are populated by the {@code @BeforeEach} setup method. */
    ProducerManagerTest() {
    }

    /**
     * Default fixture, rebuilt before every test: transactional-producer commit mode with a
     * two second commit lock acquisition timeout.
     */
    @BeforeEach
    void setup() {
        // The explicit <String, String> type witness makes the whole builder chain carry the right
        // type arguments, so the finished chain can be passed on without any cast.
        this.setup(ParallelConsumerOptions.<String, String>builder()
                .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                .commitLockAcquisitionTimeout(Duration.ofSeconds(2L)));
    }

    /**
     * (Re)builds the complete fixture from the given options builder: the options, the test
     * module, the model helper and the producer manager under test.
     *
     * @param optionsBuilder builder whose {@code build()} result becomes {@link #opts}
     */
    private void setup(ParallelConsumerOptions.ParallelConsumerOptionsBuilder<String, String> optionsBuilder) {
        this.opts = optionsBuilder.build();
        // Build the module exactly once; an extra instance whose result is thrown away adds nothing.
        // NOTE(review): assumes constructing the test module has no effect outside that instance.
        this.module = this.buildModule(this.opts);
        this.mu = new ModelUtils(this.module);
        this.producerManager = this.module.producerManager();
    }

    /**
     * Creates the test module for the given options. The returned {@link PCModuleTestEnv} subclass
     * lazily creates, caches and returns a {@link ParallelEoSStreamProcessor} subclass from
     * {@code pc()} that always reports it is time to commit and whose two-argument
     * {@code close} does nothing.
     *
     * @param opts options handed to the module constructor
     * @return a new module instance; nothing is cached across calls of this method
     */
    private PCModuleTestEnv buildModule(ParallelConsumerOptions<String, String> opts) {
        return new PCModuleTestEnv(opts){

            // Lazily initialises the processor on first use and returns the cached instance afterwards.
            protected AbstractParallelEoSStreamProcessor<String, String> pc() {
                if (this.parallelEoSStreamProcessor == null) {
                    AbstractParallelEoSStreamProcessor raw = super.pc();
                    // NOTE(review): this spy is replaced by the very next statement, so the store looks
                    // dead - confirm that neither super.pc() nor the spy is needed while the subclass
                    // below is being constructed before removing either line.
                    this.parallelEoSStreamProcessor = (AbstractParallelEoSStreamProcessor)Mockito.spy((Object)raw);
                    this.parallelEoSStreamProcessor = new ParallelEoSStreamProcessor<String, String>(this.options(), (PCModule)this){

                        // Always answers true, so the processor never waits for a commit interval.
                        protected boolean isTimeToCommitNow() {
                            return true;
                        }

                        // Deliberately empty: closing with a timeout and drain mode is a no-op in these tests.
                        public void close(Duration timeout, DrainingCloseable.DrainingMode drainMode) {
                        }
                    };
                }
                return this.parallelEoSStreamProcessor;
            }
        };
    }

    /**
     * Checks that producing and committing exclude each other in both directions:
     * <ol>
     *   <li>while a produce lock is held, acquiring the commit lock is expected to block until
     *       {@code finishProducing} is called;</li>
     *   <li>while a commit is in progress, {@code beginProducing} is expected to block until
     *       {@code postCommit} is called.</li>
     * </ol>
     *
     * @throws Exception if taking the produce or commit lock on the test thread fails, for example
     *                   with the {@link TimeoutException} declared by {@code beginProducing}
     */
    @Test
    void sendingGetsLockedInTx() throws Exception {
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();

        // Phase 1: hold the produce lock, send a record, and start a commit attempt in the asserter.
        ProducerManager.ProducingLock produceReadLock = this.producerManager.beginProducing(Mockito.mock(PollContextInternal.class));
        this.produceOneRecord();
        BlockedThreadAsserter blockedCommit = new BlockedThreadAsserter();
        blockedCommit.assertFunctionBlocks(() -> {
            try {
                this.producerManager.preAcquireOffsetsToCommit();
            }
            catch (Exception e) {
                // The asserter callback cannot propagate checked exceptions, so wrap and keep the cause.
                throw new RuntimeException(e);
            }
            this.producerManager.postCommit();
        });
        log.debug("Unlocking produce lock...");
        this.producerManager.finishProducing(produceReadLock);
        log.debug("Waiting for commit lock to release...");
        blockedCommit.awaitReturnFully();

        // Phase 2: open a commit on the test thread, then start a producer in the asserter.
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        BlockedThreadAsserter blockedRecordSenderReturned = new BlockedThreadAsserter();
        blockedRecordSenderReturned.assertFunctionBlocks(() -> {
            log.debug("Starting sending records - will block due to open commit");
            ProducerManager.ProducingLock produceLock;
            try {
                produceLock = this.producerManager.beginProducing(Mockito.mock(PollContextInternal.class));
            }
            catch (TimeoutException e) {
                throw new RuntimeException(e);
            }
            log.debug("Then after released by finishing tx, complete the producing");
            this.producerManager.finishProducing(produceLock);
        });

        // Finishing the commit is what lets the blocked producer run to completion.
        this.producerManager.postCommit();
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        Awaitility.await("blocked sends should only now complete").until(blockedRecordSenderReturned::functionHasCompleted);
    }

    /**
     * Passes the records from {@code makeRecord()} to the producer manager.
     *
     * @return whatever {@code produceMessages} returns for those records
     */
    private List<ParallelConsumer.Tuple<ProducerRecord<String, String>, Future<RecordMetadata>>> produceOneRecord() {
        List<ProducerRecord<String, String>> recordsToSend = this.makeRecord();
        return this.producerManager.produceMessages(recordsToSend);
    }

    /**
     * Asks the model helper for producer records, passing the topic name {@code "topic"} and the
     * count argument {@code 1}.
     *
     * @return the records created by {@code ModelUtils.createProducerRecords}
     */
    private List<ProducerRecord<String, String>> makeRecord() {
        final String topicName = "topic";
        final long requested = 1L;
        return this.mu.createProducerRecords(topicName, requested);
    }

    /**
     * Walks the producer manager through two produce/commit cycles and asserts, step by step, that
     * a transaction is reported open only once a record has actually been sent, not merely when
     * the produce lock is taken, and is reported not open again after the offsets are committed.
     *
     * @throws Exception if taking the produce or commit lock fails, for example with the
     *                   {@code TimeoutException} declared by {@code beginProducing}
     */
    @Test
    void txOnlyStartedUponMessageSend() throws Exception {
        // Fresh manager: no commit in progress, initial state, no transaction.
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.INIT);
        ManagedTruth.assertWithMessage("Transaction is started as not open").that(this.producerManager).transactionNotOpen();

        // First cycle: the first send moves the state to BEGIN; the returned tuples are not needed here.
        ProducerManager.ProducingLock produceLock = this.producerManager.beginProducing(Mockito.mock(PollContextInternal.class));
        this.produceOneRecord();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.BEGIN);
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.produceOneRecord();
        this.producerManager.finishProducing(produceLock);

        // Commit: "committing in progress" is asserted from pre-acquire until postCommit.
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        this.producerManager.commitOffsets(UniMaps.of(), new ConsumerGroupMetadata(""));
        ManagedTruth.assertThat(this.producerManager).isTransactionCommittingInProgress();
        this.producerManager.postCommit();
        ManagedTruth.assertThat(this.producerManager).isNotTransactionCommittingInProgress();
        ManagedTruth.assertWithMessage("A new transaction hasn't been opened").that(this.producerManager).transactionNotOpen();

        // Second cycle: taking the produce lock alone must not open a transaction; sending does.
        ProducerManager.ProducingLock producingLock = this.producerManager.beginProducing(Mockito.mock(PollContextInternal.class));
        ManagedTruth.assertThat(this.producerManager).transactionNotOpen();
        this.produceOneRecord();
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.finishProducing(producingLock);
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.preAcquireOffsetsToCommit();
        ManagedTruth.assertThat(this.producerManager).transactionOpen();
        this.producerManager.commitOffsets(UniMaps.of(), new ConsumerGroupMetadata(""));
        ManagedTruth.assertThat(this.producerManager).transactionNotOpen();
        ManagedTruth.assertThat(this.producerManager).stateIs(ProducerWrapper.ProducerState.COMMIT);
    }

    /**
     * Runs the processor's control loop directly with a user function that takes the produce lock,
     * sends a record and releases the lock, and that parks on a latch while handling offset 1.
     * The test then asserts that a further control loop pass only returns after the latch is
     * released, and finally verifies on the producer wrapper that offsets were sent to the
     * transaction with offset 2 for the test partition and that {@code send} was called twice.
     *
     * @throws Exception if the control loop, lock acquisition or closing the processor fails on
     *                   the test thread
     */
    @Test
    void producedRecordsCantBeInTransactionWithoutItsOffsetDirect() throws Exception {
        // Rebuild the fixture with only the commit mode set (no commit lock acquisition timeout).
        // The <String, String> type witness replaces a cast of the finished builder chain.
        this.setup(ParallelConsumerOptions.<String, String>builder().commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER));
        try (AbstractParallelEoSStreamProcessor pc = this.module.pc();){
            // Put the processor into a running state with the test topic and partitions assigned.
            pc.subscribe((Collection)UniLists.of((Object)this.mu.getTopic()));
            pc.onPartitionsAssigned(this.mu.getPartitions());
            pc.setState(State.RUNNING);
            EpochAndRecordsMap<String, String> freshWork = this.mu.createFreshWork();
            pc.registerWork(freshWork);
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();

            AtomicReference producingLockRef = new AtomicReference();
            CountDownLatch offset1Mutex = new CountDownLatch(1);
            AtomicBoolean blockedOn1 = new AtomicBoolean(false);
            // User function: take the produce lock, optionally park on the latch (offset 1 only),
            // send one record, and always release the lock again.
            Function<PollContextInternal, List> userFunc = context -> {
                ProducerManager.ProducingLock newValue;
                try {
                    newValue = this.producerManager.beginProducing(Mockito.mock(PollContextInternal.class));
                }
                catch (TimeoutException e) {
                    throw new RuntimeException(e);
                }
                try {
                    producingLockRef.set(newValue);
                    log.info(context.toString());
                    if (context.offset() == 1L) {
                        log.debug("Blocking on {}", (Object)1);
                        blockedOn1.set(true);
                        LatchTestUtils.awaitLatch(offset1Mutex);
                    }
                    this.module.producerWrap().send((ProducerRecord)Mockito.mock(ProducerRecord.class), (a, b) -> {});
                    return UniLists.of();
                }
                finally {
                    newValue.unlock();
                }
            };

            // First pass of the control loop.
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            pc.controlLoop(userFunc, o -> {});
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            String msg = "wait for first record to finish";
            log.debug(msg);
            Awaitility.await((String)msg).untilAsserted(() -> ManagedTruth.assertThat(pc.getWorkMailBox()).hasSize(1));

            // Second pass with more work; wait until the user function is parked on the latch.
            freshWork = this.mu.createFreshWork();
            pc.registerWork(freshWork);
            pc.controlLoop(userFunc, o -> {});
            ManagedTruth.assertThat(this.producerManager).getProducerTransactionLock().isNotWriteLocked();
            msg = "Ensure expected produce lock is now held by blocked worker thread";
            log.debug(msg);
            Awaitility.await((String)msg).untilTrue(blockedOn1);

            // Third pass must stay blocked until the latch is released by the second callback.
            BlockedThreadAsserter commitBlocks = new BlockedThreadAsserter();
            commitBlocks.assertUnblocksAfter(() -> {
                log.debug("Running control loop which should block until offset 1 is released by finishing produce");
                try {
                    pc.controlLoop(userFunc, o -> {});
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, () -> {
                log.debug("Unblocking offset processing offset1Mutex...");
                offset1Mutex.countDown();
            }, Duration.ofSeconds(10L));
            Awaitility.await().untilAsserted(() -> Truth.assertWithMessage((String)"commit should now have unlocked and returned").that(Boolean.valueOf(commitBlocks.functionHasCompleted())).isTrue());

            // Offset expected in the commit sent to the transaction; named instead of a bare literal.
            final long nextExpectedOffset = 2L;
            ProducerWrapper<String, String> producer = this.module.producerWrap();
            ((ProducerWrapper)Mockito.verify(producer, (VerificationMode)Mockito.description((String)"Both offsets are represented in base commit"))).sendOffsetsToTransaction(UniMaps.of((Object)this.mu.getPartition(), (Object)new OffsetAndMetadata(nextExpectedOffset, "")), this.mu.consumerGroupMeta());
            ((ProducerWrapper)Mockito.verify(producer, (VerificationMode)Mockito.times((int)2).description("Should send twice, as it blocks the commit lock until it finishes, so offsets get taken only after"))).send((ProducerRecord)ArgumentMatchers.any(), (Callback)ArgumentMatchers.any());
        }
    }

    /**
     * Expects an {@link IllegalArgumentException} from building and validating two option sets
     * that configure only a mock consumer plus, respectively, the transactional-producer commit
     * mode and eager processing during transaction commit.
     */
    @Test
    void testOptions() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ParallelConsumerOptions<?, ?> transactionalOptions = ParallelConsumerOptions.builder()
                    .consumer((Consumer)Mockito.mock(Consumer.class))
                    .commitMode(ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER)
                    .build();
            transactionalOptions.validate();
        });
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            ParallelConsumerOptions<?, ?> eagerProcessingOptions = ParallelConsumerOptions.builder()
                    .consumer((Consumer)Mockito.mock(Consumer.class))
                    .allowEagerProcessingDuringTransactionCommit(true)
                    .build();
            eagerProcessingOptions.validate();
        });
    }
}

