package org.neo4j.internal.batchimport.staging;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.StampedLock;
import org.neo4j.internal.batchimport.Configuration;
import org.neo4j.internal.batchimport.executor.ParkStrategy;
import org.neo4j.internal.batchimport.stats.StatsProvider;

/* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStep.class */
public abstract class ForkedProcessorStep<T> extends AbstractStep<T> {
    // Slots for the ForkedProcessor threads, indexed by processor id. Sized to maxProcessors and filled lazily
    // the first time a given processor id is needed.
    private final Object[] forkedProcessors;
    // Processor count that is stamped onto every newly received Unit.
    private volatile int numberOfForkedProcessors;
    // Most recently received unit; new units are linked in after it.
    private final AtomicReference<ForkedProcessorStep<T>.Unit> head;
    // Last unit that the completed-batches sender has handed off; head == tail means nothing is in flight.
    private final AtomicReference<ForkedProcessorStep<T>.Unit> tail;
    // Thread that forwards completed units; created in the constructor and started in start(int).
    private final Thread downstreamSender;
    // Requested processor count, adjusted by processors(int) and applied on a later receive.
    private volatile int targetNumberOfProcessors;
    // Upper bound for targetNumberOfProcessors and the length of forkedProcessors.
    private final int maxProcessors;
    // receive() waits while the number of queued batches is at or above this value.
    private final int maxQueueLength;
    // Thread that last parked while waiting in receive()/awaitAllCompleted(); the sender unparks it when idle.
    private volatile Thread receiverThread;
    // Read-locked while a unit is enqueued, write-locked while the processor count is being changed.
    private final StampedLock stripingLock;
    // VarHandle used for atomic adds on Unit.completedProcessors.
    private static final VarHandle COMPLETED_PROCESSORS;
    // VarHandle used for atomic adds on Unit.processingTime.
    private static final VarHandle PROCESSING_TIME;

    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStep$CompletedBatchesSender.class */
    /**
     * Thread that follows the unit list starting at the current tail and, for each unit that all of its
     * processors have completed, passes it on (downstream, or back to the stage control for recycling
     * when there is no downstream) and advances the tail. Units are visited in list order.
     */
    private final class CompletedBatchesSender extends Thread {
        CompletedBatchesSender(String str) {
            super(str);
        }

