package org.neo4j.internal.batchimport.staging;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.stats.Keys;
import org.neo4j.internal.batchimport.stats.StatsProvider;
import org.neo4j.internal.batchimport.stats.StepStats;
import org.neo4j.internal.helpers.collection.Iterables;
import org.neo4j.io.IOUtils;
import org.neo4j.io.pagecache.context.CursorContext;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;

/* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStepTest.class */
class ForkedProcessorStepTest {
    private static final int TIMEOUT_MINUTES = 2;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStepTest$Batch.class */
    public static class Batch {
        private final boolean[] processed;

        Batch(int i) {
            this.processed = new boolean[i];
        }

        void processedBy(int i) {
            Assertions.assertFalse(this.processed[i]);
            this.processed[i] = true;
        }
    }

    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStepTest$BatchProcessor.class */
    private static class BatchProcessor extends ForkedProcessorStep<Batch> {
        BatchProcessor(StageControl stageControl, int i) {
            super(stageControl, "PROCESSOR", ForkedProcessorStepTest.config(i), new StatsProvider[0]);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void forkedProcess(int i, int i2, Batch batch) {
            batch.processedBy(i);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStepTest$StressStage.class */
    public static class StressStage extends Stage {
        StressStage(Configuration configuration, int i, final int i2) {
            super("Stress", (String) null, configuration, i);
            add(new PullingProducerStep(control(), configuration) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.StressStage.1
                protected long position() {
                    return 0L;
                }

                protected Object nextBatchOrNull(long j, int i3) {
                    if (j < i2) {
                        return Long.valueOf(j);
                    }
                    return null;
                }
            });
            add(new ProcessorStep<Long>(control(), "Yeah", configuration, 3, PageCacheTracer.NULL, new StatsProvider[0]) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.StressStage.2
                /* JADX INFO: Access modifiers changed from: protected */
                public void process(Long l, BatchSender batchSender, CursorContext cursorContext) throws Throwable {
                    Thread.sleep(1L);
                    batchSender.send(l);
                }
            });
            add(new ForkedProcessorStep<Long>(control(), "Subject", configuration, new StatsProvider[0]) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.StressStage.3
                /* JADX INFO: Access modifiers changed from: protected */
                public void forkedProcess(int i3, int i4, Long l) throws Throwable {
                    Thread.sleep(1L);
                }
            });
            add(new DeadEndStep(control()));
        }
    }

    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStepTest$TrackingStep.class */
    private static class TrackingStep implements Step<Batch> {
        private final AtomicLong received = new AtomicLong();

        private TrackingStep() {
        }

        public void receivePanic(Throwable th) {
        }

        public void start(int i) {
        }

        public String name() {
            return "END";
        }

        public long receive(long j, Batch batch) {
            int i = 0;
            for (int i2 = 0; i2 < batch.processed.length; i2++) {
                if (batch.processed[i2]) {
                    i++;
                }
            }
            Assertions.assertEquals(batch.processed.length, i);
            if (this.received.compareAndSet(j - 1, j)) {
                return 0L;
            }
            this.received.get();
            Assertions.fail("Hmm " + j + " " + j);
            return 0L;
        }

        public StepStats stats() {
            return null;
        }

        public void endOfUpstream() {
        }

        public boolean isCompleted() {
            return false;
        }

        public void awaitCompleted() {
            throw new UnsupportedOperationException();
        }

        public void setDownstream(Step<?> step) {
        }

        public void close() {
        }
    }

    ForkedProcessorStepTest() {
    }

    @Test
    void shouldProcessAllSingleThreaded() throws Exception {
        BatchProcessor batchProcessor = new BatchProcessor((StageControl) Mockito.mock(StageControl.class), 10);
        TrackingStep trackingStep = new TrackingStep();
        batchProcessor.setDownstream(trackingStep);
        int processors = batchProcessor.processors(10 - batchProcessor.processors(0));
        batchProcessor.start(0);
        for (int i = 1; i <= 10; i++) {
            batchProcessor.receive(i, new Batch(processors));
        }
        batchProcessor.endOfUpstream();
        batchProcessor.awaitCompleted();
        trackingStep.close();
        batchProcessor.close();
        Assertions.assertEquals(10, trackingStep.received.get());
    }

