package org.apache.spark.sql.delta.commands;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.hadoop.fs.Path;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.JsonToStructs;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaLog$;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.MLSQLMultiDeltaOptions$;
import org.apache.spark.sql.delta.TableMetaInfo;
import org.apache.spark.sql.delta.actions.Metadata;
import org.apache.spark.sql.delta.files.DelayedCommitProtocol;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.sql.types.StructType;
import scala.Array$;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.math.Ordering$Long$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.util.Try$;
import tech.mlsql.common.utils.Md5$;

/* compiled from: BinlogSyncToDelta.scala */
/* loaded from: input_file:org/apache/spark/sql/delta/commands/BinlogSyncToDelta$.class */
/**
 * Singleton holder (the Java view of the Scala object in {@code BinlogSyncToDelta.scala}) that
 * takes a dataset of binlog events encoded as JSON strings, de-duplicates the contained rows and
 * applies them to one Delta table per source database/table via {@link UpsertTableInDelta}.
 *
 * <p>The single instance is published through {@link #MODULE$}, which the private constructor
 * assigns.
 */
public final class BinlogSyncToDelta$ implements DeltaCommandsFun {
    public static BinlogSyncToDelta$ MODULE$;

    static {
        new BinlogSyncToDelta$();
    }

    /**
     * Forwards to the default implementation inherited from {@link DeltaCommandsFun}.
     */
    @Override // org.apache.spark.sql.delta.commands.DeltaCommandsFun
    public Tuple2<QueryExecution, Seq<Attribute>> normalizeData(Metadata metadata, Dataset<?> dataset, Seq<String> seq) {
        // Must be a qualified super call: an unqualified call would recurse into this override forever.
        return DeltaCommandsFun.super.normalizeData(metadata, dataset, seq);
    }

    /**
     * Forwards to the default implementation inherited from {@link DeltaCommandsFun}.
     */
    @Override // org.apache.spark.sql.delta.commands.DeltaCommandsFun
    public DelayedCommitProtocol getCommitter(Path path) {
        // Must be a qualified super call: an unqualified call would recurse into this override forever.
        return DeltaCommandsFun.super.getCommitter(path);
    }

    /**
     * Forwards to the default implementation inherited from {@link DeltaCommandsFun}.
     */
    @Override // org.apache.spark.sql.delta.commands.DeltaCommandsFun
    public Dataset<?> convertStreamDataFrame(Dataset<?> dataset) {
        // Must be a qualified super call: an unqualified call would recurse into this override forever.
        return DeltaCommandsFun.super.convertStreamDataFrame(dataset);
    }

