package org.neo4j.unsafe.impl.batchimport.staging;

import java.lang.Thread;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.graphdb.Resource;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.class */
public class ProcessorStepTest {

    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();

    /* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest$BlockingProcessorStep.class */
    private static class BlockingProcessorStep extends ProcessorStep<Void> {
        private final CountDownLatch latch;

        public BlockingProcessorStep(StageControl stageControl, int i, CountDownLatch countDownLatch) {
            super(stageControl, "test", Configuration.DEFAULT, i, new StatsProvider[0]);
            this.latch = countDownLatch;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void process(Void r3, BatchSender batchSender) throws Throwable {
            this.latch.await();
        }
    }

    /* loaded from: input_file:org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest$MyProcessorStep.class */
    private static class MyProcessorStep extends ProcessorStep<Integer> {
        private final AtomicInteger nextExpected;

        private MyProcessorStep(StageControl stageControl, int i) {
            super(stageControl, "test", Configuration.DEFAULT, i, new StatsProvider[0]);
            this.nextExpected = new AtomicInteger();
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public Resource permit(Integer num) throws Throwable {
            Thread.sleep(10L);
            Assert.assertEquals(this.nextExpected.getAndIncrement(), num.intValue());
            return super.permit(num);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void process(Integer num, BatchSender batchSender) throws Throwable {
        }
    }

    @Test
    public void shouldUpholdProcessOrderingGuarantee() throws Exception {
        StageControl stageControl = (StageControl) Mockito.mock(StageControl.class);
        MyProcessorStep myProcessorStep = new MyProcessorStep(stageControl, 0);
        myProcessorStep.start(2);
        while (myProcessorStep.numberOfProcessors() < 5) {
            myProcessorStep.incrementNumberOfProcessors();
        }
        for (int i = 0; i < 10; i++) {
            myProcessorStep.receive(i, Integer.valueOf(i));
        }
        myProcessorStep.endOfUpstream();
        while (!myProcessorStep.isCompleted()) {
            Mockito.verifyNoMoreInteractions(new Object[]{stageControl});
        }
        Assert.assertEquals(10, myProcessorStep.nextExpected.get());
        myProcessorStep.close();
    }

    @Test
    public void shouldHaveTaskQueueSizeEqualToNumberOfProcessorsIfSpecificallySet() throws Exception {
        StageControl stageControl = (StageControl) Mockito.mock(StageControl.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        BlockingProcessorStep blockingProcessorStep = new BlockingProcessorStep(stageControl, 2, countDownLatch);
        blockingProcessorStep.start(2);
        blockingProcessorStep.incrementNumberOfProcessors();
        for (int i = 0; i < 3; i++) {
            blockingProcessorStep.receive(i, (Object) null);
        }
        Future<RESULT> execute = this.t2.execute(receive(2, blockingProcessorStep));
        this.t2.get().waitUntilThreadState(Thread.State.TIMED_WAITING);
        countDownLatch.countDown();
        execute.get();
    }

    @Test
    public void shouldHaveTaskQueueSizeEqualToCurrentNumberOfProcessorsIfNotSpecificallySet() throws Exception {
        StageControl stageControl = (StageControl) Mockito.mock(StageControl.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        final BlockingProcessorStep blockingProcessorStep = new BlockingProcessorStep(stageControl, 0, countDownLatch);
        blockingProcessorStep.start(2);
        blockingProcessorStep.incrementNumberOfProcessors();
        blockingProcessorStep.incrementNumberOfProcessors();
        for (int i = 0; i < blockingProcessorStep.numberOfProcessors() + 1; i++) {
            blockingProcessorStep.receive(i, (Object) null);
        }
        Future<RESULT> execute = this.t2.execute(new OtherThreadExecutor.WorkerCommand<Void, Void>() { // from class: org.neo4j.unsafe.impl.batchimport.staging.ProcessorStepTest.1
            @Override // org.neo4j.test.OtherThreadExecutor.WorkerCommand
            public Void doWork(Void r6) throws Exception {
                blockingProcessorStep.receive(blockingProcessorStep.numberOfProcessors(), (Object) null);
                return null;
            }
        });
        this.t2.get().waitUntilThreadState(Thread.State.TIMED_WAITING);
        countDownLatch.countDown();
        execute.get();
    }

    private OtherThreadExecutor.WorkerCommand<Void, Void> receive(final int i, final ProcessorStep<Void> processorStep) {
        return new OtherThreadExecutor.WorkerCommand<Void, Void>() { // from class: org.neo4j.unsafe.impl.batchimport.staging.ProcessorStepTest.2
            @Override // org.neo4j.test.OtherThreadExecutor.WorkerCommand
            public Void doWork(Void r6) throws Exception {
                processorStep.receive(i, (Object) null);
                return null;
            }
        };
    }
}
