/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.internal.batchimport.staging;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.executor.ProcessorScheduler;
import org.neo4j.internal.batchimport.staging.BatchSender;
import org.neo4j.internal.batchimport.staging.ExecutionMonitor;
import org.neo4j.internal.batchimport.staging.ExecutionSupervisor;
import org.neo4j.internal.batchimport.staging.ExecutionSupervisors;
import org.neo4j.internal.batchimport.staging.ForkedProcessorStep;
import org.neo4j.internal.batchimport.staging.ProcessContext;
import org.neo4j.internal.batchimport.staging.ProcessorStep;
import org.neo4j.internal.batchimport.staging.PullingProducerStep;
import org.neo4j.internal.batchimport.staging.Stage;
import org.neo4j.internal.batchimport.staging.StageControl;
import org.neo4j.internal.batchimport.staging.StageExecution;
import org.neo4j.internal.batchimport.staging.Step;
import org.neo4j.internal.batchimport.staging.TrackingPanicMonitor;
import org.neo4j.internal.batchimport.stats.Key;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.context.CursorContextFactory;
import org.neo4j.io.pagecache.context.EmptyVersionContextSupplier;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
import org.neo4j.test.RandomSupport;
import org.neo4j.test.extension.Inject;
import org.neo4j.test.extension.RandomExtension;

@ExtendWith(value={RandomExtension.class})
class StageTest {
    // NOTE(review): not referenced anywhere else in this file as shown. The literal 100 used by
    // TestProcessContext and by the expected total in processContextResources presumably
    // corresponds to this constant (inlined at compile time) — confirm before changing either.
    private static final int TEST_BATCH_SIZE = 100;
    // Shared cursor-context factory handed to every ProcessorStep constructed in this class,
    // built from PageCacheTracer.NULL and EmptyVersionContextSupplier.EMPTY.
    private static final CursorContextFactory CONTEXT_FACTORY = new CursorContextFactory(PageCacheTracer.NULL, EmptyVersionContextSupplier.EMPTY);
    // Source of randomness used by shouldCloseOnPanic to pick the ordering-guarantees argument.
    // Presumably populated by RandomExtension (registered on the class) — confirm.
    @Inject
    private RandomSupport random;

    // Explicit package-private no-arg constructor; it performs no initialisation.
    StageTest() {
    }

    /**
     * Pushes 1000 batches of 10 items through a producer, three forwarding steps and one terminal
     * step, then checks that every step reports all 1000 batches as done.
     *
     * <p>Ticket ordering itself is asserted inside {@link ReceiveOrderAssertingStep#receive}: each
     * of those steps expects tickets 0, 1, 2, ... without gaps. The three forwarding steps sleep
     * 0, 1 and 2 ms per batch respectively so that they run at different speeds.
     */
    @Test
    void shouldReceiveBatchesInOrder() {
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT) {
            public int batchSize() {
                return 10;
            }
        };
        final long batches = 1000L;
        final long items = batches * config.batchSize();

