package com.coxautodata.waimak.azure.table;

import com.coxautodata.waimak.dataflow.DataFlowException;
import com.coxautodata.waimak.dataflow.DataFlowException$;
import com.coxautodata.waimak.log.Logging;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.table.DynamicTableEntity;
import com.microsoft.azure.storage.table.EntityProperty;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric$IntIsIntegral$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkAzureTable.scala */
/* loaded from: input_file:com/coxautodata/waimak/azure/table/SparkAzureTable$.class */
public final class SparkAzureTable$ implements Logging {
    public static SparkAzureTable$ MODULE$;
    private final Logger com$coxautodata$waimak$log$Logging$$log;

    static {
        new SparkAzureTable$();
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public Logger com$coxautodata$waimak$log$Logging$$log() {
        return this.com$coxautodata$waimak$log$Logging$$log;
    }

    public final void com$coxautodata$waimak$log$Logging$_setter_$com$coxautodata$waimak$log$Logging$$log_$eq(Logger logger) {
        this.com$coxautodata$waimak$log$Logging$$log = logger;
    }

    public String azureWriterOutputLabel(String str) {
        return new StringBuilder(15).append("__azure_writer_").append(str).toString();
    }

    public boolean createIfNotExists(String str, String str2) {
        boolean createIfNotExists = CloudStorageAccount.parse(str).createCloudTableClient().getTableReference(str2).createIfNotExists();
        logInfo(() -> {
            return new StringBuilder(27).append("Azure Table [").append(str2).append("] was created ").append(createIfNotExists).toString();
        });
        return createIfNotExists;
    }

    public void pushToTable(Dataset<?> dataset, String str, String str2, int i, long j, Seq<Object> seq) {
        Set apply = Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new String[]{"_partition", "_id"}));
        Dataset withColumn = dataset.withColumn("_partition", dataset.sparkSession().implicits().StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"_partition"}))).$(Nil$.MODULE$).cast("string")).withColumn("_id", dataset.sparkSession().implicits().StringToColumn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"_id"}))).$(Nil$.MODULE$).cast("string"));
        StructType schema = withColumn.schema();
        int unboxToInt = BoxesRunTime.unboxToInt(new ArrayOps.ofInt(Predef$.MODULE$.intArrayOps((int[]) withColumn.mapPartitions(iterator -> {
            Map<String, Function1<Row, EntityProperty>> encoder = PropertyEncoder$.MODULE$.toEncoder(schema, apply);
            SameElemIterator sameElemIterator = new SameElemIterator(iterator.map(row -> {
                return new DynamicTableEntity((String) row.getAs("_partition"), (String) row.getAs("_id"), (HashMap) encoder.foldLeft(new HashMap(), (hashMap, tuple2) -> {
                    hashMap.put(tuple2._1(), ((Function1) tuple2._2()).apply(row));
                    return hashMap;
                }));
            }).buffered(), 100, (dynamicTableEntity, dynamicTableEntity2) -> {
                return BoxesRunTime.boxToBoolean($anonfun$pushToTable$4(dynamicTableEntity, dynamicTableEntity2));
            });
            AzureTableMultiWriter azureTableMultiWriter = new AzureTableMultiWriter(str2, str, i, j, seq);
            azureTableMultiWriter.run();
            sameElemIterator.foreach(seq2 -> {
                $anonfun$pushToTable$5(azureTableMultiWriter, seq2);
                return BoxedUnit.UNIT;
            });
            return Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{azureTableMultiWriter.finish()})).iterator();
        }, dataset.sparkSession().implicits().newIntEncoder()).collect())).sum(Numeric$IntIsIntegral$.MODULE$));
        logInfo(() -> {
            return new StringBuilder(42).append("TOTAL RECORDS PUSHED INTO AZURE TABLE [").append(str2).append("]: ").append(unboxToInt).toString();
        });
    }

    public static final /* synthetic */ boolean $anonfun$pushToTable$4(DynamicTableEntity dynamicTableEntity, DynamicTableEntity dynamicTableEntity2) {
        String partitionKey = dynamicTableEntity.getPartitionKey();
        String partitionKey2 = dynamicTableEntity2.getPartitionKey();
        return partitionKey != null ? partitionKey.equals(partitionKey2) : partitionKey2 == null;
    }

    public static final /* synthetic */ void $anonfun$pushToTable$5(AzureTableMultiWriter azureTableMultiWriter, Seq seq) {
        while (!azureTableMultiWriter.queue().offer(seq, 500L, TimeUnit.MILLISECONDS)) {
            if (azureTableMultiWriter.threadFailed().get()) {
                azureTableMultiWriter.queue().clear();
                azureTableMultiWriter.threadPool().shutdown();
                throw new DataFlowException("Thread failure when writing to the Azure Table", DataFlowException$.MODULE$.$lessinit$greater$default$2());
            }
        }
    }

    private SparkAzureTable$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}