    @Timeout(value = 2, unit = TimeUnit.MINUTES)
    @Test
    void shouldProcessAllBatchesOnSingleCoreSystems() throws Exception {
        BatchProcessor batchProcessor = new BatchProcessor((StageControl) Mockito.mock(StageControl.class), 1);
        TrackingStep trackingStep = new TrackingStep();
        batchProcessor.setDownstream(trackingStep);
        batchProcessor.start(0);
        for (int i = 1; i <= 10; i++) {
            batchProcessor.receive(i, new Batch(1));
        }
        batchProcessor.endOfUpstream();
        batchProcessor.awaitCompleted();
        batchProcessor.close();
        Assertions.assertEquals(10, trackingStep.received.get());
    }

    @Test
    void mustNotDetachProcessorsFromBatchChains() throws Exception {
        BatchProcessor batchProcessor = new BatchProcessor((StageControl) Mockito.mock(StageControl.class), 1);
        TrackingStep trackingStep = new TrackingStep();
        batchProcessor.setDownstream(trackingStep);
        batchProcessor.processors(1 - batchProcessor.processors(0));
        batchProcessor.start(0);
        for (int i = 1; i <= 10; i++) {
            batchProcessor.receive(i, new Batch(1));
        }
        batchProcessor.endOfUpstream();
        batchProcessor.awaitCompleted();
        batchProcessor.close();
        Assertions.assertEquals(10, trackingStep.received.get());
    }

    @Test
    void shouldProcessAllMultiThreadedAndWithChangingProcessorCount() throws Exception {
        BatchProcessor batchProcessor = new BatchProcessor((StageControl) Mockito.mock(StageControl.class), Runtime.getRuntime().availableProcessors());
        TrackingStep trackingStep = new TrackingStep();
        batchProcessor.setDownstream(trackingStep);
        batchProcessor.start(0);
        AtomicLong atomicLong = new AtomicLong();
        Thread[] threadArr = new Thread[3];
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        for (int i = 0; i < threadArr.length; i++) {
            threadArr[i] = new Thread(() -> {
                ThreadLocalRandom current = ThreadLocalRandom.current();
                while (!atomicBoolean.get()) {
                    synchronized (atomicLong) {
                        if (current.nextFloat() < 0.1d) {
                            batchProcessor.processors(current.nextInt(-2, 4));
                        }
                        batchProcessor.receive(atomicLong.incrementAndGet(), new Batch(batchProcessor.processors(0)));
                    }
                }
            });
            threadArr[i].start();
        }
        while (trackingStep.received.get() < 200) {
            Thread.sleep(10L);
        }
        atomicBoolean.set(true);
        for (Thread thread : threadArr) {
            thread.join();
        }
        batchProcessor.endOfUpstream();
        batchProcessor.close();
    }