        // The stage is a try-with-resources resource so it is closed even when supervision or an
        // assertion below throws (previously close() was only reached on the success path).
        // NOTE(review): 1 is the ordering-guarantees argument; presumably the "ordered send
        // downstream" flag — confirm against the Step constants.
        try (Stage stage = new Stage("Test stage", null, config, 1)) {
            // Producer: emits arrays filled with one shared placeholder object until `items`
            // items have been handed out, then returns null.
            stage.add(new PullingProducerStep<ProcessContext>(stage.control(), config) {
                private final Object theObject = new Object();
                private long i;

                protected Object nextBatchOrNull(long ticket, int batchSize, ProcessContext processContext) {
                    if (i >= items) {
                        return null;
                    }
                    Object[] batch = new Object[batchSize];
                    Arrays.fill(batch, theObject);
                    i += batchSize;
                    return batch;
                }

                protected long position() {
                    return 0L;
                }
            });

            // Three forwarding steps; step i sleeps i milliseconds per batch.
            for (int i = 0; i < 3; ++i) {
                stage.add(new ReceiveOrderAssertingStep(stage.control(), "Step" + i, config, i, false));
            }
            // Terminal step: does not forward batches any further.
            stage.add(new ReceiveOrderAssertingStep(stage.control(), "Final step", config, 0L, true));

            StageExecution execution = stage.execute();
            for (Step step : execution.steps()) {
                step.processors(1);
            }
            new ExecutionSupervisor(ExecutionMonitor.INVISIBLE).supervise(execution);

            for (Step step : execution.steps()) {
                Assertions.assertEquals(batches, step.stats().stat(Keys.done_batches).asLong(), "For " + step);
            }
        }
    }

    /**
     * Checks that the process contexts created by a producer step are closed and that the work
     * counted in them adds up.
     *
     * <p>Each {@link TestProcessContext} permits 100 batches before the producer returns
     * {@code null}; on {@code close()} it adds its batch count to a shared accumulator and counts
     * down a latch. The test feeds 1000 tickets into every step by hand, supervises the execution,
     * and then expects 1001 contexts to have been closed with 100 batches counted in each.
     *
     * <p>Decompiler note: CFR reported that it removed a self-catching try while reconstructing
     * this method, so the try/finally layout may differ from the original source.
     *
     * @throws InterruptedException if interrupted while waiting for the contexts to be closed
     */
    @Test
    void processContextResources() throws InterruptedException {
        final int ticketCount = 1000;
        final AtomicLong accumulatedBatches = new AtomicLong();
        // One count per expected context close: the hand-fed tickets plus one more.
        final CountDownLatch closedContexts = new CountDownLatch(ticketCount + 1);
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT);
        ExecutorService pool = Executors.newCachedThreadPool();

        try (Stage stage = new Stage("Test stage", null, config, 1, (job, name) -> pool.submit(job), StageExecution.DEFAULT_PANIC_MONITOR)) {
            stage.add(new PullingProducerStep<TestProcessContext>(stage.control(), config) {

                protected Object nextBatchOrNull(long ticket, int batchSize, TestProcessContext processContext) {
                    // Each context carries its own budget of batches; stop once it is used up.
                    if (processContext.batches.decrementAndGet() < 0L) {
                        return null;
                    }
                    processContext.counter++;
                    Object filler = new Object();
                    Object[] batch = new Object[batchSize];
                    Arrays.fill(batch, filler);
                    return batch;
                }

                protected TestProcessContext processContext() {
                    return new TestProcessContext(accumulatedBatches, closedContexts);
                }

                protected long position() {
                    return 0L;
                }
            });
            // Consumer that simply discards whatever it is given.
            stage.add(new ProcessorStep<Object>(stage.control(), "consumer", config, 1, CONTEXT_FACTORY, new StatsProvider[0]) {

                protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                }
            });

            StageExecution execution = stage.execute();
            int availableProcessors = Runtime.getRuntime().availableProcessors();
            for (Step step : execution.steps()) {
                step.processors(availableProcessors);
            }
            // Hand every step each ticket with a null batch before supervision starts.
            for (int ticket = 0; ticket < ticketCount; ++ticket) {
                for (Step step : execution.steps()) {
                    step.receive(ticket, null);
                }
            }
            new ExecutionSupervisor(ExecutionMonitor.INVISIBLE).supervise(execution);

            boolean allContextsClosed = closedContexts.await(5L, TimeUnit.MINUTES);
            Assertions.assertTrue(allContextsClosed);
            // 100 is the per-context batch budget set in TestProcessContext.
            long expectedBatches = (ticketCount + 1) * 100;
            Assertions.assertEquals(expectedBatches, accumulatedBatches.get());
        }
        finally {
            pool.shutdown();
        }
    }

    /**
     * Runs a four-step stage (producer, processor, forked processor, consumer) in which every step
     * invokes a {@link ChaosMonkey} on each call, and checks that the resulting failure surfaces:
     * supervision must throw a {@link RuntimeException} and the panic monitor must have recorded a
     * panic whose message contains "Chaos monkey".
     *
     * <p>The producer never returns {@code null}, so the execution can only end through a failure.
     * The ordering-guarantees argument is picked at random (1 or 0) per run.
     */
    @Test
    void shouldCloseOnPanic() {
        final Configuration configuration = Configuration.DEFAULT;
        TrackingPanicMonitor panicMonitor = new TrackingPanicMonitor();
        Stage stage = new Stage("test close on panic", null, configuration, random.nextBoolean() ? 1 : 0, ProcessorScheduler.SPAWN_THREAD, panicMonitor) {
            // Instance initializer: the steps are registered while the stage is being constructed.
            // (The decompiled "super(...)" call that used to sit here was not legal Java; the
            // constructor arguments are already supplied by the "new Stage(...)" expression.)
            {
                // Producer: endless supply of int[] batches, remembers the last ticket it served.
                add(new PullingProducerStep<ProcessContext>(control(), configuration) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();
                    // Written in nextBatchOrNull and read in position(); volatile, presumably
                    // because the two are called from different threads — confirm.
                    private volatile long ticket;

                    protected Object nextBatchOrNull(long ticket, int batchSize, ProcessContext processContext) {
                        chaosMonkey.makeChaos();
                        this.ticket = ticket;
                        return new int[batchSize];
                    }

                    protected long position() {
                        return ticket;
                    }
                });
                // Processor: forwards each batch unchanged.
                add(new ProcessorStep<Object>(control(), "processor", configuration, 2, CONTEXT_FACTORY, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                        chaosMonkey.makeChaos();
                        sender.send(batch);
                    }
                });
                // Forked processor: does nothing besides the chaos call.
                add(new ForkedProcessorStep<Object>(control(), "forked processor", configuration, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void forkedProcess(int id, int processors, Object batch) {
                        chaosMonkey.makeChaos();
                    }
                });
                // Consumer: end of the line, nothing is sent further.
                add(new ProcessorStep<Object>(control(), "consumer", configuration, 1, CONTEXT_FACTORY, new StatsProvider[0]) {
                    private final ChaosMonkey chaosMonkey = new ChaosMonkey();

                    protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
                        chaosMonkey.makeChaos();
                    }
                });
            }
        };

        Assertions.assertThrows(RuntimeException.class, () -> ExecutionSupervisors.superviseDynamicExecution(stage));
        Assertions.assertTrue(panicMonitor.hasReceivedPanic());
        Assertions.assertTrue(panicMonitor.getReceivedPanic().getMessage().contains("Chaos monkey"));
    }

    /**
     * Single-processor step that asserts tickets are received in sequence (0, 1, 2, ...) and
     * simulates work by sleeping for a fixed time per batch.
     */
    private static class ReceiveOrderAssertingStep extends ProcessorStep<Object> {
        /** Next ticket this step expects to receive; starts at 0. */
        private final AtomicLong lastTicket = new AtomicLong();
        /** Milliseconds to sleep for every processed batch. */
        private final long processingTime;
        /** When {@code true} the batch is not forwarded after processing. */
        private final boolean endOfLine;

        /**
         * @param control stage control passed on to the superclass
         * @param name step name passed on to the superclass
         * @param config configuration passed on to the superclass
         * @param processingTime milliseconds to sleep per batch in {@link #process}
         * @param endOfLine {@code true} if this step should not send batches any further
         */
        ReceiveOrderAssertingStep(StageControl control, String name, Configuration config, long processingTime, boolean endOfLine) {
            super(control, name, config, 1, CONTEXT_FACTORY, new StatsProvider[0]);
            this.processingTime = processingTime;
            this.endOfLine = endOfLine;
        }

        /**
         * Fails the assertion unless {@code ticket} is exactly the next expected ticket, then
         * delegates to the superclass.
         */
        public long receive(long ticket, Object batch) {
            Assertions.assertEquals(lastTicket.getAndIncrement(), ticket, "For " + batch + " in " + name());
            return super.receive(ticket, batch);
        }

        /**
         * Sleeps for the configured processing time and, unless this is the end of the line,
         * sends the batch on unchanged.
         *
         * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
         */
        protected void process(Object batch, BatchSender sender, CursorContext cursorContext) {
            try {
                Thread.sleep(processingTime);
            } catch (InterruptedException e) {
                // Restore the interrupt status before converting to an unchecked exception so
                // that code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            if (!endOfLine) {
                sender.send(batch);
            }
        }
    }

    /**
     * Process context used by {@code processContextResources}: carries a budget of 100 batches and
     * a count of batches actually produced, and reports both its count and its own closing to
     * shared objects supplied by the test.
     */
    private static class TestProcessContext implements ProcessContext {
        /** Remaining batch budget; the producer decrements it and stops once it goes negative. */
        private final AtomicLong batches = new AtomicLong(100L);
        /** Shared total that receives this context's count on close. */
        private final AtomicLong globalCounterAccumulator;
        /** Shared latch counted down once per closed context. */
        private final CountDownLatch processedBatches;
        /** Number of batches produced with this context; incremented by the producer step. */
        long counter;

        TestProcessContext(AtomicLong globalCounter, CountDownLatch processedBatches) {
            this.processedBatches = processedBatches;
            this.globalCounterAccumulator = globalCounter;
        }

        /**
         * Publishes this context's batch count and then signals that the context was closed.
         */
        public void close() {
            // Accumulate first, count down second: the test reads the total after awaiting the latch.
            globalCounterAccumulator.addAndGet(counter);
            processedBatches.countDown();
        }
    }

    private class ChaosMonkey {
        private ChaosMonkey() {
        }

        void makeChaos() {
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(0, 10));
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if ((double)ThreadLocalRandom.current().nextFloat() < 0.01) {
                throw new RuntimeException("Chaos monkey causing failure");
            }
        }
    }
}

