/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.internal.batchimport.staging;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.executor.ProcessorScheduler;
import org.neo4j.internal.batchimport.staging.BatchSender;
import org.neo4j.internal.batchimport.staging.ProcessorStep;
import org.neo4j.internal.batchimport.staging.ProducerStep;
import org.neo4j.internal.batchimport.staging.SimpleStageControl;
import org.neo4j.internal.batchimport.staging.Stage;
import org.neo4j.internal.batchimport.staging.StageControl;
import org.neo4j.internal.batchimport.staging.StageExecution;
import org.neo4j.internal.batchimport.staging.Step;
import org.neo4j.internal.batchimport.staging.TrackingPanicMonitor;
import org.neo4j.internal.batchimport.stats.Key;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.io.pagecache.PageSwapper;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.context.CursorContextFactory;
import org.neo4j.io.pagecache.context.EmptyVersionContextSupplier;
import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.io.pagecache.tracing.PinEvent;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.OtherThread;
import org.neo4j.test.extension.OtherThreadExtension;

@ExtendWith(value={OtherThreadExtension.class})
class ProcessorStepTest {
    // Shared cursor-context factory built from PageCacheTracer.NULL and the empty version-context
    // supplier; used by every step in this test that does not supply its own factory.
    private static final CursorContextFactory CONTEXT_FACTORY = new CursorContextFactory(PageCacheTracer.NULL, EmptyVersionContextSupplier.EMPTY);
    // Secondary thread used to issue a receive() call off the test thread.
    // NOTE(review): presumably populated by OtherThreadExtension through @Inject - confirm.
    @Inject
    private OtherThread t2;

    // Explicit package-private no-arg constructor; it performs no initialisation of its own.
    ProcessorStepTest() {
    }

    /**
     * Sends ten batches (tickets 0..9) into a {@link MyProcessorStep} after calling
     * {@code processors(4)} on it, waits for the step to complete, and checks that the
     * step's counter equals the number of batches sent.
     */
    @Test
    void shouldUpholdProcessOrderingGuarantee() throws Exception {
        final int batchCount = 10;
        SimpleStageControl stageControl = new SimpleStageControl();
        try (MyProcessorStep processorStep = new MyProcessorStep(stageControl, 0)) {
            processorStep.start(1);
            processorStep.processors(4);

            // The ticket doubles as the batch payload.
            int ticket = 0;
            while (ticket < batchCount) {
                processorStep.receive(ticket, ticket);
                ticket++;
            }
            processorStep.endOfUpstream();
            processorStep.awaitCompleted();

            int processed = processorStep.nextExpected.get();
            org.junit.jupiter.api.Assertions.assertEquals(batchCount, processed);
        }
    }

    /**
     * Runs ten batches through a {@link MyProcessorStep} whose cursor contexts come from a
     * factory backed by a {@link DefaultPageCacheTracer}, then checks that the tracer
     * reports as many pins and as many unpins as there were batches.
     */
    @Test
    void tracePageCacheAccessOnProcess() throws Exception {
        final int batchCount = 10;
        final long expectedEvents = batchCount;
        SimpleStageControl stageControl = new SimpleStageControl();
        DefaultPageCacheTracer tracer = new DefaultPageCacheTracer();
        CursorContextFactory tracingFactory = new CursorContextFactory(tracer, EmptyVersionContextSupplier.EMPTY);

        try (MyProcessorStep processorStep = new MyProcessorStep(stageControl, 0, tracingFactory)) {
            processorStep.start(1);

            int ticket = 0;
            while (ticket < batchCount) {
                processorStep.receive(ticket, ticket);
                ticket++;
            }
            processorStep.endOfUpstream();
            processorStep.awaitCompleted();

            int processed = processorStep.nextExpected.get();
            org.junit.jupiter.api.Assertions.assertEquals(batchCount, processed);
        }

        // Tracer counters are inspected only after the step has been closed.
        long observedPins = tracer.pins();
        long observedUnpins = tracer.unpins();
        Assertions.assertThat(observedPins).isEqualTo(expectedEvents);
        Assertions.assertThat(observedUnpins).isEqualTo(expectedEvents);
    }