    @Test
    void shouldKeepForkedOrderIntactWhenChangingProcessorCount() throws Exception {
        final AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(100);
        SimpleStageControl simpleStageControl = new SimpleStageControl();
        ForkedProcessorStep<int[]> forkedProcessorStep = new ForkedProcessorStep<int[]>(simpleStageControl, "Processor", config(Runtime.getRuntime().availableProcessors()), new StatsProvider[0]) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.1
            /* JADX INFO: Access modifiers changed from: protected */
            public void forkedProcess(int i, int i2, int[] iArr) throws InterruptedException {
                int i3 = iArr[0];
                Thread.sleep(ThreadLocalRandom.current().nextInt(10));
                for (int i4 = 1; i4 < iArr.length; i4++) {
                    if (iArr[i4] % i2 == i) {
                        Assertions.assertTrue(atomicIntegerArray.compareAndSet(iArr[i4], i3, i3 + 1), "I am " + i + ". Was expecting " + i3 + " for " + iArr[i4] + " but was " + atomicIntegerArray.get(iArr[i4]));
                    }
                }
            }
        };
        DeadEndStep deadEndStep = new DeadEndStep(simpleStageControl);
        forkedProcessorStep.setDownstream(deadEndStep);
        forkedProcessorStep.start(0);
        deadEndStep.start(0);
        ThreadLocalRandom current = ThreadLocalRandom.current();
        for (int i = 0; i < 200; i++) {
            if (current.nextFloat() < 0.1d) {
                forkedProcessorStep.processors(current.nextInt(-2, 4));
            }
            int[] iArr = new int[100];
            iArr[0] = i;
            for (int i2 = 1; i2 < iArr.length; i2++) {
                iArr[i2] = i2 - 1;
            }
            forkedProcessorStep.receive(i, iArr);
        }
        forkedProcessorStep.endOfUpstream();
        forkedProcessorStep.awaitCompleted();
        forkedProcessorStep.close();
        deadEndStep.close();
    }

    @Test
    void shouldPanicOnFailure() throws Exception {
        SimpleStageControl simpleStageControl = new SimpleStageControl();
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        final RuntimeException runtimeException = new RuntimeException();
        ForkedProcessorStep<Void> forkedProcessorStep = new ForkedProcessorStep<Void>(simpleStageControl, "Processor", config(availableProcessors), new StatsProvider[0]) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.2
            /* JADX INFO: Access modifiers changed from: protected */
            public void forkedProcess(int i, int i2, Void r5) throws Throwable {
                throw runtimeException;
            }
        };
        simpleStageControl.steps(forkedProcessorStep);
        forkedProcessorStep.start(0);
        forkedProcessorStep.receive(1L, (Object) null);
        forkedProcessorStep.awaitCompleted();
        try {
            simpleStageControl.assertHealthy();
        } catch (Exception e) {
            Assertions.assertSame(runtimeException, e);
        }
    }

    @Timeout(value = 2, unit = TimeUnit.MINUTES)
    @Test
    void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenOrdered() throws Exception {
        shouldBeAbleToProgressUnderStressfulProcessorChanges(1);
    }

    @Timeout(value = 2, unit = TimeUnit.MINUTES)
    @Test
    void shouldBeAbleToProgressUnderStressfulProcessorChangesWhenUnordered() throws Exception {
        shouldBeAbleToProgressUnderStressfulProcessorChanges(0);
    }

    private static void shouldBeAbleToProgressUnderStressfulProcessorChanges(int i) throws Exception {
        final int availableProcessors = Runtime.getRuntime().availableProcessors() * 10;
        StageExecution execute = new StressStage(new Configuration.Overridden(Configuration.DEFAULT) { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.3
            public int maxNumberOfProcessors() {
                return availableProcessors;
            }
        }, i, 100).execute();
        List asList = Iterables.asList(execute.steps());
        ((Step) asList.get(1)).processors(availableProcessors / 3);
        ThreadLocalRandom current = ThreadLocalRandom.current();
        while (execute.stillExecuting()) {
            ((Step) asList.get(TIMEOUT_MINUTES)).processors(current.nextInt(-2, 5));
            Thread.sleep(1L);
        }
        execute.assertHealthy();
        Assertions.assertEquals(100, ((Step) asList.get(asList.size() - 1)).stats().stat(Keys.done_batches).asLong());
        IOUtils.closeAllSilently(asList);
    }

    private static Configuration config(final int i) {
        return new Configuration() { // from class: org.neo4j.internal.batchimport.staging.ForkedProcessorStepTest.4
            public int maxNumberOfProcessors() {
                return i;
            }
        };
    }
}
