/*
 * Decompiled with CFR 0.152.
 */
package org.hpccsystems.spark;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import net.razorvine.pickle.IObjectConstructor;
import net.razorvine.pickle.Unpickler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.python.EvaluatePython;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.hpccsystems.dfs.client.CompressionAlgorithm;
import org.hpccsystems.dfs.client.DataPartition;
import org.hpccsystems.dfs.client.HPCCRemoteFileWriter;
import org.hpccsystems.dfs.cluster.NullRemapper;
import org.hpccsystems.dfs.cluster.RemapInfo;
import org.hpccsystems.spark.GenericRowRecordAccessor;
import org.hpccsystems.spark.PySparkField;
import org.hpccsystems.spark.PySparkFieldConstructor;
import org.hpccsystems.spark.RowConstructor;
import org.hpccsystems.spark.SparkSchemaTranslator;
import org.hpccsystems.ws.client.HPCCWsDFUClient;
import org.hpccsystems.ws.client.utils.Connection;
import org.hpccsystems.ws.client.wrappers.ArrayOfEspExceptionWrapper;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUCreateFileWrapper;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFilePartWrapper;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFileTypeWrapper;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

public class HpccFileWriter
implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final int DefaultExpiryTimeSecs = 300;
    private static final Logger log = LogManager.getLogger(HpccFileWriter.class);
    private transient HPCCWsDFUClient dfuClient = null;
    private transient Connection connectionInfo = null;

    private static void registerPicklingFunctions() {
        EvaluatePython.registerPicklers();
        Unpickler.registerConstructor((String)"pyspark.sql.types", (String)"Row", (IObjectConstructor)new RowConstructor());
        Unpickler.registerConstructor((String)"pyspark.sql.types", (String)"_create_row", (IObjectConstructor)new RowConstructor());
        Unpickler.registerConstructor((String)"org.hpccsystems", (String)"PySparkField", (IObjectConstructor)new PySparkFieldConstructor());
    }

    public HpccFileWriter(Connection espconninfo) throws HpccFileException {
        this.connectionInfo = espconninfo;
    }

    public HpccFileWriter(String connectionString, String user, String pass) throws Exception {
        Pattern connectionRegex = Pattern.compile("(http|https)://([^:]+):([0-9]+)", 2);
        Matcher matches = connectionRegex.matcher(connectionString);
        if (!matches.find()) {
            throw new Exception("Invalid connection string. Expected format: {http|https}://{HOST}:{PORT}");
        }
        this.connectionInfo = new Connection(matches.group(1), matches.group(2), matches.group(3));
        this.connectionInfo.setUserName(user);
        this.connectionInfo.setPassword(pass);
    }

    private void abortFileCreation() {
        log.error("Abort file creation was called. This is currently a stub.");
    }

    public long saveToHPCC(RDD<Row> scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC((JavaRDD<Row>)rdd, clusterName, fileName);
    }

    public long saveToHPCC(StructType schema, RDD<Row> scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC(schema, (JavaRDD<Row>)rdd, clusterName, fileName);
    }

    public long saveToHPCC(JavaRDD<Row> javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        return this.saveToHPCC(SparkContext.getOrCreate(), null, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false);
    }

    public long saveToHPCC(StructType schema, JavaRDD<Row> javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        return this.saveToHPCC(SparkContext.getOrCreate(), schema, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false);
    }

    public long saveToHPCC(RDD<Row> scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC(SparkContext.getOrCreate(), null, (JavaRDD<Row>)rdd, clusterName, fileName, fileCompression, overwrite);
    }

    public long saveToHPCC(StructType schema, RDD<Row> scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC(SparkContext.getOrCreate(), schema, (JavaRDD<Row>)rdd, clusterName, fileName, fileCompression, overwrite);
    }

    public long saveToHPCC(JavaRDD<Row> javaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        return this.saveToHPCC(SparkContext.getOrCreate(), null, javaRDD, clusterName, fileName, fileCompression, overwrite);
    }

    public long saveToHPCC(StructType schema, JavaRDD<Row> javaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        return this.saveToHPCC(SparkContext.getOrCreate(), schema, javaRDD, clusterName, fileName, fileCompression, overwrite);
    }

    public long saveToHPCC(SparkContext sc, RDD<Row> scalaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC(sc, null, (JavaRDD<Row>)rdd, clusterName, fileName, CompressionAlgorithm.DEFAULT, false);
    }

    public long saveToHPCC(SparkContext sc, JavaRDD<Row> javaRDD, String clusterName, String fileName) throws Exception, ArrayOfEspExceptionWrapper {
        return this.saveToHPCC(sc, null, javaRDD, clusterName, fileName, CompressionAlgorithm.DEFAULT, false);
    }

    public long saveToHPCC(SparkContext sc, RDD<Row> scalaRDD, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        JavaRDD rdd = JavaRDD.fromRDD(scalaRDD, (ClassTag)ClassTag$.MODULE$.apply(Row.class));
        return this.saveToHPCC(sc, null, (JavaRDD<Row>)rdd, clusterName, fileName, fileCompression, overwrite);
    }

    public long saveToHPCC(SparkContext sc, StructType rddSchema, JavaRDD<Row> rdd, String clusterName, String fileName, CompressionAlgorithm fileCompression, boolean overwrite) throws Exception, ArrayOfEspExceptionWrapper {
        boolean isCompressed;
        FieldDef recordDef;
        String eclRecordDefn;
        DFUCreateFileWrapper createResult;
        DFUFilePartWrapper[] dfuFileParts;
        DataPartition[] hpccPartitions;
        this.dfuClient = HPCCWsDFUClient.get(this.connectionInfo);
        if (sc == null || rdd == null) {
            throw new Exception("Aborting write. A valid non-null SparkContext and RDD must be provided.");
        }
        StructType schema = rddSchema;
        if (schema == null) {
            Row firstRow = (Row)rdd.first();
            schema = firstRow.schema();
        }
        if ((hpccPartitions = DataPartition.createPartitions(dfuFileParts = (createResult = this.dfuClient.createFile(fileName, clusterName, eclRecordDefn = RecordDefinitionTranslator.toECLRecord(recordDef = SparkSchemaTranslator.toHPCCRecordDef(schema)), 300, isCompressed = fileCompression != CompressionAlgorithm.NONE, DFUFileTypeWrapper.Flat, null)).getFileParts(), new NullRemapper(new RemapInfo(), createResult.getFileAccessInfo()), dfuFileParts.length, createResult.getFileAccessInfoBlob())).length != rdd.getNumPartitions() && (rdd = rdd.repartition(hpccPartitions.length)).getNumPartitions() != hpccPartitions.length) {
            throw new Exception("Repartitioning RDD failed. Aborting write.");
        }
        Function2 & Serializable writeFunc = (Function2 & Serializable)(partitionIndex, it) -> {
            HpccFileWriter.registerPicklingFunctions();
            GenericRowRecordAccessor recordAccessor = new GenericRowRecordAccessor(recordDef);
            HPCCRemoteFileWriter fileWriter = new HPCCRemoteFileWriter(hpccPartitions[partitionIndex], recordDef, recordAccessor, fileCompression);
            FilePartWriteResults result = new FilePartWriteResults();
            try {
                fileWriter.writeRecords(it);
                fileWriter.close();
                result.dataLength = fileWriter.getBytesWritten();
                result.numRecords = fileWriter.getRecordsWritten();
                result.successful = true;
            }
            catch (Exception e) {
                result.successful = false;
                result.errorMessage = e.getMessage();
            }
            List<FilePartWriteResults> resultList = Arrays.asList(result);
            return resultList.iterator();
        };
        JavaRDD writeResultsRDD = rdd.mapPartitionsWithIndex((Function2)writeFunc, true);
        List writeResultsList = writeResultsRDD.collect();
        long recordsWritten = 0L;
        long dataWritten = 0L;
        for (int i = 0; i < writeResultsList.size(); ++i) {
            FilePartWriteResults result = (FilePartWriteResults)writeResultsList.get(i);
            recordsWritten += result.numRecords;
            dataWritten += result.dataLength;
            if (result.successful) continue;
            this.abortFileCreation();
            throw new Exception("Writing failed with error: " + result.errorMessage);
        }
        try {
            this.dfuClient.publishFile(createResult.getFileID(), eclRecordDefn, recordsWritten, dataWritten, overwrite);
        }
        catch (Exception e) {
            throw new Exception("Failed to publish file with error: " + e.getMessage());
        }
        return recordsWritten;
    }

    public StructType inferSchema(List<PySparkField> exampleFields) throws Exception {
        return this.generateRowSchema(exampleFields);
    }

    private StructType generateRowSchema(List<PySparkField> exampleFields) throws Exception {
        StructField[] fields = new StructField[exampleFields.size()];
        int index = 0;
        for (PySparkField fieldInfo : exampleFields) {
            fields[index] = this.generateSchemaField(fieldInfo.getName(), fieldInfo.getValue());
            ++index;
        }
        return new StructType(fields);
    }

    private StructField generateSchemaField(String name, Object obj) throws Exception {
        Metadata empty = Metadata.empty();
        boolean nullable = false;
        DataType type = DataTypes.NullType;
        if (obj instanceof String) {
            type = DataTypes.StringType;
        } else if (obj instanceof Byte) {
            type = DataTypes.ByteType;
        } else if (obj instanceof Short) {
            type = DataTypes.ShortType;
        } else if (obj instanceof Integer) {
            type = DataTypes.IntegerType;
        } else if (obj instanceof Long) {
            type = DataTypes.LongType;
        } else if (obj instanceof byte[]) {
            type = DataTypes.BinaryType;
        } else if (obj instanceof Boolean) {
            type = DataTypes.BooleanType;
        } else if (obj instanceof Float) {
            type = DataTypes.FloatType;
        } else if (obj instanceof Double) {
            type = DataTypes.DoubleType;
        } else if (obj instanceof BigDecimal) {
            BigDecimal decimal = (BigDecimal)obj;
            int precision = decimal.precision();
            int scale = decimal.scale();
            if (precision > DecimalType.MAX_PRECISION()) {
                if ((scale -= precision - DecimalType.MAX_PRECISION()) < 0) {
                    scale = 0;
                }
                precision = DecimalType.MAX_PRECISION();
            }
            type = DataTypes.createDecimalType((int)precision, (int)scale);
        } else if (obj instanceof List) {
            List list = (List)obj;
            if (list.size() == 0) {
                throw new Exception("Unable to infer row schema. Encountered an empty List: " + name + ". All lists must have an example row to infer schema.");
            }
            Object firstChild = list.get(0);
            if (firstChild instanceof PySparkField) {
                List rowFields = list;
                type = this.generateRowSchema(rowFields);
            } else {
                StructField childField = this.generateSchemaField("temp", firstChild);
                type = DataTypes.createArrayType((DataType)childField.dataType());
                nullable = true;
            }
        } else {
            throw new Exception("Encountered unsupported type: " + obj.getClass().getName() + ". Ensure that the entire example row hierarchy has been converted to a Dictionary. Including rows in child datasets.");
        }
        return new StructField(name, type, nullable, empty);
    }

    static {
        HpccFileWriter.registerPicklingFunctions();
    }

    private class FilePartWriteResults
    implements Serializable {
        private static final long serialVersionUID = 1L;
        public long numRecords = 0L;
        public long dataLength = 0L;
        public boolean successful = true;
        public String errorMessage = null;

        private FilePartWriteResults() {
        }
    }
}

