package sparkengine.plan.app.runner;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.log4j.Logger;
import org.apache.spark.SparkEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import sparkengine.plan.model.builder.ModelFactory;
import sparkengine.plan.model.builder.ModelFormatException;
import sparkengine.plan.model.plan.Plan;
import sparkengine.plan.model.plan.mapper.PlanMapperException;
import sparkengine.plan.model.plan.visitor.DefaultPlanVisitor;
import sparkengine.plan.model.plan.visitor.PlanVisitorException;
import sparkengine.plan.model.sink.visitor.SinkVisitorForComponents;
import sparkengine.plan.runtime.builder.RuntimeContext;
import sparkengine.plan.runtime.builder.runner.ModelPipelineRunnersFactory;
import sparkengine.plan.runtime.datasetconsumer.DatasetConsumerException;
import sparkengine.plan.runtime.runner.PipelineRunnersFactory;
import sparkengine.plan.runtime.runner.PipelineRunnersFactoryException;

/* loaded from: input_file:sparkengine/plan/app/runner/PlanRunner.class */
public final class PlanRunner {

    @Nonnull
    private final SparkSession sparkSession;

    @Nonnull
    private final PlanDefinition planDefinition;

    @Nonnull
    private final RuntimeArgs runtimeArgs;

    @Nonnull
    private final Logger log;

    /* loaded from: input_file:sparkengine/plan/app/runner/PlanRunner$PlanInspection.class */
    public static final class PlanInspection {
        private final Set<String> accumulatorNames;

        public PlanInspection(Set<String> set) {
            this.accumulatorNames = set;
        }

