/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.internal.batchimport.staging;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.staging.BatchSender;
import org.neo4j.internal.batchimport.staging.DeadEndStep;
import org.neo4j.internal.batchimport.staging.ForkedProcessorStep;
import org.neo4j.internal.batchimport.staging.ProcessContext;
import org.neo4j.internal.batchimport.staging.ProcessorStep;
import org.neo4j.internal.batchimport.staging.PullingProducerStep;
import org.neo4j.internal.batchimport.staging.SimpleStageControl;
import org.neo4j.internal.batchimport.staging.Stage;
import org.neo4j.internal.batchimport.staging.StageControl;
import org.neo4j.internal.batchimport.staging.StageExecution;
import org.neo4j.internal.batchimport.staging.Step;
import org.neo4j.internal.batchimport.stats.Key;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.internal.batchimport.stats.StepStats;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.io.IOUtils;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.context.CursorContextFactory;
import org.neo4j.io.pagecache.context.EmptyVersionContextSupplier;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;

class ForkedProcessorStepTest {
    // Timeout budget in minutes. Nothing in this class refers to it by name; the @Timeout
    // annotations on the test methods carry the literal 2L with TimeUnit.MINUTES instead.
    // NOTE(review): presumably this constant was inlined into those annotations - confirm.
    private static final int TIMEOUT_MINUTES = 2;
    // Cursor-context factory shared by the DeadEndStep and ProcessorStep instances created in this
    // class; built from PageCacheTracer.NULL and EmptyVersionContextSupplier.EMPTY.
    private static final CursorContextFactory CONTEXT_FACTORY = new CursorContextFactory(PageCacheTracer.NULL, EmptyVersionContextSupplier.EMPTY);

    // Explicit package-private no-arg constructor; it performs no work.
    ForkedProcessorStepTest() {
    }