        /**
         * Runs until the enclosing step is completed or has panicked. Any throwable raised while
         * forwarding is reported through {@code issuePanic}.
         */
        @Override
        public void run() {
            final ForkedProcessorStep<T> step = ForkedProcessorStep.this;
            try {
                Unit lastSent = step.tail.get();
                while (!(step.isCompleted() || step.isPanic())) {
                    final Unit candidate = lastSent.next;
                    if (candidate != null && candidate.isCompleted()) {
                        // Hand the finished batch off before moving the tail past it.
                        if (step.downstream != null) {
                            step.sendDownstream(candidate);
                        } else {
                            step.control.recycle(candidate.batch);
                        }
                        lastSent = candidate;
                        step.tail.set(lastSent);
                        step.queuedBatches.decrementAndGet();
                        step.doneBatches.incrementAndGet();
                        step.totalProcessingTime.add(candidate.processingTime);
                        step.checkNotifyEndDownstream();
                        continue;
                    }

                    // Nothing ready to send: wake a receiver that may be waiting for queue space
                    // (or for the queue to drain), then wait ourselves.
                    final Thread waitingReceiver = step.receiverThread;
                    if (waitingReceiver != null) {
                        AbstractStep.PARK.unpark(waitingReceiver);
                    }
                    AbstractStep.PARK.park(this);
                }
            } catch (Throwable failure) {
                step.issuePanic(failure, false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStep$ForkedProcessor.class */
    /**
     * Worker thread with a fixed id that walks the unit list and calls
     * {@link ForkedProcessorStep#forkedProcess(int, int, Object)} for every unit whose processor count
     * is greater than its id. The thread starts itself from its constructor.
     */
    public class ForkedProcessor extends Thread {
        private final int id;
        // Last unit this processor has moved past; the next unit to look at is current.next.
        private ForkedProcessorStep<T>.Unit current;

        /**
         * @param i id of this processor, also used as the thread name suffix
         * @param unit unit to start from; processing begins with the unit linked after it
         */
        ForkedProcessor(int i, ForkedProcessorStep<T>.Unit unit) {
            super(ForkedProcessorStep.this.name() + "-" + i);
            this.id = i;
            this.current = unit;
            start();
        }

        /**
         * Loops until the enclosing step is completed or has panicked. A throwable from processing is
         * reported through {@code issuePanic} and ends this thread.
         */
        @Override
        public void run() {
            final ForkedProcessorStep<T> step = ForkedProcessorStep.this;
            while (!(step.isCompleted() || step.isPanic())) {
                try {
                    final Unit upcoming = current.next;
                    if (upcoming == null) {
                        // No new unit linked yet.
                        AbstractStep.PARK.park(this);
                        continue;
                    }
                    // Units created with fewer processors than this id are skipped, not processed.
                    if (id < upcoming.processors) {
                        final long startedAt = System.nanoTime();
                        step.forkedProcess(id, upcoming.processors, upcoming.batch);
                        upcoming.processorDone(System.nanoTime() - startedAt);
                    }
                    current = upcoming;
                } catch (Throwable failure) {
                    step.issuePanic(failure, false);
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/neo4j/internal/batchimport/staging/ForkedProcessorStep$Unit.class */
    /**
     * One received batch together with the bookkeeping that tells when every participating forked
     * processor has finished with it. Units are chained through {@code next} into a singly linked list.
     */
    public class Unit {
        private final long ticket;
        private final T batch;
        // Number of forked processors that must each call processorDone before this unit is complete.
        private final int processors;
        // Updated only through ForkedProcessorStep.COMPLETED_PROCESSORS (atomic add).
        private volatile int completedProcessors;
        // Accumulated nanoseconds reported by processors; updated only through ForkedProcessorStep.PROCESSING_TIME.
        private volatile long processingTime;
        private volatile ForkedProcessorStep<T>.Unit next;

        /**
         * @param j ticket of the batch
         * @param t the batch itself
         * @param i number of processors that will process this unit
         */
        Unit(long j, T t, int i) {
            this.ticket = j;
            this.batch = t;
            this.processors = i;
        }

        /**
         * @return {@code true} once as many processors have reported done as this unit was created with;
         * always {@code false} for a unit created with zero processors
         */
        boolean isCompleted() {
            return this.processors > 0 && this.processors == this.completedProcessors;
        }

        /**
         * Records that one processor has finished this unit.
         *
         * @param j time, in nanoseconds, that the processor spent on this unit
         */
        void processorDone(long j) {
            ForkedProcessorStep.PROCESSING_TIME.getAndAdd(this, j);
            // VarHandle access methods are signature-polymorphic: without the explicit (int) cast the
            // call is typed as returning Object and cannot be assigned to an int.
            int previouslyCompleted = (int) ForkedProcessorStep.COMPLETED_PROCESSORS.getAndAdd(this, 1);
            assert previouslyCompleted < this.processors
                    : previouslyCompleted + " vs " + this.processors + " for " + this.ticket;
        }

        @Override
        public String toString() {
            return String.format("Unit[%d/%d]", this.completedProcessors, this.processors);
        }
    }

    /**
     * Creates the step, its initial forked processor and the (not yet started) sender thread.
     *
     * @param stageControl passed to the superclass constructor
     * @param str name passed to the superclass constructor; also used as prefix of the sender thread's name
     * @param configuration source of the maximum processor count and the maximum queue size
     * @param statsProviderArr passed to the superclass constructor
     */
    protected ForkedProcessorStep(StageControl stageControl, String str, Configuration configuration, StatsProvider... statsProviderArr) {
        super(stageControl, str, configuration, statsProviderArr);
        targetNumberOfProcessors = 1;

        // At most 70% of the configured processors, but never fewer than one.
        final int scaledProcessorLimit = (int) (configuration.maxNumberOfProcessors() * 0.7d);
        maxProcessors = Math.max(1, scaledProcessorLimit);
        forkedProcessors = new Object[maxProcessors];
        stripingLock = new StampedLock();

        // Sentinel unit with zero processors: it never reports completed and only anchors the list.
        final Unit sentinel = new Unit(-1L, null, 0);
        head = new AtomicReference<>(sentinel);
        tail = new AtomicReference<>(sentinel);

        // Bring the processor count up to the initial target, which starts the first ForkedProcessor.
        final long stamp = applyProcessorCount(stripingLock.readLock());
        stripingLock.unlock(stamp);

        downstreamSender = new CompletedBatchesSender(str + " [CompletedBatchSender]");
        maxQueueLength = maxProcessors + (configuration.maxQueueSize() * 2);
    }

    /**
     * Brings the active processor count in line with the requested target, if they differ.
     * <p>
     * When a change is needed the supplied lock is released and the write lock is taken instead, the
     * queue is drained via {@code awaitAllCompleted()}, and missing {@link ForkedProcessor}s are created.
     * Lowering the count only lowers the number stamped onto future units; processors with an id at or
     * above that number skip those units.
     *
     * @param j stamp of the lock on {@code stripingLock} currently held by the caller
     * @return the stamp the caller must eventually unlock: {@code j} itself when nothing changed,
     * otherwise the stamp of the write lock taken here
     */
    private long applyProcessorCount(long j) {
        if (this.numberOfForkedProcessors == this.targetNumberOfProcessors) {
            return j;
        }

        this.stripingLock.unlock(j);
        final long writeStamp = this.stripingLock.writeLock();
        try {
            awaitAllCompleted();
            final int target = this.targetNumberOfProcessors;
            while (this.numberOfForkedProcessors < target) {
                final int id = this.numberOfForkedProcessors;
                if (this.forkedProcessors[id] == null) {
                    this.forkedProcessors[id] = new ForkedProcessor(id, this.tail.get());
                }
                // Only this thread writes the field while the write lock is held.
                this.numberOfForkedProcessors = id + 1;
            }
            if (this.numberOfForkedProcessors > target) {
                this.numberOfForkedProcessors = target;
            }
            return writeStamp;
        } catch (Throwable failure) {
            // The caller never receives the stamp on this path (e.g. a processor thread could not be
            // started), so release the write lock here; otherwise every later receive() would block.
            this.stripingLock.unlock(writeStamp);
            throw failure;
        }
    }

    /**
     * Waits until every received unit has been handed off by the sender thread, i.e. until head and
     * tail refer to the same unit, or until the step has panicked.
     * <p>
     * The panic check mirrors the wait loop in {@code receive}: the sender thread stops advancing the
     * tail once the step has panicked, so without it this loop could never end after a panic.
     */
    private void awaitAllCompleted() {
        while (this.head.get() != this.tail.get() && !isPanic()) {
            Thread currentThread = Thread.currentThread();
            // Published so that the sender thread can unpark this thread when it runs out of work.
            this.receiverThread = currentThread;
            PARK.park(currentThread);
        }
    }

    /**
     * Adjusts the requested number of processors by a delta, keeping it within
     * {@code [1, maxProcessors]}. The new target is applied by a later call to {@code receive}.
     *
     * @param i change to apply to the current target; may be negative
     * @return the target number of processors as read back after the update
     */
    @Override
    public int processors(int i) {
        final int requested = targetNumberOfProcessors + i;
        final int capped = Math.min(requested, maxProcessors);
        targetNumberOfProcessors = Math.max(1, capped);
        return targetNumberOfProcessors;
    }

    /**
     * @return the upper bound on the number of forked processors, fixed at construction time
     */
    @Override
    public int maxProcessors() {
        return maxProcessors;
    }

    /**
     * Starts the step via the superclass and then starts the thread that forwards completed units.
     *
     * @param i passed unchanged to {@code super.start}
     */
    @Override
    public void start(int i) {
        super.start(i);
        downstreamSender.start();
    }

    /**
     * Enqueues a batch for the forked processors. Waits first while the queue is at its maximum length
     * (unless the step has panicked), then links a new unit after the current head while holding a
     * lock on {@code stripingLock}.
     *
     * @param j ticket of the batch
     * @param t the batch
     * @return nanoseconds spent inside this call
     */
    @Override
    public long receive(long j, T t) {
        final long startedAt = System.nanoTime();

        // Wait for room in the queue; a panic ends the wait.
        while (true) {
            if (queuedBatches.get() < maxQueueLength || isPanic()) {
                break;
            }
            final Thread self = Thread.currentThread();
            receiverThread = self;
            PARK.park(self);
        }

        // Applying the processor count first means the new unit is stamped with the current count.
        final long stamp = applyProcessorCount(stripingLock.readLock());
        queuedBatches.incrementAndGet();
        final Unit appended = new Unit(j, t, numberOfForkedProcessors);
        // Swap in the new head, then link the previous head to it so walkers can reach it.
        final Unit previousHead = head.getAndSet(appended);
        previousHead.next = appended;
        stripingLock.unlock(stamp);

        return System.nanoTime() - startedAt;
    }

    /**
     * Processes one forked processor's part of a batch. Each {@link ForkedProcessor} calls this once
     * per unit, and only for units whose processor count is greater than the processor's id.
     *
     * @param i id of the calling forked processor
     * @param i2 number of processors the unit was created with
     * @param t the batch carried by the unit
     * @throws Throwable on any failure; a {@link ForkedProcessor} reports it via {@code issuePanic} and stops
     */
    protected abstract void forkedProcess(int i, int i2, T t) throws Throwable;

    /**
     * Passes a unit's ticket and batch to the downstream step and adds the value returned by the
     * downstream {@code receive} call to {@code downstreamIdleTime}.
     *
     * @param unit the unit to forward
     */
    void sendDownstream(ForkedProcessorStep<T>.Unit unit) {
        final long ticket = unit.ticket;
        final T batch = unit.batch;
        downstreamIdleTime.add(downstream.receive(ticket, batch));
    }

    /**
     * Clears every slot of the forked-processor array, then delegates to {@code super.close()}.
     *
     * @throws Exception if {@code super.close()} throws
     */
    @Override
    public void close() throws Exception {
        Arrays.fill(forkedProcessors, null);
        super.close();
    }

    static {
        try {
            MethodHandles.Lookup lookup = MethodHandles.lookup();
            COMPLETED_PROCESSORS = lookup.findVarHandle(Unit.class, "completedProcessors", Integer.TYPE);
            PROCESSING_TIME = lookup.findVarHandle(Unit.class, "processingTime", Long.TYPE);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