        public Set<String> getAccumulatorNames() {
            return this.accumulatorNames;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof PlanInspection)) {
                return false;
            }
            Set<String> accumulatorNames = getAccumulatorNames();
            Set<String> accumulatorNames2 = ((PlanInspection) obj).getAccumulatorNames();
            return accumulatorNames == null ? accumulatorNames2 == null : accumulatorNames.equals(accumulatorNames2);
        }

        public int hashCode() {
            Set<String> accumulatorNames = getAccumulatorNames();
            return (1 * 59) + (accumulatorNames == null ? 43 : accumulatorNames.hashCode());
        }

        public String toString() {
            return "PlanRunner.PlanInspection(accumulatorNames=" + getAccumulatorNames() + ")";
        }
    }

    /* loaded from: input_file:sparkengine/plan/app/runner/PlanRunner$PlanRunnerBuilder.class */
    public static class PlanRunnerBuilder {
        private SparkSession sparkSession;
        private PlanDefinition planDefinition;
        private boolean runtimeArgs$set;
        private RuntimeArgs runtimeArgs$value;
        private boolean log$set;
        private Logger log$value;

        public PlanRunnerBuilder planLocation(String str) {
            return planDefinition(PlanDefinition.planLocation(str));
        }

        PlanRunnerBuilder() {
        }

        public PlanRunnerBuilder sparkSession(@Nonnull SparkSession sparkSession) {
            this.sparkSession = sparkSession;
            return this;
        }

        public PlanRunnerBuilder planDefinition(@Nonnull PlanDefinition planDefinition) {
            this.planDefinition = planDefinition;
            return this;
        }

        public PlanRunnerBuilder runtimeArgs(@Nonnull RuntimeArgs runtimeArgs) {
            this.runtimeArgs$value = runtimeArgs;
            this.runtimeArgs$set = true;
            return this;
        }

        public PlanRunnerBuilder log(@Nonnull Logger logger) {
            this.log$value = logger;
            this.log$set = true;
            return this;
        }

        public PlanRunner build() {
            RuntimeArgs runtimeArgs = this.runtimeArgs$value;
            if (!this.runtimeArgs$set) {
                runtimeArgs = PlanRunner.$default$runtimeArgs();
            }
            Logger logger = this.log$value;
            if (!this.log$set) {
                logger = PlanRunner.$default$log();
            }
            return new PlanRunner(this.sparkSession, this.planDefinition, runtimeArgs, logger);
        }

        public String toString() {
            return "PlanRunner.PlanRunnerBuilder(sparkSession=" + this.sparkSession + ", planDefinition=" + this.planDefinition + ", runtimeArgs$value=" + this.runtimeArgs$value + ", log$value=" + this.log$value + ")";
        }
    }

    public void run() throws IOException, ModelFormatException, PlanVisitorException, PlanMapperException, PipelineRunnersFactoryException, DatasetConsumerException {
        Plan plan = getPlan();
        RuntimeContext init = RuntimeContext.init(this.sparkSession);
        registerMetrics(inspectPlan(plan, init), init);
        PipelineRunnersFactory pipelineRunnersFactory = getPipelineRunnersFactory(plan, init);
        if (this.runtimeArgs.isSkipRun()) {
            this.log.warn("skip run");
        } else {
            executePipelines(pipelineRunnersFactory);
            waitOnSpark();
        }
    }

    @Nonnull
    private Plan getPlan() throws IOException, ModelFormatException, PlanMapperException {
        Plan readPlanFromYaml = ModelFactory.readPlanFromYaml(this.planDefinition.getPlanInputStreamFactory());
        this.log.trace(String.format("source plan [%s]", readPlanFromYaml));
        Plan map = PlanResolver.of(this.planDefinition.getPlanLocation(), this.runtimeArgs, this.sparkSession, this.log).map(readPlanFromYaml);
        writeResolvedPlan(map);
        return map;
    }

    private PlanInspection inspectPlan(Plan plan, RuntimeContext runtimeContext) throws PlanVisitorException {
        UdfAccumulatorsFinder udfAccumulatorsFinder = new UdfAccumulatorsFinder();
        DefaultPlanVisitor.builder().componentVisitor(udfAccumulatorsFinder).sinkVisitor(new SinkVisitorForComponents(udfAccumulatorsFinder)).build().visit(plan);
        return new PlanInspection(udfAccumulatorsFinder.getAccumulatorNames());
    }

    private void registerMetrics(PlanInspection planInspection, RuntimeContext runtimeContext) {
        Set<String> accumulatorNames = planInspection.getAccumulatorNames();
        Objects.requireNonNull(runtimeContext);
        accumulatorNames.forEach(runtimeContext::getOrCreateAccumulator);
        SparkEnv.get().metricsSystem().registerSource(MetricSource.buildWithDefaults().withRuntimeAccumulators(runtimeContext));
    }

    private PipelineRunnersFactory getPipelineRunnersFactory(@Nonnull Plan plan, @Nonnull RuntimeContext runtimeContext) throws IOException, PlanMapperException, ModelFormatException {
        return ModelPipelineRunnersFactory.ofPlan(plan, runtimeContext);
    }

    private void writeResolvedPlan(Plan plan) throws IOException, ModelFormatException {
        if (this.runtimeArgs.getWriteResolvedPlanToFile() != null) {
            String writeResolvedPlanToFile = this.runtimeArgs.getWriteResolvedPlanToFile();
            this.log.warn(String.format("writing resolved plan to [%s]", writeResolvedPlanToFile));
            FileOutputStream fileOutputStream = new FileOutputStream(writeResolvedPlanToFile);
            try {
                ModelFactory.writePlanAsYaml(plan, fileOutputStream);
                fileOutputStream.close();
            } catch (Throwable th) {
                try {
                    fileOutputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        if (this.runtimeArgs.isWriteResolvedPlan()) {
            ModelFactory.writePlanAsYaml(plan, System.out);
        }
    }

    private void executePipelines(@Nonnull PipelineRunnersFactory pipelineRunnersFactory) throws PipelineRunnersFactoryException, DatasetConsumerException {
        Set pipelineNames = pipelineRunnersFactory.getPipelineNames();
        LinkedList linkedList = new LinkedList(pipelineNames);
        if (this.runtimeArgs.getPipelines() != null) {
            linkedList = new LinkedList(this.runtimeArgs.getPipelines());
            linkedList.retainAll(pipelineNames);
        }
        Logger logger = this.log;
        Object[] objArr = new Object[3];
        objArr[0] = linkedList;
        objArr[1] = Boolean.valueOf(this.runtimeArgs.getPipelines() != null);
        objArr[2] = Boolean.valueOf(this.runtimeArgs.isParallelPipelineExecution());
        logger.info(String.format("found pipelines [%s] (user override: %b; parallel execution: %b)", objArr));
        (this.runtimeArgs.isParallelPipelineExecution() ? linkedList.parallelStream() : linkedList.stream()).forEach(str -> {
            runPipeline(pipelineRunnersFactory, str);
        });
    }

    private void runPipeline(PipelineRunnersFactory pipelineRunnersFactory, String str) {
        this.log.info(String.format("running pipeline [%s]", str));
        try {
            pipelineRunnersFactory.buildPipelineRunner(str).run();
        } catch (DatasetConsumerException e) {
            String format = String.format("can't execute pipeline [%s]", str);
            if (!this.runtimeArgs.isSkipFaultyPipelines()) {
                throw new DatasetConsumerException(format, e);
            }
            this.log.warn(format, e);
        } catch (PipelineRunnersFactoryException e2) {
            String format2 = String.format("can't instantiate pipeline [%s]", str);
            if (!this.runtimeArgs.isSkipFaultyPipelines()) {
                throw new PipelineRunnersFactoryException(format2, e2);
            }
            this.log.warn(format2, e2);
        }
    }

    private void waitOnSpark() throws IOException {
        if (this.sparkSession.sessionState().streamingQueryManager().active().length > 0) {
            try {
                this.log.info("waiting for any stream to finish");
                this.sparkSession.sessionState().streamingQueryManager().awaitAnyTermination();
            } catch (StreamingQueryException e) {
                throw new IOException((Throwable) e);
            }
        }
    }

    private static RuntimeArgs $default$runtimeArgs() {
        return RuntimeArgs.builder().build();
    }

    private static Logger $default$log() {
        return Logger.getLogger(PlanRunner.class);
    }

    PlanRunner(@Nonnull SparkSession sparkSession, @Nonnull PlanDefinition planDefinition, @Nonnull RuntimeArgs runtimeArgs, @Nonnull Logger logger) {
        if (sparkSession == null) {
            throw new NullPointerException("sparkSession is marked non-null but is null");
        }
        if (planDefinition == null) {
            throw new NullPointerException("planDefinition is marked non-null but is null");
        }
        if (runtimeArgs == null) {
            throw new NullPointerException("runtimeArgs is marked non-null but is null");
        }
        if (logger == null) {
            throw new NullPointerException("log is marked non-null but is null");
        }
        this.sparkSession = sparkSession;
        this.planDefinition = planDefinition;
        this.runtimeArgs = runtimeArgs;
        this.log = logger;
    }

    public static PlanRunnerBuilder builder() {
        return new PlanRunnerBuilder();
    }

    @Nonnull
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    @Nonnull
    public PlanDefinition getPlanDefinition() {
        return this.planDefinition;
    }

    @Nonnull
    public RuntimeArgs getRuntimeArgs() {
        return this.runtimeArgs;
    }

    @Nonnull
    public Logger getLog() {
        return this.log;
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof PlanRunner)) {
            return false;
        }
        PlanRunner planRunner = (PlanRunner) obj;
        SparkSession sparkSession = getSparkSession();
        SparkSession sparkSession2 = planRunner.getSparkSession();
        if (sparkSession == null) {
            if (sparkSession2 != null) {
                return false;
            }
        } else if (!sparkSession.equals(sparkSession2)) {
            return false;
        }
        PlanDefinition planDefinition = getPlanDefinition();
        PlanDefinition planDefinition2 = planRunner.getPlanDefinition();
        if (planDefinition == null) {
            if (planDefinition2 != null) {
                return false;
            }
        } else if (!planDefinition.equals(planDefinition2)) {
            return false;
        }
        RuntimeArgs runtimeArgs = getRuntimeArgs();
        RuntimeArgs runtimeArgs2 = planRunner.getRuntimeArgs();
        if (runtimeArgs == null) {
            if (runtimeArgs2 != null) {
                return false;
            }
        } else if (!runtimeArgs.equals(runtimeArgs2)) {
            return false;
        }
        Logger log = getLog();
        Logger log2 = planRunner.getLog();
        return log == null ? log2 == null : log.equals(log2);
    }

    public int hashCode() {
        SparkSession sparkSession = getSparkSession();
        int hashCode = (1 * 59) + (sparkSession == null ? 43 : sparkSession.hashCode());
        PlanDefinition planDefinition = getPlanDefinition();
        int hashCode2 = (hashCode * 59) + (planDefinition == null ? 43 : planDefinition.hashCode());
        RuntimeArgs runtimeArgs = getRuntimeArgs();
        int hashCode3 = (hashCode2 * 59) + (runtimeArgs == null ? 43 : runtimeArgs.hashCode());
        Logger log = getLog();
        return (hashCode3 * 59) + (log == null ? 43 : log.hashCode());
    }

    public String toString() {
        return "PlanRunner(sparkSession=" + getSparkSession() + ", planDefinition=" + getPlanDefinition() + ", runtimeArgs=" + getRuntimeArgs() + ", log=" + getLog() + ")";
    }
}
