package org.hpccsystems.spark;

import java.io.File;
import java.io.FileOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.log4j.Logger;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.execution.python.EvaluatePython;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.hpccsystems.spark.thor.BinaryRecordWriter;
import org.hpccsystems.spark.thor.RFCCodes;
import org.hpccsystems.spark.thor.SparkField;
import org.hpccsystems.ws.client.HPCCWsDFUClient;
import org.hpccsystems.ws.client.utils.Connection;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUCreateFileWrapper;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFileCopyWrapper;
import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFilePartWrapper;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/hpccsystems/spark/HpccFileWriter.class */
public class HpccFileWriter implements Serializable {
    private static final long serialVersionUID = 1;
    private static final int DefaultExpiryTimeSecs = 300;
    private static final Logger log = Logger.getLogger(HpccFileWriter.class.getName());
    private transient HPCCWsDFUClient dfuClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hpccsystems/spark/HpccFileWriter$FilePartWriteResults.class */
    public class FilePartWriteResults implements Serializable {
        private static final long serialVersionUID = 1;
        public long numRecords;
        public long dataLength;
        public boolean successful;

        private FilePartWriteResults() {
            this.numRecords = 0L;
            this.dataLength = 0L;
            this.successful = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hpccsystems/spark/HpccFileWriter$PartitionLocation.class */
    public class PartitionLocation implements Serializable {
        private static final long serialVersionUID = 1;
        public InetAddress host;
        public int partitionIndex;

        private PartitionLocation() {
            this.host = null;
            this.partitionIndex = -1;
        }
    }

    public HpccFileWriter(String str, String str2, String str3) throws Exception {
        this.dfuClient = null;
        Matcher matcher = Pattern.compile("(http|https)://([^:]+):([0-9]+)", 2).matcher(str);
        if (!matcher.find()) {
            throw new Exception("Invalid connection string. Expected format: {http|https}://{HOST}:{PORT}");
        }
        Connection connection = new Connection(matcher.group(1), matcher.group(2), matcher.group(3));
        connection.setUserName(str2);
        connection.setPassword(str3);
        this.dfuClient = HPCCWsDFUClient.get(connection);
    }

    private void abortFileCreation() {
        log.error("Abort file creation was called. This is currently a stub.");
    }

    private static InetAddress getLocalAddress() throws SocketException {
        ArrayList arrayList = new ArrayList();
        Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
        while (networkInterfaces.hasMoreElements()) {
            NetworkInterface nextElement = networkInterfaces.nextElement();
            Enumeration<InetAddress> inetAddresses = nextElement.getInetAddresses();
            InetAddress inetAddress = null;
            while (inetAddresses.hasMoreElements()) {
                InetAddress nextElement2 = inetAddresses.nextElement();
                if ((nextElement2 instanceof Inet4Address) && !nextElement2.isLoopbackAddress()) {
                    inetAddress = nextElement2;
                }
            }
            while (arrayList.size() < nextElement.getIndex() + 1) {
                arrayList.add(null);
            }
            arrayList.set(nextElement.getIndex(), inetAddress);
        }
        for (int i = 0; i < arrayList.size(); i++) {
            if (arrayList.get(i) != null) {
                return (InetAddress) arrayList.get(i);
            }
        }
        return null;
    }

    public long saveToHPCC(RDD<Row> rdd, String str, String str2) throws Exception {
        return saveToHPCC(SparkContext.getOrCreate(), rdd, str, str2);
    }

    public long saveToHPCC(SparkContext sparkContext, RDD<Row> rdd, String str, String str2) throws Exception {
        JavaRDD fromRDD = JavaRDD.fromRDD(rdd, ClassTag$.MODULE$.apply(Row.class));
        fromRDD.cache();
        List<PartitionLocation> collect = fromRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<PartitionLocation>>() { // from class: org.hpccsystems.spark.HpccFileWriter.1
            public Iterator<PartitionLocation> call(Integer num, Iterator<Row> it) throws Exception {
                PartitionLocation partitionLocation = new PartitionLocation();
                partitionLocation.host = HpccFileWriter.access$100();
                partitionLocation.partitionIndex = num.intValue();
                return Arrays.asList(partitionLocation).iterator();
            }
        }, true).collect();
        String[] strArr = new String[collect.size()];
        for (PartitionLocation partitionLocation : collect) {
            strArr[partitionLocation.partitionIndex] = partitionLocation.host.getHostAddress();
        }
        String ecl = SparkField.toECL(new SparkField(new StructField("root", ((Row) rdd.first()).schema(), false, Metadata.empty())));
        try {
            DFUCreateFileWrapper createFile = this.dfuClient.createFile(str2, str, ecl, strArr, DefaultExpiryTimeSecs);
            DFUFilePartWrapper[] fileParts = createFile.getFileParts();
            String[] strArr2 = new String[fileParts.length];
            for (int i = 0; i < fileParts.length; i++) {
                DFUFileCopyWrapper[] copies = fileParts[i].getCopies();
                if (copies.length == 0) {
                    abortFileCreation();
                    throw new Exception("File creation error: File part: " + (i + 1) + " does not have an file copies associated with it. Aborting write.");
                }
                strArr2[i] = copies[0].getCopyPath();
            }
            if (strArr2.length != fromRDD.getNumPartitions()) {
                abortFileCreation();
                throw new Exception("File creation error: Invalid number of file parts returned during creation request.");
            }
            List collect2 = fromRDD.mapPartitionsWithIndex((num, it) -> {
                FilePartWriteResults writeLocalFilePart;
                if (!getLocalAddress().getHostAddress().equals(strArr[num.intValue()])) {
                    log.error("File part mapping changed before writing began. Aborting write.");
                    writeLocalFilePart = new FilePartWriteResults();
                    writeLocalFilePart.successful = false;
                } else {
                    writeLocalFilePart = writeLocalFilePart(strArr2[num.intValue()], it);
                }
                return Arrays.asList(writeLocalFilePart).iterator();
            }, true).collect();
            long j = 0;
            long j2 = 0;
            for (int i2 = 0; i2 < collect2.size(); i2++) {
                FilePartWriteResults filePartWriteResults = (FilePartWriteResults) collect2.get(i2);
                j += filePartWriteResults.numRecords;
                j2 += filePartWriteResults.dataLength;
                if (!filePartWriteResults.successful) {
                    abortFileCreation();
                    throw new Exception("Writing failed. An error occured on node: " + strArr[i2] + " check it's error logs for more information.");
                }
            }
            try {
                this.dfuClient.publishFile(createFile.getFileID(), ecl, j, j2);
                return j;
            } catch (Exception e) {
                throw new Exception("Failed to publish file with error: " + e.getMessage());
            }
        } catch (Exception e2) {
            log.error("DFU File Creation Error: " + e2.toString());
            throw new Exception("DFU File Creation Error: " + e2.toString());
        }
    }

