/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.spark.sql.streaming;

import java.io.Serializable;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.streaming.MetadataLog;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.spark.sql.streaming.EsCommitProtocol;
import org.elasticsearch.spark.sql.streaming.EsSinkMetadataLog;
import org.elasticsearch.spark.sql.streaming.EsSinkStatus;
import org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter;
import org.elasticsearch.spark.sql.streaming.JobState;
import org.elasticsearch.spark.sql.streaming.NullMetadataLog;
import org.elasticsearch.spark.sql.streaming.SparkSqlStreamingConfigs$;
import org.elasticsearch.spark.sql.streaming.TaskCommit;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.collection.Iterator;
import scala.collection.Seq;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001U4A!\u0003\u0006\u0001+!A\u0011\u0006\u0001B\u0001B\u0003%!\u0006\u0003\u0005/\u0001\t\u0005\t\u0015!\u00030\u0011\u00159\u0004\u0001\"\u00019\u0011\u001di\u0004A1A\u0005\nyBaa\u0012\u0001!\u0002\u0013y\u0004b\u0002%\u0001\u0005\u0004%I!\u0013\u0005\u0007-\u0002\u0001\u000b\u0011\u0002&\t\u000b]\u0003A\u0011\t-\u0003/\u0015\u001b8\u000b]1sWN\u000bHn\u0015;sK\u0006l\u0017N\\4TS:\\'BA\u0006\r\u0003%\u0019HO]3b[&twM\u0003\u0002\u000e\u001d\u0005\u00191/\u001d7\u000b\u0005=\u0001\u0012!B:qCJ\\'BA\t\u0013\u00035)G.Y:uS\u000e\u001cX-\u0019:dQ*\t1#A\u0002pe\u001e\u001c\u0001aE\u0002\u0001-y\u0001\"a\u0006\u000f\u000e\u0003aQ!!\u0007\u000e\u0002\t1\fgn\u001a\u0006\u00027\u0005!!.\u0019<b\u0013\ti\u0002D\u0001\u0004PE*,7\r\u001e\t\u0003?\u001dj\u0011\u0001\t\u0006\u0003\u0017\u0005R!AI\u0012\u0002\u0013\u0015DXmY;uS>t'BA\u0007%\u0015\tyQE\u0003\u0002'%\u00051\u0011\r]1dQ\u0016L!\u0001\u000b\u0011\u0003\tMKgn[\u0001\rgB\f'o[*fgNLwN\u001c\t\u0003W1j\u0011aI\u0005\u0003[\r\u0012Ab\u00159be.\u001cVm]:j_:\f\u0001b]3ui&twm\u001d\t\u0003aUj\u0011!\r\u0006\u0003eM\n1a\u00194h\u0015\t!\u0004#\u0001\u0004iC\u0012|w\u000e]\u0005\u0003mE\u0012\u0001bU3ui&twm]\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0007eZD\b\u0005\u0002;\u00015\t!\u0002C\u0003*\u0007\u0001\u0007!\u0006C\u0003/\u0007\u0001\u0007q&\u0001\u0004m_\u001e<WM]\u000b\u0002\u007fA\u0011\u0001)R\u0007\u0002\u0003*\u0011!iQ\u0001\bY><w-\u001b8h\u0015\t!U%A\u0004d_6lwN\\:\n\u0005\u0019\u000b%a\u0001'pO\u00069An\\4hKJ\u0004\u0013\u0001C<sSR,Gj\\4\u0016\u0003)\u00032aH&N\u0013\ta\u0005EA\u0006NKR\fG-\u0019;b\u0019><\u0007c\u0001(R'6\tqJC\u0001Q\u0003\u0015\u00198-\u00197b\u0013\t\u0011vJA\u0003BeJ\f\u0017\u0010\u0005\u0002;)&\u0011QK\u0003\u0002\r\u000bN\u001c\u0016N\\6Ti\u0006$Xo]\u0001\noJLG/\u001a'pO\u0002\n\u0001\"\u00193e\u0005\u0006$8\r\u001b\u000b\u00043r\u000b\u0007C\u0001([\u0013\tYvJ\u0001\u0003V]&$\b\"B/\t\u0001\u0004q\u0016a\u00022bi\u000eD\u0017\n\u001a\t\u0003\u001d~K!\u0001Y(\u0003\t1{gn\u001a\u0005\u0006E\"\u0001\raY\u0001\u0005I\u0006$\u0018\r\u0005\u0002ee:\u0011Q\r\u001d\b\u0003M>t!a\u001a8\u000f\u0005!lgBA5m\u001b\u0005Q'BA6\u0015\u0003\u0019a$o\\8u}%\t1#\u0003\u0002'%%\u0011q\"J\u0005\u0003\u001b\u0011J!!]\u0012\u0002\u000fA\f7m[1hK&\u00111\u000f\u001e\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!!]\u0012")
public class EsSparkSqlStreamingSink
implements Sink {
    private final Settings settings;
    private final Log logger;
    private final MetadataLog<EsSinkStatus[]> writeLog;

    public String name() {
        return Sink.name$((Sink)this);
    }

    public StructType schema() {
        return Sink.schema$((Sink)this);
    }

    public Set<TableCapability> capabilities() {
        return Sink.capabilities$((Sink)this);
    }

    private Log logger() {
        return this.logger;
    }

    private MetadataLog<EsSinkStatus[]> writeLog() {
        return this.writeLog;
    }

    public void addBatch(long batchId, Dataset<Row> data) {
        if (batchId <= BoxesRunTime.unboxToLong((Object)this.writeLog().getLatest().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)x$1._1$mcJ$sp())).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> -1L))) {
            this.logger().info((Object)new StringBuilder(35).append("Skipping already committed batch [").append(batchId).append("]").toString());
        } else {
            EsCommitProtocol commitProtocol = new EsCommitProtocol(this.writeLog());
            QueryExecution queryExecution = data.queryExecution();
            StructType schema = data.schema();
            SQLExecution$.MODULE$.withNewExecutionId(queryExecution, SQLExecution$.MODULE$.withNewExecutionId$default$2(), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
                SparkSession sparkSession = queryExecution.sparkSession();
                String queryName = (String)SparkSqlStreamingConfigs$.MODULE$.getQueryName($this.settings).getOrElse((Function0 & Serializable & scala.Serializable)() -> UUID.randomUUID().toString());
                JobState jobState = new JobState(queryName, batchId);
                commitProtocol.initJob(jobState);
                try {
                    String serializedSettings = $this.settings.save();
                    TaskCommit[] taskCommits = (TaskCommit[])sparkSession.sparkContext().runJob(queryExecution.toRdd(), (Function2 & Serializable & scala.Serializable)(taskContext, iter) -> new EsStreamQueryWriter(serializedSettings, schema, commitProtocol).run((TaskContext)taskContext, (Iterator<InternalRow>)iter), ClassTag$.MODULE$.apply(TaskCommit.class));
                    commitProtocol.commitJob(jobState, (Seq<TaskCommit>)Predef$.MODULE$.wrapRefArray((Object[])taskCommits));
                }
                catch (Throwable t) {
                    commitProtocol.abortJob(jobState);
                    throw t;
                }
            });
        }
    }

    public EsSparkSqlStreamingSink(SparkSession sparkSession, Settings settings) {
        Object object;
        this.settings = settings;
        Sink.$init$((Sink)this);
        this.logger = LogFactory.getLog(EsSparkSqlStreamingSink.class);
        if (SparkSqlStreamingConfigs$.MODULE$.getSinkLogEnabled(settings)) {
            String logPath = SparkSqlStreamingConfigs$.MODULE$.constructCommitLogPath(settings);
            this.logger().info((Object)new StringBuilder(20).append("Using log path of [").append(logPath).append("]").toString());
            object = new EsSinkMetadataLog(settings, sparkSession, logPath);
        } else {
            this.logger().warn((Object)"EsSparkSqlStreamingSink is continuing without write commit log. Be advised that data may be duplicated!");
            object = new NullMetadataLog<EsSinkStatus[]>();
        }
        this.writeLog = object;
    }
}

