/*
 * Decompiled with CFR 0.152.
 */
package org.datacleaner.cluster;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SharedExecutorService;
import org.datacleaner.api.InputColumn;
import org.datacleaner.cluster.ClusterManager;
import org.datacleaner.cluster.DistributedAnalysisResultFuture;
import org.datacleaner.cluster.DistributedAnalysisResultReducer;
import org.datacleaner.cluster.DistributedJobContextImpl;
import org.datacleaner.cluster.FailedAnalysisResultFuture;
import org.datacleaner.cluster.JobDivisionManager;
import org.datacleaner.cluster.virtual.VirtualClusterManager;
import org.datacleaner.components.maxrows.MaxRowsFilter;
import org.datacleaner.configuration.DataCleanerConfiguration;
import org.datacleaner.configuration.InjectionManager;
import org.datacleaner.data.MetaModelInputColumn;
import org.datacleaner.descriptors.ComponentDescriptor;
import org.datacleaner.job.AnalysisJob;
import org.datacleaner.job.ComponentJob;
import org.datacleaner.job.builder.AnalysisJobBuilder;
import org.datacleaner.job.builder.FilterComponentBuilder;
import org.datacleaner.job.concurrent.SingleThreadedTaskRunner;
import org.datacleaner.job.concurrent.TaskListener;
import org.datacleaner.job.concurrent.TaskRunner;
import org.datacleaner.job.runner.AnalysisJobMetrics;
import org.datacleaner.job.runner.AnalysisListener;
import org.datacleaner.job.runner.AnalysisResultFuture;
import org.datacleaner.job.runner.AnalysisRunner;
import org.datacleaner.job.runner.CompositeAnalysisListener;
import org.datacleaner.job.runner.RowProcessingMetrics;
import org.datacleaner.job.runner.RowProcessingPublisher;
import org.datacleaner.job.runner.RowProcessingPublishers;
import org.datacleaner.job.tasks.Task;
import org.datacleaner.lifecycle.LifeCycleHelper;
import org.datacleaner.util.SourceColumnFinder;
import org.datacleaner.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DistributedAnalysisRunner
implements AnalysisRunner {
    private static final Logger logger = LoggerFactory.getLogger(DistributedAnalysisRunner.class);
    private final ClusterManager _clusterManager;
    private final DataCleanerConfiguration _configuration;
    private final CompositeAnalysisListener _analysisListener;

    public DistributedAnalysisRunner(DataCleanerConfiguration configuration, ClusterManager clusterManager) {
        this(configuration, clusterManager, new AnalysisListener[0]);
    }

    public DistributedAnalysisRunner(DataCleanerConfiguration configuration, ClusterManager clusterManager, AnalysisListener ... listeners) {
        this._configuration = configuration;
        this._clusterManager = clusterManager;
        this._analysisListener = new CompositeAnalysisListener(listeners);
    }

    public boolean isDistributable(AnalysisJob job) {
        try {
            this.failIfJobIsUnsupported(job);
            return true;
        }
        catch (Throwable e) {
            return false;
        }
    }

    public AnalysisResultFuture run(AnalysisJob job) throws UnsupportedOperationException {
        AnalysisResultFuture resultFuture;
        logger.info("Validating distributed job: {}", (Object)job);
        this.failIfJobIsUnsupported(job);
        InjectionManager injectionManager = this._configuration.getEnvironment().getInjectionManagerFactory().getInjectionManager(this._configuration, job);
        LifeCycleHelper lifeCycleHelper = new LifeCycleHelper(injectionManager, true);
        RowProcessingPublishers publishers = this.getRowProcessingPublishers(job, lifeCycleHelper);
        RowProcessingPublisher publisher = this.getRowProcessingPublisher(publishers);
        publisher.initializeConsumers(new TaskListener(){

            public void onError(Task task, Throwable throwable) {
                logger.error("Failed to initialize consumers at master node!", throwable);
            }

            public void onComplete(Task task) {
            }

            public void onBegin(Task task) {
            }
        });
        logger.info("Validation passed! Chunking job for distribution amongst slaves: {}", (Object)job);
        AnalysisJobMetrics analysisJobMetrics = publishers.getAnalysisJobMetrics();
        this._analysisListener.jobBegin(job, analysisJobMetrics);
        RowProcessingMetrics rowProcessingMetrics = publisher.getRowProcessingMetrics();
        this._analysisListener.rowProcessingBegin(job, rowProcessingMetrics);
        try {
            int expectedRows = rowProcessingMetrics.getExpectedRows();
            if (expectedRows == 0) {
                logger.info("Expected rows of the job was zero. Job will run on a local virtual slave.");
                DistributedJobContextImpl context = new DistributedJobContextImpl(this._configuration, job, 0, 1);
                VirtualClusterManager localCluster = new VirtualClusterManager(this._configuration, 1);
                resultFuture = localCluster.dispatchJob(job, context);
            } else {
                JobDivisionManager jobDivisionManager = this._clusterManager.getJobDivisionManager();
                int chunks = jobDivisionManager.calculateDivisionCount(job, expectedRows);
                int rowsPerChunk = (expectedRows + 1) / chunks;
                logger.info("Expected rows was {}. A total number of {} slave jobs will be built, each of approx. {} rows.", new Object[]{expectedRows, chunks, rowsPerChunk});
                List<AnalysisResultFuture> results = this.dispatchJobs(job, chunks, rowsPerChunk, publisher);
                DistributedAnalysisResultReducer reducer = new DistributedAnalysisResultReducer(job, lifeCycleHelper, publisher, (AnalysisListener)this._analysisListener);
                resultFuture = new DistributedAnalysisResultFuture(results, reducer);
            }
        }
        catch (RuntimeException e) {
            this._analysisListener.errorUnknown(job, (Throwable)e);
            throw e;
        }
        if (!this._analysisListener.isEmpty()) {
            this.awaitAndInformListener(job, analysisJobMetrics, rowProcessingMetrics, resultFuture);
        }
        return resultFuture;
    }

    private void awaitAndInformListener(final AnalysisJob job, final AnalysisJobMetrics analysisJobMetrics, RowProcessingMetrics rowProcessingMetrics, final AnalysisResultFuture resultFuture) {
        SharedExecutorService.get().execute(new Runnable(){

            @Override
            public void run() {
                resultFuture.await();
                if (resultFuture.isSuccessful()) {
                    DistributedAnalysisRunner.this._analysisListener.jobSuccess(job, analysisJobMetrics);
                }
            }
        });
    }

    public List<AnalysisResultFuture> dispatchJobs(AnalysisJob job, int chunks, int rowsPerChunk, RowProcessingPublisher publisher) {
        ArrayList<AnalysisResultFuture> results = new ArrayList<AnalysisResultFuture>();
        for (int i = 0; i < chunks; ++i) {
            int firstRow = i * rowsPerChunk + 1;
            int maxRows = i == chunks - 1 ? Integer.MAX_VALUE - firstRow - 1 : rowsPerChunk;
            AnalysisJob slaveJob = this.buildSlaveJob(job, i, firstRow, maxRows);
            DistributedJobContextImpl context = new DistributedJobContextImpl(this._configuration, job, i, chunks);
            try {
                logger.info("Dispatching slave job {} of {}", (Object)(i + 1), (Object)chunks);
                AnalysisResultFuture slaveResultFuture = this._clusterManager.dispatchJob(slaveJob, context);
                results.add(slaveResultFuture);
                continue;
            }
            catch (Exception e) {
                this._analysisListener.errorUnknown(job, (Throwable)e);
                FailedAnalysisResultFuture errorResult = new FailedAnalysisResultFuture(e);
                results.add(0, errorResult);
                break;
            }
        }
        return results;
    }

    private AnalysisJob buildSlaveJob(AnalysisJob job, int slaveJobIndex, int firstRow, int maxRows) {
        logger.info("Building slave job {} with firstRow={} and maxRow={}", new Object[]{slaveJobIndex + 1, firstRow, maxRows});
        try (AnalysisJobBuilder jobBuilder = new AnalysisJobBuilder(this._configuration, job);){
            FilterComponentBuilder maxRowsFilter = jobBuilder.addFilter(MaxRowsFilter.class);
            ((MaxRowsFilter)maxRowsFilter.getComponentInstance()).setFirstRow(firstRow);
            ((MaxRowsFilter)maxRowsFilter.getComponentInstance()).setMaxRows(maxRows);
            boolean naturalRecordOrderConsistent = jobBuilder.getDatastore().getPerformanceCharacteristics().isNaturalRecordOrderConsistent();
            if (!naturalRecordOrderConsistent) {
                InputColumn<?> orderColumn = this.findOrderByColumn(jobBuilder);
                ((MaxRowsFilter)maxRowsFilter.getComponentInstance()).setOrderColumn(orderColumn);
            }
            jobBuilder.setDefaultRequirement(maxRowsFilter, (Enum)MaxRowsFilter.Category.VALID);
            assert (jobBuilder.isConfigured(true));
            AnalysisJob analysisJob = jobBuilder.toAnalysisJob();
            return analysisJob;
        }
    }

    private InputColumn<?> findOrderByColumn(AnalysisJobBuilder jobBuilder) {
        Table sourceTable = (Table)jobBuilder.getSourceTables().get(0);
        Object[] primaryKeys = sourceTable.getPrimaryKeys();
        if (primaryKeys.length == 1) {
            Column primaryKey = primaryKeys[0];
            InputColumn sourceColumn = jobBuilder.getSourceColumnByName(primaryKey.getName());
            if (sourceColumn == null) {
                jobBuilder.addSourceColumn(primaryKey);
                logger.info("Added PK source column for ORDER BY clause on slave jobs: {}", (Object)sourceColumn);
                return jobBuilder.getSourceColumnByName(primaryKey.getName());
            }
            logger.info("Using existing PK source column for ORDER BY clause on slave jobs: {}", (Object)sourceColumn);
            return sourceColumn;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Found {} primary keys, cannot select a single for ORDER BY clause on slave jobs: {}", (Object)primaryKeys.length, (Object)Arrays.toString(primaryKeys));
        }
        List sourceColumns = jobBuilder.getSourceColumns();
        String tableName = sourceTable.getName().toLowerCase();
        for (MetaModelInputColumn sourceColumn : sourceColumns) {
            String name = sourceColumn.getName();
            if (name == null) continue;
            name = StringUtils.replaceWhitespaces((String)name, (String)"");
            name = StringUtils.replaceAll((String)name, (String)"_", (String)"");
            name = StringUtils.replaceAll((String)name, (String)"-", (String)"");
            if (!"id".equals(name = name.toLowerCase()) && !(tableName + "id").equals(name) && !(tableName + "number").equals(name) && !(tableName + "key").equals(name)) continue;
            logger.info("Using existing source column for ORDER BY clause on slave jobs: {}", (Object)sourceColumn);
            return sourceColumn;
        }
        MetaModelInputColumn sourceColumn = (MetaModelInputColumn)sourceColumns.get(0);
        logger.warn("Couldn't pick a good source column for ORDER BY clause on slave jobs. Picking the first column: {}", (Object)sourceColumn);
        return sourceColumn;
    }

    private RowProcessingPublishers getRowProcessingPublishers(AnalysisJob job, LifeCycleHelper lifeCycleHelper) {
        SourceColumnFinder sourceColumnFinder = new SourceColumnFinder();
        sourceColumnFinder.addSources(job);
        SingleThreadedTaskRunner taskRunner = new SingleThreadedTaskRunner();
        RowProcessingPublishers publishers = new RowProcessingPublishers(job, null, (TaskRunner)taskRunner, lifeCycleHelper, sourceColumnFinder);
        return publishers;
    }

    private RowProcessingPublisher getRowProcessingPublisher(RowProcessingPublishers publishers) {
        Table[] tables = publishers.getTables();
        if (tables.length != 1) {
            throw new UnsupportedOperationException("Jobs with multiple source tables are not distributable");
        }
        Table table = tables[0];
        RowProcessingPublisher publisher = publishers.getRowProcessingPublisher(table);
        return publisher;
    }

    private void failIfJobIsUnsupported(AnalysisJob job) throws UnsupportedOperationException {
        this.failIfComponentsAreUnsupported(job.getFilterJobs());
        this.failIfComponentsAreUnsupported(job.getTransformerJobs());
        this.failIfComponentsAreUnsupported(job.getAnalyzerJobs());
    }

    private void failIfComponentsAreUnsupported(Collection<? extends ComponentJob> jobs) throws UnsupportedOperationException {
        for (ComponentJob componentJob : jobs) {
            ComponentDescriptor descriptor = componentJob.getDescriptor();
            boolean distributable = descriptor.isDistributable();
            if (distributable) continue;
            throw new UnsupportedOperationException("Component is not distributable: " + componentJob);
        }
    }
}