    /**
     * Checks how many batches a step with blocked processors accepts before {@code receive}
     * stalls the caller.
     *
     * <p>The step's processors all block on a latch. The test thread hands over
     * {@code processors + maxProcessors} batches (which must return without blocking, or the
     * test would hang), then issues one more {@code receive} on the secondary thread and waits
     * for that thread to reach {@link Thread.State#TIMED_WAITING}. Releasing the latch must
     * let the pending {@code receive} finish.
     */
    @Test
    void shouldHaveTaskQueueSizeEqualToMaxNumberOfProcessors() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        CountDownLatch latch = new CountDownLatch(1);
        // Named once and reused below instead of repeating the literal.
        final int processors = 2;
        final int maxProcessors = 5;
        Configuration configuration = new Configuration() {

            public int maxNumberOfWorkerThreads() {
                return maxProcessors;
            }
        };
        try (BlockingProcessorStep<Void> step = new BlockingProcessorStep<>(control, configuration, processors, latch)) {
            step.start(1);
            step.processors(1);
            // Accepting this many batches must not block the test thread.
            for (int i = 0; i < processors + maxProcessors; ++i) {
                step.receive(i, null);
            }
            // One more batch: issued on the other thread because it is expected to stall.
            Future<Void> receiveFuture = this.t2.execute(ProcessorStepTest.receive(processors, step));
            this.t2.get().waitUntilThreadState(new Thread.State[]{Thread.State.TIMED_WAITING});
            // Unblock the processors so the stalled receive can complete.
            latch.countDown();
            receiveFuture.get();
        }
    }

    /**
     * Runs ten batches through a {@link MyProcessorStep} wired to a mocked
     * {@link StageControl} and verifies that {@code recycle} was invoked on the control
     * exactly once per batch.
     */
    @Test
    void shouldRecycleDoneBatches() throws Exception {
        final int batchCount = 10;
        StageControl stageControl = Mockito.mock(StageControl.class);
        // The mocked control hands out the thread-spawning scheduler.
        Mockito.when(stageControl.scheduler()).thenReturn(ProcessorScheduler.SPAWN_THREAD);

        try (MyProcessorStep processorStep = new MyProcessorStep(stageControl, 0)) {
            processorStep.start(1);

            int ticket = 0;
            while (ticket < batchCount) {
                processorStep.receive(ticket, ticket);
                ticket++;
            }
            processorStep.endOfUpstream();
            processorStep.awaitCompleted();

            VerificationMode oncePerBatch = Mockito.times(batchCount);
            Mockito.verify(stageControl, oncePerBatch).recycle(ArgumentMatchers.any());
        }
    }

    /** Panic scenario with two processor steps where the failing one is the final step added. */
    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenLast() throws InterruptedException {
        final int processorCount = 2;
        final int failingIndex = 1;
        shouldBeAbleToPropagatePanicOnBlockedProcessors(processorCount, failingIndex);
    }

    /** Panic scenario with three processor steps where the failing one sits in the middle. */
    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenNotLast() throws InterruptedException {
        final int processorCount = 3;
        final int failingIndex = 1;
        shouldBeAbleToPropagatePanicOnBlockedProcessors(processorCount, failingIndex);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shared body of the panic-propagation tests.
     *
     * <p>Builds a stage consisting of an integer producer followed by {@code numProcessors}
     * processor steps. The step at {@code failingProcessorIndex} blocks on a latch and then
     * throws a {@link RuntimeException}; every other step forwards its batch. The test waits
     * until the failing step has received more than {@code maxNumberOfWorkerThreads()} batches,
     * releases the latch, awaits completion and asserts that {@code assertHealthy()} rethrows
     * the failure and that the panic monitor was notified.
     *
     * @param numProcessors number of processor steps added after the producer
     * @param failingProcessorIndex zero-based position, among those processor steps, of the failing one
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws IllegalArgumentException if {@code failingProcessorIndex} is outside {@code [0, numProcessors)}
     */
    private void shouldBeAbleToPropagatePanicOnBlockedProcessors(int numProcessors, int failingProcessorIndex) throws InterruptedException {
        // Without this guard an out-of-range index leaves failingProcessor null and the
        // polling loop below fails with an uninformative NullPointerException.
        if (failingProcessorIndex < 0 || failingProcessorIndex >= numProcessors) {
            throw new IllegalArgumentException("failingProcessorIndex " + failingProcessorIndex + " must be in [0, " + numProcessors + ")");
        }
        final String exceptionMessage = "Failing just for fun";
        Configuration configuration = Configuration.DEFAULT;
        CountDownLatch latch = new CountDownLatch(1);
        TrackingPanicMonitor panicMonitor = new TrackingPanicMonitor();
        Stage stage = new Stage("Test", "Part", configuration, 1, ProcessorScheduler.SPAWN_THREAD, panicMonitor);
        stage.add(ProcessorStepTest.intProducer(configuration, stage, configuration.maxNumberOfWorkerThreads() * 2));
        BlockingProcessorStep<Integer> failingProcessor = null;
        for (int i = 0; i < numProcessors; ++i) {
            if (failingProcessorIndex == i) {
                failingProcessor = new BlockingProcessorStep<Integer>(stage.control(), configuration, 1, latch) {

                    @Override
                    protected void process(Integer batch, BatchSender sender, CursorContext cursorContext) throws Throwable {
                        // Block until the test releases the latch, then fail.
                        super.process(batch, sender, cursorContext);
                        throw new RuntimeException(exceptionMessage);
                    }
                };
                stage.add(failingProcessor);
            } else {
                stage.add(ProcessorStepTest.intProcessor(configuration, stage));
            }
        }
        try {
            StageExecution execution = stage.execute();
            // Poll until the failing step has received more than maxNumberOfWorkerThreads() batches.
            while (failingProcessor.stats().stat(Keys.received_batches).asLong() < (long) (configuration.maxNumberOfWorkerThreads() + 1)) {
                Thread.sleep(10L);
            }
            latch.countDown();
            execution.awaitCompletion();
            RuntimeException exception = org.junit.jupiter.api.Assertions.assertThrows(RuntimeException.class, execution::assertHealthy);
            org.junit.jupiter.api.Assertions.assertEquals(exceptionMessage, exception.getMessage());
        } finally {
            // Release the latch on failure paths too, so no processor is left blocked on it
            // while the stage is closed; counting down an already-open latch is a no-op.
            latch.countDown();
            stage.close();
        }
        org.junit.jupiter.api.Assertions.assertTrue(panicMonitor.hasReceivedPanic());
    }

    /**
     * Creates a producer step that sends the integers {@code 0 .. batches - 1} downstream,
     * one per batch, and always reports position {@code 0}.
     *
     * @param configuration configuration handed to the step
     * @param stage stage whose control the step is attached to
     * @param batches number of batches to send
     * @return the new producer step
     */
    private static ProducerStep intProducer(Configuration configuration, Stage stage, final int batches) {
        return new ProducerStep(stage.control(), configuration) {

            protected void process() {
                int next = 0;
                while (next < batches) {
                    sendDownstream(next);
                    next++;
                }
            }

            protected long position() {
                return 0L;
            }
        };
    }

    /**
     * Creates a pass-through processor step named {@code "processor"} that forwards every
     * batch it receives to its sender unchanged.
     *
     * @param configuration configuration handed to the step
     * @param stage stage whose control the step is attached to
     * @return the new processor step
     */
    private static ProcessorStep<Integer> intProcessor(Configuration configuration, Stage stage) {
        final StatsProvider[] noAdditionalStats = new StatsProvider[0];
        return new ProcessorStep<Integer>(stage.control(), "processor", configuration, 1, CONTEXT_FACTORY, noAdditionalStats) {

            protected void process(Integer batch, BatchSender sender, CursorContext cursorContext) {
                sender.send(batch);
            }
        };
    }

    /**
     * Wraps a single {@code step.receive(processors, null)} call in a {@link Callable} so it
     * can be run on another thread.
     *
     * @param processors value passed, widened to {@code long}, as the first argument of {@code receive}
     * @param step step that will receive the {@code null} batch
     * @return a callable that performs the call and returns {@code null}
     */
    private static Callable<Void> receive(int processors, ProcessorStep<Void> step) {
        final long ticket = processors;
        return new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                step.receive(ticket, null);
                return null;
            }
        };
    }

    /**
     * Processor step used by the tests: for each batch it records one pin (with a hit) and one
     * unpin on the cursor tracer of the supplied context and bumps a counter.
     */
    private static class MyProcessorStep
    extends ProcessorStep<Integer> {
        /** Number of batches processed so far; read by the tests after completion. */
        private final AtomicInteger nextExpected = new AtomicInteger();

        /** Creates a step that uses the shared {@code CONTEXT_FACTORY}. */
        private MyProcessorStep(StageControl control, int maxProcessors) {
            this(control, maxProcessors, CONTEXT_FACTORY);
        }

        /** Creates a step named {@code "test"} with the default configuration and the given context factory. */
        private MyProcessorStep(StageControl control, int maxProcessors, CursorContextFactory contextFactory) {
            super(control, "test", Configuration.DEFAULT, maxProcessors, contextFactory, new StatsProvider[0]);
        }

        protected void process(Integer batch, BatchSender sender, CursorContext cursorContext) {
            // A throw-away mocked swapper is enough to drive the tracer.
            PageSwapper mockedSwapper = Mockito.mock(PageSwapper.class, Answers.RETURNS_MOCKS);
            final long pageId = 1L;
            try (PinEvent pin = cursorContext.getCursorTracer().beginPin(false, pageId, mockedSwapper)) {
                pin.hit();
            }
            cursorContext.getCursorTracer().unpin(pageId, mockedSwapper);
            nextExpected.getAndIncrement();
        }
    }

    /**
     * Processor step named {@code "test"} whose {@code process} does nothing but wait on a
     * latch supplied at construction time.
     *
     * @param <T> batch type
     */
    private static class BlockingProcessorStep<T>
    extends ProcessorStep<T> {
        /** Latch every {@code process} call waits on. */
        private final CountDownLatch gate;

        BlockingProcessorStep(StageControl control, Configuration configuration, int maxProcessors, CountDownLatch latch) {
            super(control, "test", configuration, maxProcessors, CONTEXT_FACTORY, new StatsProvider[0]);
            gate = latch;
        }

        protected void process(T batch, BatchSender sender, CursorContext cursorContext) throws Throwable {
            // Returns only once the latch has been counted down to zero.
            gate.await();
        }
    }
}

