/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.consistency.checking.full;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.consistency.checking.full.QueueDistribution;
import org.neo4j.consistency.checking.full.RecordCheckWorker;
import org.neo4j.consistency.checking.full.RecordProcessor;
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.string.Workers;

public class RecordDistributor {
    public static <RECORD> void distributeRecords(int numberOfThreads, String workerNames, int queueSize, Iterator<RECORD> records, ProgressListener progress, RecordProcessor<RECORD> processor, QueueDistribution.QueueDistributor<RECORD> idDistributor) {
        if (!records.hasNext()) {
            return;
        }
        final ArrayBlockingQueue[] recordQ = new ArrayBlockingQueue[numberOfThreads];
        Workers workers = new Workers(workerNames);
        AtomicInteger idGroup = new AtomicInteger(-1);
        for (int threadId = 0; threadId < numberOfThreads; ++threadId) {
            recordQ[threadId] = new ArrayBlockingQueue(queueSize);
            workers.start(new RecordCheckWorker<RECORD>(threadId, idGroup, recordQ[threadId], processor));
        }
        final int[] recsProcessed = new int[numberOfThreads];
        RecordConsumer recordConsumer = new RecordConsumer<RECORD>(){

            @Override
            public void accept(RECORD record, int qIndex) throws InterruptedException {
                recordQ[qIndex].put(record);
                int n = qIndex;
                recsProcessed[n] = recsProcessed[n] + 1;
            }
        };
        try {
            while (records.hasNext()) {
                try {
                    RECORD record = records.next();
                    idDistributor.distribute(record, recordConsumer);
                    progress.add(1L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (RecordCheckWorker worker : workers) {
                worker.done();
            }
            workers.awaitAndThrowOnError(RuntimeException.class);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Was interrupted while awaiting completion");
        }
    }

    public static long calculateRecodsPerCpu(long highId, int numberOfThreads) {
        boolean hasRest = highId % (long)numberOfThreads > 0L;
        long result = highId / (long)numberOfThreads;
        if (hasRest) {
            ++result;
        }
        return result;
    }

    static interface RecordConsumer<RECORD> {
        public void accept(RECORD var1, int var2) throws InterruptedException;
    }
}