    private FilePartWriteResults writeLocalFilePart(String str, Iterator<Row> it) throws Exception {
        FilePartWriteResults filePartWriteResults = new FilePartWriteResults();
        File file = new File(str.substring(0, str.lastIndexOf(File.separator)));
        if (!file.exists()) {
            file.mkdirs();
        }
        FileOutputStream fileOutputStream = new FileOutputStream(str);
        FileChannel channel = fileOutputStream.getChannel();
        if (it.hasNext()) {
            Row next = it.next();
            try {
                BinaryRecordWriter binaryRecordWriter = new BinaryRecordWriter(channel, next.schema());
                binaryRecordWriter.writeRecord(next);
                filePartWriteResults.numRecords++;
                while (it.hasNext()) {
                    binaryRecordWriter.writeRecord(it.next());
                    filePartWriteResults.numRecords++;
                }
                binaryRecordWriter.finalize();
                filePartWriteResults.dataLength = binaryRecordWriter.getTotalBytesWritten();
                filePartWriteResults.successful = true;
            } catch (Exception e) {
                filePartWriteResults.successful = false;
                log.error(e.getMessage());
            }
        }
        if (channel != null) {
            channel.close();
        }
        if (fileOutputStream != null) {
            fileOutputStream.close();
        }
        return filePartWriteResults;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 87940214:
                if (implMethodName.equals("lambda$saveToHPCC$e36769e5$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case RFCCodes.RFCStreamNoError /* 0 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/hpccsystems/spark/HpccFileWriter") && serializedLambda.getImplMethodSignature().equals("([Ljava/lang/String;[Ljava/lang/String;Ljava/lang/Integer;Ljava/util/Iterator;)Ljava/util/Iterator;")) {
                    HpccFileWriter hpccFileWriter = (HpccFileWriter) serializedLambda.getCapturedArg(0);
                    String[] strArr = (String[]) serializedLambda.getCapturedArg(1);
                    String[] strArr2 = (String[]) serializedLambda.getCapturedArg(2);
                    return (num, it) -> {
                        FilePartWriteResults writeLocalFilePart;
                        if (!getLocalAddress().getHostAddress().equals(strArr[num.intValue()])) {
                            log.error("File part mapping changed before writing began. Aborting write.");
                            writeLocalFilePart = new FilePartWriteResults();
                            writeLocalFilePart.successful = false;
                        } else {
                            writeLocalFilePart = writeLocalFilePart(strArr2[num.intValue()], it);
                        }
                        return Arrays.asList(writeLocalFilePart).iterator();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static /* synthetic */ InetAddress access$100() throws SocketException {
        return getLocalAddress();
    }

    static {
        EvaluatePython.registerPicklers();
    }
}