    /**
     * Applies one batch of binlog events to the target Delta tables.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read the comma-separated id columns ({@code UpsertTableInDelta.ID_COLS}) and the desired
     *       partition count ({@code UpsertTableInDelta.NEW_DATA_PARALLEL_NUM}, default {@code "8"})
     *       from {@code map}; repartition the converted dataset if its partition count differs.</li>
     *   <li>Cache the dataset. When {@code MLSQLMultiDeltaOptions.KEEP_BINLOG} (default
     *       {@code "false"}) is true, append the raw events to
     *       {@code MLSQLMultiDeltaOptions.BINLOG_PATH}; otherwise just run {@code count()}.</li>
     *   <li>Parse column 0 of every row as JSON, detach its {@code "rows"} array and attach the
     *       remaining object to each row under {@code MLSQLMultiDeltaOptions.META_KEY}.</li>
     *   <li>Group rows by an MD5 of database name, table name and id-column values, keeping per
     *       group only the row that sorts last by the meta {@code "timestamp"}.</li>
     *   <li>Write rows whose meta {@code "type"} is not {@code "delete"} as upserts, then rows whose
     *       type is {@code "delete"} as deletes, each only when that subset is non-empty.</li>
     * </ol>
     * The cached dataset is always unpersisted on exit.
     *
     * @param dataset binlog events; column 0 of each row is read as a JSON string
     * @param map     command options; {@code ID_COLS} is mandatory (looked up with {@code apply})
     */
    public void run(Dataset<Row> dataset, Map<String, String> map) {
        Seq idCols = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(((String) map.apply(UpsertTableInDelta$.MODULE$.ID_COLS())).split(","))).toSeq();
        int parallelism = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse(UpsertTableInDelta$.MODULE$.NEW_DATA_PARALLEL_NUM(), () -> {
            return "8";
        }))).toInt();
        // Held in an ObjectRef because the reference may be replaced below and is later handed to saveToSink$1.
        ObjectRef sourceRef = ObjectRef.create(convertStreamDataFrame(dataset));
        if (parallelism != new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(((Dataset) sourceRef.elem).rdd().partitions())).size()) {
            sourceRef.elem = ((Dataset) sourceRef.elem).repartition(parallelism);
        }
        ((Dataset) sourceRef.elem).cache();
        try {
            boolean keepBinlog = new StringOps(Predef$.MODULE$.augmentString((String) map.getOrElse(MLSQLMultiDeltaOptions$.MODULE$.KEEP_BINLOG(), () -> {
                return "false";
            }))).toBoolean();
            if (keepBinlog) {
                ((Dataset) sourceRef.elem).write().format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").mode(SaveMode.Append).save((String) map.apply(MLSQLMultiDeltaOptions$.MODULE$.BINLOG_PATH()));
            } else {
                // Result is discarded; presumably run only to evaluate (and thereby cache) the dataset once.
                ((Dataset) sourceRef.elem).count();
            }
            SparkSession sparkSession = ((Dataset) sourceRef.elem).sparkSession();
            RDD latestRows = ((Dataset) sourceRef.elem).rdd().flatMap(row -> {
                JSONObject event = JSONObject.fromObject(row.getString(0));
                // remove("rows") detaches the row array, so what is left in `event` is the per-event meta.
                return (Buffer) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter((JSONArray) event.remove("rows")).asScala()).map(obj -> {
                    ((JSONObject) obj).put(MLSQLMultiDeltaOptions$.MODULE$.META_KEY(), event);
                    return (JSONObject) obj;
                }, Buffer$.MODULE$.canBuildFrom());
            }, ClassTag$.MODULE$.apply(JSONObject.class)).map(jSONObject -> {
                // Key = md5(db + "_" + table + "_" + id values). NOTE(review): id values are joined with
                // no separator, so distinct multi-column ids could in principle produce the same key.
                return new Tuple2(Md5$.MODULE$.md5Hash(new StringBuilder(2).append(_getInfoFromMeta$1(jSONObject, "databaseName")).append("_").append(_getInfoFromMeta$1(jSONObject, "tableName")).append("_").append(((TraversableOnce) idCols.map(str -> {
                    return jSONObject.get(str).toString();
                }, Seq$.MODULE$.canBuildFrom())).mkString("")).toString()), jSONObject.toString());
            }, ClassTag$.MODULE$.apply(Tuple2.class)).groupBy(keyed -> {
                return (String) keyed._1();
            }, ClassTag$.MODULE$.apply(String.class)).map(group -> {
                // Rows travel through the shuffle as JSON text; parse them back here.
                return (Iterable) ((TraversableLike) group._2()).map(member -> {
                    return JSONObject.fromObject(member._2());
                }, Iterable$.MODULE$.canBuildFrom());
            }, ClassTag$.MODULE$.apply(Iterable.class)).map(iterable -> {
                // Ascending sort by meta timestamp, then take the last element.
                return (JSONObject) ((Seq) iterable.toSeq().sortBy(candidate -> {
                    return BoxesRunTime.boxToLong($anonfun$run$11(candidate));
                }, Ordering$Long$.MODULE$)).last();
            }, ClassTag$.MODULE$.apply(JSONObject.class));
            // Distinct (db, table, schema) triples seen in this batch, collected to the driver.
            Map tableInfos = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) latestRows.map(jSONObject2 -> {
                return new TableMetaInfo(_getInfoFromMeta$1(jSONObject2, "databaseName"), _getInfoFromMeta$1(jSONObject2, "tableName"), _getInfoFromMeta$1(jSONObject2, "schema"));
            }, ClassTag$.MODULE$.apply(TableMetaInfo.class)).distinct().collect())).zipWithIndex(Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Tuple2.class))))).toMap(Predef$.MODULE$.$conforms());
            RDD upsertRows = latestRows.filter(jSONObject3 -> {
                return BoxesRunTime.boxToBoolean($anonfun$run$17(jSONObject3));
            });
            if (upsertRows.count() > 0) {
                saveToSink$1(upsertRows, UpsertTableInDelta$.MODULE$.OPERATION_TYPE_UPSERT(), tableInfos, map, sparkSession, sourceRef, idCols);
            }
            RDD deleteRows = latestRows.filter(jSONObject4 -> {
                return BoxesRunTime.boxToBoolean($anonfun$run$18(jSONObject4));
            });
            if (deleteRows.count() > 0) {
                saveToSink$1(deleteRows, UpsertTableInDelta$.MODULE$.OPERATION_TYPE_DELETE(), tableInfos, map, sparkSession, sourceRef, idCols);
            }
        } finally {
            ((Dataset) sourceRef.elem).unpersist();
        }
    }

    /**
     * Reads the string field {@code str} from the meta object stored on {@code jSONObject} under
     * {@code MLSQLMultiDeltaOptions.META_KEY}.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static final String _getInfoFromMeta$1(JSONObject jSONObject, String str) {
        return jSONObject.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getString(str);
    }

    /**
     * Sort key used by {@link #run}: the {@code "timestamp"} field of the row's meta object.
     */
    public static final /* synthetic */ long $anonfun$run$11(JSONObject jSONObject) {
        return jSONObject.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getLong("timestamp");
    }

    /**
     * Returns {@code true} when the row's meta {@code databaseName} and {@code tableName} both equal
     * the corresponding fields of {@code tableMetaInfo} (null-safe comparison).
     */
    public static final /* synthetic */ boolean $anonfun$run$14(TableMetaInfo tableMetaInfo, JSONObject jSONObject) {
        // The database name is checked first so the table name is only read on a database match.
        return java.util.Objects.equals(_getInfoFromMeta$1(jSONObject, "databaseName"), tableMetaInfo.db())
                && java.util.Objects.equals(_getInfoFromMeta$1(jSONObject, "tableName"), tableMetaInfo.table());
    }

    /**
     * Parses a JSON schema string with {@code DataType.fromJson} and requires the result to be a
     * {@link StructType}.
     *
     * @throws RuntimeException if the parsed type is not a {@code StructType}; exceptions raised by
     *                          the parser itself propagate unchanged
     */
    private static final StructType deserializeSchema$1(String str) {
        DataType parsed = DataType$.MODULE$.fromJson(str);
        if (parsed instanceof StructType) {
            return (StructType) parsed;
        }
        throw new RuntimeException(new StringBuilder(27).append("Failed parsing StructType: ").append(str).toString());
    }

    /**
     * Runs one {@link UpsertTableInDelta} command per entry of {@code map} against the rows of
     * {@code rdd} that belong to that entry's database/table.
     *
     * @param rdd          de-duplicated rows, each still carrying its meta object
     * @param str          operation type stored under {@code UpsertTableInDelta.OPERATION_TYPE}
     * @param map          table descriptors; only the key ({@link TableMetaInfo}) of each entry is used
     * @param map2         command options; {@code MLSQLMultiDeltaOptions.FULL_PATH_KEY} supplies the
     *                     path template in which {@code {db}} and {@code {table}} are substituted
     * @param sparkSession session used to build the datasets and run the commands
     * @param objectRef    holder of the source dataset, used only to obtain the session conf
     * @param seq          id columns, re-joined with {@code ","} for the command
     * @return the results of the individual {@code run} calls
     * @throws RuntimeException if a target Delta table has no committed version yet
     */
    private static final scala.collection.immutable.Iterable saveToSink$1(RDD rdd, String str, Map map, Map map2, SparkSession sparkSession, ObjectRef objectRef, Seq seq) {
        return (scala.collection.immutable.Iterable) map.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TableMetaInfo tableMetaInfo = (TableMetaInfo) tuple2._1();
            // Rows of this table only, with the meta object stripped, serialised back to JSON text.
            RDD tableJson = rdd.filter(jSONObject -> {
                return BoxesRunTime.boxToBoolean($anonfun$run$14(tableMetaInfo, jSONObject));
            }).map(jSONObject2 -> {
                jSONObject2.remove(MLSQLMultiDeltaOptions$.MODULE$.META_KEY());
                return jSONObject2.toString();
            }, ClassTag$.MODULE$.apply(String.class));
            // Parse the JSON text with the schema carried in the binlog meta and flatten to columns.
            Dataset select = sparkSession.createDataset(tableJson, sparkSession.implicits().newStringEncoder()).toDF(Predef$.MODULE$.wrapRefArray(new String[]{"value"})).select(Predef$.MODULE$.wrapRefArray(new Column[]{new Column(new JsonToStructs(deserializeSchema$1(tableMetaInfo.schema()), map2, functions$.MODULE$.col("value").expr(), None$.MODULE$)).as("data")})).select("data.*", Predef$.MODULE$.wrapRefArray(new String[0]));
            String replace = ((String) map2.apply(MLSQLMultiDeltaOptions$.MODULE$.FULL_PATH_KEY())).replace("{db}", tableMetaInfo.db()).replace("{table}", tableMetaInfo.table());
            DeltaLog forTable = DeltaLog$.MODULE$.forTable(sparkSession, replace);
            // A negative snapshot version means nothing has been committed at that path.
            if (forTable.snapshot().version() < 0) {
                throw new RuntimeException(new StringBuilder(17).append(replace).append(" is not initialed").toString());
            }
            return new UpsertTableInDelta(select, new Some(SaveMode.Append), None$.MODULE$, forTable, new DeltaOptions(Predef$.MODULE$.Map().apply(Nil$.MODULE$), ((Dataset) objectRef.elem).sparkSession().sessionState().conf()), Nil$.MODULE$, map2.$plus$plus(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(UpsertTableInDelta$.MODULE$.OPERATION_TYPE()), str), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(UpsertTableInDelta$.MODULE$.ID_COLS()), seq.mkString(","))}))), UpsertTableInDelta$.MODULE$.$lessinit$greater$default$8()).run(sparkSession);
        }, scala.collection.immutable.Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Upsert filter: {@code true} unless the row's meta {@code "type"} equals {@code "delete"}
     * (a null type counts as not-delete).
     */
    public static final /* synthetic */ boolean $anonfun$run$17(JSONObject jSONObject) {
        String type = jSONObject.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getString("type");
        return !"delete".equals(type);
    }

    /**
     * Delete filter: {@code true} only when the row's meta {@code "type"} equals {@code "delete"}.
     */
    public static final /* synthetic */ boolean $anonfun$run$18(JSONObject jSONObject) {
        String type = jSONObject.getJSONObject(MLSQLMultiDeltaOptions$.MODULE$.META_KEY()).getString("type");
        return "delete".equals(type);
    }

    /** Publishes the singleton and runs the trait initializer. */
    private BinlogSyncToDelta$() {
        MODULE$ = this;
        DeltaCommandsFun.$init$(this);
    }
}