    /**
     * Sends ten batches, ticketed 1..10, from the test thread into a {@link BatchProcessor} created
     * with a limit of ten processors, then checks that the downstream {@link TrackingStep} counter
     * has reached ten.
     */
    @Test
    void shouldProcessAllSingleThreaded() throws Exception {
        final int processorLimit = 10;
        final int batchCount = 10;
        StageControl control = Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processorLimit);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);

        // Request the difference between the limit and the current count; the value handed back
        // sizes every Batch below.
        // NOTE(review): presumably processors(delta) adjusts by delta and returns the new count - confirm.
        int currentProcessors = step.processors(0);
        int processors = step.processors(processorLimit - currentProcessors);
        step.start(0);

        int ticket = 1;
        while (ticket <= batchCount) {
            step.receive(ticket, new Batch(processors));
            ticket++;
        }

        step.endOfUpstream();
        step.awaitCompleted();
        downstream.close();
        step.close();

        long expectedTickets = batchCount;
        Assertions.assertEquals(expectedTickets, downstream.received.get());
    }

    /**
     * Same flow as the single-threaded test but with the step configured for exactly one processor:
     * ten batches of size one go in, and the downstream counter must end at ten. Guarded by a
     * two-minute timeout.
     */
    @Test
    @Timeout(value=2L, unit=TimeUnit.MINUTES)
    void shouldProcessAllBatchesOnSingleCoreSystems() throws Exception {
        final int processors = 1;
        final int batchCount = 10;
        StageControl control = Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processors);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);
        step.start(0);

        // Tickets are 1-based because TrackingStep expects each ticket to follow the previous one.
        for (int sent = 0; sent < batchCount; sent++) {
            int ticket = sent + 1;
            step.receive(ticket, new Batch(processors));
        }

        step.endOfUpstream();
        step.awaitCompleted();
        step.close();

        long expectedTickets = batchCount;
        Assertions.assertEquals(expectedTickets, downstream.received.get());
    }

    /**
     * Applies an explicit processor-count adjustment (target of one minus the currently reported
     * count) before starting the step, then verifies that all ten batches still reach the
     * downstream {@link TrackingStep}.
     */
    @Test
    void mustNotDetachProcessorsFromBatchChains() throws Exception {
        final int processors = 1;
        final int batchCount = 10;
        StageControl control = Mockito.mock(StageControl.class);
        BatchProcessor step = new BatchProcessor(control, processors);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);

        // Query the current count with a zero adjustment, then apply the difference to the target.
        int reported = step.processors(0);
        step.processors(processors - reported);
        step.start(0);

        int ticket = 0;
        while (ticket < batchCount) {
            ticket++;
            step.receive(ticket, new Batch(processors));
        }

        step.endOfUpstream();
        step.awaitCompleted();
        step.close();

        long expectedTickets = batchCount;
        Assertions.assertEquals(expectedTickets, downstream.received.get());
    }

    /**
     * Feeds a {@link BatchProcessor} from three submitter threads that occasionally change the
     * processor count, and waits until the downstream {@link TrackingStep} has accepted at least
     * 200 batches.
     *
     * <p>The wait is a polling loop with no exit condition other than the counter, so the test is
     * bounded by the same two-minute timeout the other concurrency tests in this class use. The
     * {@code finally} block always signals the submitters to stop and closes the step, so a
     * failure or timeout does not leave them running.
     */
    @Test
    @Timeout(value=2L, unit=TimeUnit.MINUTES)
    void shouldProcessAllMultiThreadedAndWithChangingProcessorCount() throws Exception {
        StageControl control = Mockito.mock(StageControl.class);
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        BatchProcessor step = new BatchProcessor(control, availableProcessors);
        TrackingStep downstream = new TrackingStep();
        step.setDownstream(downstream);
        step.start(0);
        AtomicLong nextTicket = new AtomicLong();
        Thread[] submitters = new Thread[3];
        AtomicBoolean end = new AtomicBoolean();
        try {
            for (int i = 0; i < submitters.length; ++i) {
                submitters[i] = new Thread(() -> {
                    ThreadLocalRandom random = ThreadLocalRandom.current();
                    while (!end.get()) {
                        // Ticket allocation and hand-over happen under one lock so that tickets are
                        // passed to the step in strictly increasing order across all submitters.
                        synchronized (nextTicket) {
                            if ((double)random.nextFloat() < 0.1) {
                                step.processors(random.nextInt(-2, 4));
                            }
                            long ticket = nextTicket.incrementAndGet();
                            // Size the batch from the processor count reported at submission time.
                            Batch batch = new Batch(step.processors(0));
                            step.receive(ticket, batch);
                        }
                    }
                });
                submitters[i].start();
            }
            while (downstream.received.get() < 200L) {
                Thread.sleep(10L);
            }
            end.set(true);
            for (Thread submitter : submitters) {
                submitter.join();
            }
            step.endOfUpstream();
        } finally {
            // Also reached on failure or timeout: stop the submitters and release the step.
            end.set(true);
            step.close();
        }
    }

    /**
     * Checks that every forked processor sees batches in ticket order while the processor count is
     * changed at random between submissions.
     *
     * <p>Each batch is an {@code int[]} whose element 0 is the ticket and whose remaining elements
     * are indexes into {@code reference}. A processor handles the indexes where
     * {@code index % processors == id} and advances that slot from {@code ticket} to
     * {@code ticket + 1} with a compare-and-set, which fails if a slot is visited out of order.
     *
     * <p>Those assertions run inside {@code forkedProcess}, so the test calls
     * {@code control.assertHealthy()} once the step has completed to bring a recorded failure back
     * to the test thread. This relies on failures thrown from {@code forkedProcess} being reported
     * to the stage control, which is what {@code shouldPanicOnFailure} in this class checks. Both
     * steps are closed in a {@code finally} block so a failure does not skip cleanup.
     */
    @Test
    void shouldKeepForkedOrderIntactWhenChangingProcessorCount() throws Exception {
        int length = 100;
        final AtomicIntegerArray reference = new AtomicIntegerArray(length);
        SimpleStageControl control = new SimpleStageControl();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        ForkedProcessorStep<int[]> step = new ForkedProcessorStep<int[]>((StageControl)control, "Processor", ForkedProcessorStepTest.config(availableProcessors), new StatsProvider[0]){

            protected void forkedProcess(int id, int processors, int[] batch) throws InterruptedException {
                int ticket = batch[0];
                // Random short delay to vary how the processors interleave.
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                for (int i = 1; i < batch.length; ++i) {
                    if (batch[i] % processors != id) continue;
                    boolean compareAndSet = reference.compareAndSet(batch[i], ticket, ticket + 1);
                    Assertions.assertTrue((boolean)compareAndSet, (String)("I am " + id + ". Was expecting " + ticket + " for " + batch[i] + " but was " + reference.get(batch[i])));
                }
            }
        };
        DeadEndStep downstream = new DeadEndStep(control, CONTEXT_FACTORY);
        step.setDownstream((Step)downstream);
        step.start(0);
        downstream.start(0);
        try {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            for (int ticket = 0; ticket < 200; ++ticket) {
                if ((double)random.nextFloat() < 0.1) {
                    step.processors(random.nextInt(-2, 4));
                }
                int[] batch = new int[length];
                batch[0] = ticket;
                for (int j = 1; j < batch.length; ++j) {
                    batch[j] = j - 1;
                }
                // Passed as int[] to match the step's type parameter.
                step.receive((long)ticket, batch);
            }
            step.endOfUpstream();
            step.awaitCompleted();
            // Rethrows any failure the stage control has recorded.
            control.assertHealthy();
        } finally {
            step.close();
            downstream.close();
        }
    }

    /**
     * Verifies that an exception thrown from {@code forkedProcess} is reported through the stage
     * control: after the step has received one batch and completed,
     * {@code control.assertHealthy()} must throw the very same exception instance.
     *
     * <p>The check uses {@code assertThrows}, so the test fails if {@code assertHealthy()} returns
     * normally. Asserting only inside a {@code catch} block would let that case pass unnoticed.
     */
    @Test
    void shouldPanicOnFailure() throws Exception {
        SimpleStageControl control = new SimpleStageControl();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        final RuntimeException testPanic = new RuntimeException();
        ForkedProcessorStep<Void> step = new ForkedProcessorStep<Void>((StageControl)control, "Processor", ForkedProcessorStepTest.config(availableProcessors), new StatsProvider[0]){

            protected void forkedProcess(int id, int processors, Void batch) throws Throwable {
                throw testPanic;
            }
        };
        // Register the step with the control before starting it.
        control.steps(new Step[]{step});
        step.start(0);
        step.receive(1L, null);
        step.awaitCompleted();
        Exception reported = Assertions.assertThrows(Exception.class, control::assertHealthy);
        Assertions.assertSame((Object)testPanic, (Object)reported);
    }

    /**
     * Runs the shared stress scenario with an ordering-guarantees value of {@code 1}, bounded by a
     * two-minute timeout.
     */
    @Test
    @Timeout(value=2L, unit=TimeUnit.MINUTES)
    void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenOrdered() throws Exception {
        final int orderingGuarantees = 1;
        shouldBeAbleToProgressUnderStressfulProcessorChanges(orderingGuarantees);
    }

    /**
     * Runs the shared stress scenario with an ordering-guarantees value of {@code 0}, bounded by a
     * two-minute timeout.
     */
    @Test
    @Timeout(value=2L, unit=TimeUnit.MINUTES)
    void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenUnordered() throws Exception {
        final int orderingGuarantees = 0;
        shouldBeAbleToProgressUnderStressfulProcessorChanges(orderingGuarantees);
    }

    /**
     * Shared body of the two stress tests. Executes a {@link StressStage} of 100 batches and keeps
     * applying random processor-count adjustments to the step at index 2 for as long as the
     * execution reports that it is still running. It then asserts that the execution is healthy
     * and that the last step reports 100 done batches.
     *
     * <p>The steps are closed in a {@code finally} block, so they are released even when an
     * assertion fails or the calling test is interrupted.
     *
     * @param orderingGuarantees value passed through to the {@link StressStage} constructor
     * @throws Exception if the execution is unhealthy or the waiting thread is interrupted
     */
    private static void shouldBeAbleToProgressUnderStressfulProcessorChanges(int orderingGuarantees) throws Exception {
        int batches = 100;
        final int processors = Runtime.getRuntime().availableProcessors() * 10;
        Configuration.Overridden config = new Configuration.Overridden(Configuration.DEFAULT){

            public int maxNumberOfWorkerThreads() {
                return processors;
            }
        };
        StressStage stage = new StressStage((Configuration)config, orderingGuarantees, batches);
        StageExecution execution = stage.execute();
        List steps = Iterables.asList((Iterable)execution.steps());
        try {
            // Index 1 is the "Yeah" ProcessorStep and index 2 the "Subject" ForkedProcessorStep,
            // following the order in which StressStage adds its steps.
            ((Step)steps.get(1)).processors(processors / 3);
            ThreadLocalRandom random = ThreadLocalRandom.current();
            while (execution.stillExecuting()) {
                ((Step)steps.get(2)).processors(random.nextInt(-2, 5));
                Thread.sleep(1L);
            }
            execution.assertHealthy();
            Assertions.assertEquals((long)batches, (long)((Step)steps.get(steps.size() - 1)).stats().stat((Key)Keys.done_batches).asLong());
        } finally {
            IOUtils.closeAllSilently((Collection)steps);
        }
    }

    /**
     * Builds a {@link Configuration} whose {@code maxNumberOfWorkerThreads()} returns the given
     * value; every other setting is whatever {@link Configuration} itself provides.
     *
     * @param processors value to report as the maximum number of worker threads
     * @return the configuration instance
     */
    private static Configuration config(final int processors) {
        final Configuration workerLimited = new Configuration(){

            public int maxNumberOfWorkerThreads() {
                return processors;
            }
        };
        return workerLimited;
    }

    /**
     * {@link ForkedProcessorStep} named "PROCESSOR" whose forked work consists only of marking the
     * batch as processed by the calling processor id.
     */
    private static class BatchProcessor extends ForkedProcessorStep<Batch> {
        BatchProcessor(StageControl control, int processors) {
            super(control, "PROCESSOR", ForkedProcessorStepTest.config(processors), new StatsProvider[0]);
        }

        /** Records on {@code batch} that processor {@code id} has handled it. */
        protected void forkedProcess(int id, int processors, Batch batch) {
            final int processorId = id;
            batch.processedBy(processorId);
        }
    }

    /**
     * Downstream stub that validates every batch it receives: all processed-flags of the batch must
     * be set, and tickets must arrive in consecutive order starting at 1. Every other {@link Step}
     * operation is a no-op or returns a fixed value.
     */
    private static class TrackingStep implements Step<Batch> {
        // Ticket of the most recently accepted batch; stays at 0 until the first batch arrives.
        private final AtomicLong received = new AtomicLong();

        private TrackingStep() {
        }

        public void receivePanic(Throwable cause) {
            // Ignored by this stub.
        }

        public void start(int orderingGuarantees) {
            // Nothing to start.
        }

        public String name() {
            return "END";
        }

        /**
         * Asserts that the batch was marked by every processor slot and that {@code ticket}
         * directly follows the previously accepted one.
         *
         * @return always {@code 0}
         */
        public long receive(long ticket, Batch batch) {
            int marked = 0;
            for (boolean slot : batch.processed) {
                if (slot) {
                    marked++;
                }
            }
            Assertions.assertEquals(batch.processed.length, marked);

            // The counter only advances from ticket - 1 to ticket; anything else is out of order.
            boolean advanced = this.received.compareAndSet(ticket - 1L, ticket);
            if (!advanced) {
                Assertions.fail("Hmm " + ticket + " " + this.received.get());
            }
            return 0L;
        }

        public StepStats stats() {
            return null;
        }

        public void endOfUpstream() {
            // Nothing to finish.
        }

        public boolean isCompleted() {
            return false;
        }

        public boolean awaitCompleted(long time, TimeUnit unit) {
            throw new UnsupportedOperationException();
        }

        public void setDownstream(Step<?> downstreamStep) {
            // This stub never forwards anything.
        }

        public void close() {
            // Nothing to release.
        }
    }

    /**
     * Test batch holding one flag per processor slot, used to check that each slot is handled
     * exactly once.
     */
    private static class Batch {
        // processed[id] becomes true once processor id has handled this batch.
        private final boolean[] processed;

        Batch(int processors) {
            processed = new boolean[processors];
        }

        /** Marks slot {@code id} as handled, asserting first that it was not marked already. */
        void processedBy(int id) {
            boolean alreadyMarked = processed[id];
            Assertions.assertFalse(alreadyMarked);
            processed[id] = true;
        }
    }

    /**
     * Four-step stage used by the stress tests, added in this order: a producer emitting the
     * tickets below {@code batches} as {@link Long} batches, a "Yeah" {@link ProcessorStep} that
     * sleeps 1 ms and forwards each batch, the "Subject" {@link ForkedProcessorStep} under test
     * that sleeps 1 ms per call, and a terminating {@link DeadEndStep}.
     */
    private static class StressStage extends Stage {
        StressStage(Configuration config, int orderingGuarantees, final int batches) {
            super("Stress", null, config, orderingGuarantees);

            PullingProducerStep<ProcessContext> producer = new PullingProducerStep<ProcessContext>(this.control(), config){

                protected long position() {
                    return 0L;
                }

                protected Object nextBatchOrNull(long ticket, int batchSize, ProcessContext processContext) {
                    // A null return is used once the requested number of batches has been produced.
                    if (ticket >= (long)batches) {
                        return null;
                    }
                    return Long.valueOf(ticket);
                }
            };
            this.add((Step)producer);

            ProcessorStep<Long> forwarder = new ProcessorStep<Long>(this.control(), "Yeah", config, 3, CONTEXT_FACTORY, new StatsProvider[0]){

                protected void process(Long batch, BatchSender sender, CursorContext cursorContext) throws Throwable {
                    Thread.sleep(1L);
                    sender.send((Object)batch);
                }
            };
            this.add((Step)forwarder);

            ForkedProcessorStep<Long> subject = new ForkedProcessorStep<Long>(this.control(), "Subject", config, new StatsProvider[0]){

                protected void forkedProcess(int id, int processors, Long batch) throws Throwable {
                    Thread.sleep(1L);
                }
            };
            this.add((Step)subject);

            DeadEndStep sink = new DeadEndStep(this.control(), CONTEXT_FACTORY);
            this.add((Step)sink);
        }
    }
}

