package org.deeplearning4j.spark.api.worker;

import java.util.Collections;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.deeplearning4j.datasets.iterator.IteratorDataSetIterator;
import org.deeplearning4j.nn.graph.ComputationGraph;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
import org.deeplearning4j.spark.api.TrainingResult;
import org.deeplearning4j.spark.api.TrainingWorker;
import org.deeplearning4j.spark.api.WorkerConfiguration;
import org.deeplearning4j.spark.api.stats.SparkTrainingStats;
import org.deeplearning4j.spark.api.stats.StatsCalculationHelper;
import org.nd4j.common.primitives.Pair;
import org.nd4j.linalg.dataset.AsyncDataSetIterator;
import org.nd4j.linalg.dataset.DataSet;
import org.nd4j.linalg.dataset.api.iterator.DataSetIterator;
import org.nd4j.linalg.factory.Nd4j;

/* loaded from: input_file:org/deeplearning4j/spark/api/worker/ExecuteWorkerFlatMap.class */
public class ExecuteWorkerFlatMap<R extends TrainingResult> implements FlatMapFunction<Iterator<DataSet>, R> {
    private final TrainingWorker<R> worker;

    public ExecuteWorkerFlatMap(TrainingWorker<R> trainingWorker) {
        this.worker = trainingWorker;
    }

    public Iterator<R> call(Iterator<DataSet> it) throws Exception {
        R processMinibatch;
        Pair<R, SparkTrainingStats> processMinibatchWithStats;
        WorkerConfiguration dataConfiguration = this.worker.getDataConfiguration();
        boolean isGraphNetwork = dataConfiguration.isGraphNetwork();
        boolean isCollectTrainingStats = dataConfiguration.isCollectTrainingStats();
        StatsCalculationHelper statsCalculationHelper = isCollectTrainingStats ? new StatsCalculationHelper() : null;
        if (isCollectTrainingStats) {
            statsCalculationHelper.logMethodStartTime();
        }
        if (!it.hasNext()) {
            if (!isCollectTrainingStats) {
                return Collections.singletonList(this.worker.getFinalResultNoData()).iterator();
            }
            statsCalculationHelper.logReturnTime();
            Pair<R, SparkTrainingStats> finalResultNoDataWithStats = this.worker.getFinalResultNoDataWithStats();
            ((TrainingResult) finalResultNoDataWithStats.getFirst()).setStats(statsCalculationHelper.build((SparkTrainingStats) finalResultNoDataWithStats.getSecond()));
            return Collections.singletonList(finalResultNoDataWithStats.getFirst()).iterator();
        }
        int batchSizePerWorker = dataConfiguration.getBatchSizePerWorker();
        int prefetchNumBatches = dataConfiguration.getPrefetchNumBatches();
        DataSetIterator iteratorDataSetIterator = new IteratorDataSetIterator(it, batchSizePerWorker);
        if (prefetchNumBatches > 0) {
            iteratorDataSetIterator = new AsyncDataSetIterator(iteratorDataSetIterator, prefetchNumBatches);
        }
        MultiLayerNetwork multiLayerNetwork = null;
        ComputationGraph computationGraph = null;
        if (isCollectTrainingStats) {
            try {
                statsCalculationHelper.logInitialModelBefore();
            } catch (Throwable th) {
                Nd4j.getExecutioner().commit();
                if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
                    ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
                }
                throw th;
            }
        }
        if (isGraphNetwork) {
            computationGraph = this.worker.getInitialModelGraph();
        } else {
            multiLayerNetwork = this.worker.getInitialModel();
        }
        if (isCollectTrainingStats) {
            statsCalculationHelper.logInitialModelAfter();
        }
        int i = 0;
        int maxBatchesPerWorker = dataConfiguration.getMaxBatchesPerWorker() > 0 ? dataConfiguration.getMaxBatchesPerWorker() : Integer.MAX_VALUE;
        while (iteratorDataSetIterator.hasNext()) {
            int i2 = i;
            i++;
            if (i2 >= maxBatchesPerWorker) {
                break;
            }
            if (isCollectTrainingStats) {
                statsCalculationHelper.logNextDataSetBefore();
            }
            DataSet dataSet = (DataSet) iteratorDataSetIterator.next();
            if (isCollectTrainingStats) {
                statsCalculationHelper.logNextDataSetAfter(dataSet.numExamples());
            }
            if (isCollectTrainingStats) {
                statsCalculationHelper.logProcessMinibatchBefore();
                if (isGraphNetwork) {
                    processMinibatchWithStats = this.worker.processMinibatchWithStats((org.nd4j.linalg.dataset.api.DataSet) dataSet, computationGraph, !iteratorDataSetIterator.hasNext());
                } else {
                    processMinibatchWithStats = this.worker.processMinibatchWithStats((org.nd4j.linalg.dataset.api.DataSet) dataSet, multiLayerNetwork, !iteratorDataSetIterator.hasNext());
                }
                statsCalculationHelper.logProcessMinibatchAfter();
                if (processMinibatchWithStats != null) {
                    statsCalculationHelper.logReturnTime();
                    ((TrainingResult) processMinibatchWithStats.getFirst()).setStats(statsCalculationHelper.build((SparkTrainingStats) processMinibatchWithStats.getSecond()));
                    Iterator<R> it2 = Collections.singletonList(processMinibatchWithStats.getFirst()).iterator();
                    Nd4j.getExecutioner().commit();
                    if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
                        ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
                    }
                    return it2;
                }
            } else {
                if (isGraphNetwork) {
                    processMinibatch = this.worker.processMinibatch((org.nd4j.linalg.dataset.api.DataSet) dataSet, computationGraph, !iteratorDataSetIterator.hasNext());
                } else {
                    processMinibatch = this.worker.processMinibatch((org.nd4j.linalg.dataset.api.DataSet) dataSet, multiLayerNetwork, !iteratorDataSetIterator.hasNext());
                }
                if (processMinibatch != null) {
                    Iterator<R> it3 = Collections.singletonList(processMinibatch).iterator();
                    Nd4j.getExecutioner().commit();
                    if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
                        ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
                    }
                    return it3;
                }
            }
        }
        if (isCollectTrainingStats) {
            statsCalculationHelper.logReturnTime();
            Pair<R, SparkTrainingStats> finalResultWithStats = isGraphNetwork ? this.worker.getFinalResultWithStats(computationGraph) : this.worker.getFinalResultWithStats(multiLayerNetwork);
            ((TrainingResult) finalResultWithStats.getFirst()).setStats(statsCalculationHelper.build((SparkTrainingStats) finalResultWithStats.getSecond()));
            Iterator<R> it4 = Collections.singletonList(finalResultWithStats.getFirst()).iterator();
            Nd4j.getExecutioner().commit();
            if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
                ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
            }
            return it4;
        }
        if (isGraphNetwork) {
            Iterator<R> it5 = Collections.singletonList(this.worker.getFinalResult(computationGraph)).iterator();
            Nd4j.getExecutioner().commit();
            if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
                ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
            }
            return it5;
        }
        Iterator<R> it6 = Collections.singletonList(this.worker.getFinalResult(multiLayerNetwork)).iterator();
        Nd4j.getExecutioner().commit();
        if (iteratorDataSetIterator instanceof AsyncDataSetIterator) {
            ((AsyncDataSetIterator) iteratorDataSetIterator).shutdown();
        }
        return it6;
    }
}
