package org.apache.spark.sql.execution.streaming;

import java.util.Optional;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder$;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution$;
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2;
import org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.apache.spark.sql.streaming.StreamingQueryStatus;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.util.Clock;
import org.apache.xbean.asm6.Opcodes;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Product;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.immutable.MapLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.NonLocalReturnControl;

/* compiled from: MicroBatchExecution.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\rd\u0001B\u0016-\u0001eB\u0011B\u0010\u0001\u0003\u0002\u0003\u0006IaP\"\t\u0013\u0011\u0003!\u0011!Q\u0001\n\u0015\u0013\u0006\u0002C*\u0001\u0005\u0003\u0005\u000b\u0011B#\t\u0011Q\u0003!\u0011!Q\u0001\nUC\u0011b\u0018\u0001\u0003\u0002\u0003\u0006I\u0001Y2\t\u0013\u0011\u0004!\u0011!Q\u0001\n\u0015T\u0007\"C6\u0001\u0005\u0003\u0005\u000b\u0011\u00027s\u0011%\u0019\bA!A!\u0002\u0013!x\u000f\u0003\u0005y\u0001\t\u0005\t\u0015!\u0003z\u0011!a\bA!A!\u0002\u0013i\bbBA\u0002\u0001\u0011\u0005\u0011Q\u0001\u0005\n\u0003;\u0001\u0001\u0019!C\t\u0003?A\u0011\"!\u000f\u0001\u0001\u0004%\t\"a\u000f\t\u0011\u0005\u001d\u0003\u0001)Q\u0005\u0003CA\u0011\"!\u0015\u0001\u0005\u0004%I!a\u0015\t\u0011\u0005\u0015\u0005\u0001)A\u0005\u0003+B\u0011\"a\"\u0001\u0005\u0004%I!!#\t\u0011\u0005\r\u0006\u0001)A\u0005\u0003\u0017C1\"!*\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002(\"Y\u0011q\u0016\u0001A\u0002\u0003\u0007I\u0011BAY\u0011-\t)\f\u0001a\u0001\u0002\u0003\u0006K!!+\t\u0015\u0005]\u0006\u0001#b\u0001\n\u0003\nI\fC\u0005\u0002<\u0002\u0001\r\u0011\"\u0003\u0002>\"I\u0011q\u0018\u0001A\u0002\u0013%\u0011\u0011\u0019\u0005\b\u0003\u000b\u0004\u0001\u0015)\u0003~\u0011\u001d\t9\r\u0001C!\u0003\u0013Dq!a3\u0001\t#\ti\rC\u0004\u0002T\u0002!I!!6\t\u000f\u0005m\u0007\u0001\"\u0003\u0002>\"9\u0011Q\u001c\u0001\u0005\n\u0005}\u0007bBAs\u0001\u0011%\u0011q\u001d\u0005\t\u0003[\u0004A\u0011\u0001\u0019\u0002p\"9!1\u0003\u0001\u0005\n\tU\u0001\"\u0004B\u001b\u0001A\u0005\u0019\u0011!A\u0005\n\t]2\tC\u0007\u0003:\u0001\u0001\n1!A\u0001\n\u0013\u0011YD\u001d\u0005\u000e\u0005{\u0001\u0001\u0013aA\u0001\u0002\u0013%!qH<\t\u001b\t\u0005\u0003\u0001%A\u0002\u0002\u0003%IAa\u0011d\u000f\u001d\u0011)\u0005\fE\u0001\u0005\u000f2aa\u000b\u0017\t\u0002\t%\u0003bBA\u0002O\u0011\u0005!\u0011\u000b\u0005\n\u0005':#\u0019!C\u0001\u0005+B\u0001B!\u0019(A\u0003%!q\u000b\u0002\u0014\u001b&\u001c'o\u001c\"bi\u000eDW\t_3dkRLwN\u001c\u0006\u0003[9\n\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005=\u0002\u0014!C3yK\u000e,H/[8o\u0015\t\t$'A\u0002tc2T!a\r\u001b\u0002\u000bM\u0004\u0018M]6\u000b\u0005U2\u0014AB1qC\u000eDWMC\u00018\u0003\ry'oZ\u0002\u0001'\t\u0001!\b\u0005\u0002<y5\tA&\u0003\u0002>Y\ty1\u000b\u001e:fC6,\u00050Z2vi&|g.\u0001\u0007ta\u0006\u00148nU3tg&|g\u000e\u0005\u0002A\u00036\t\u0001'\u0003\u0002Ca\ta1\u000b]1sWN+7o]5p]&\u0011a\bP\u0001\u0005]\u0006lW\r\u0005\u0002G\u001f:\u0011q)\u0014\t\u0003\u0011.k\u0011!\u0013\u0006\u0003\u0015b\na\u0001\u0010:p_Rt$\"\u0001'\u0002\u000bM\u001c\u0017\r\\1\n\u00059[\u0015A\u0002)sK\u0012,g-\u0003\u0002Q#\n11\u000b\u001e:j]\u001eT!AT&\n\u0005\u0011c\u0014AD2iK\u000e\\\u0007o\\5oiJ{w\u000e^\u0001\rC:\fG.\u001f>fIBc\u0017M\u001c\t\u0003-vk\u0011a\u0016\u0006\u00031f\u000bq\u0001\\8hS\u000e\fGN\u0003\u0002[7\u0006)\u0001\u000f\\1og*\u0011A\fM\u0001\tG\u0006$\u0018\r\\=ti&\u0011al\u0016\u0002\f\u0019><\u0017nY1m!2\fg.\u0001\u0003tS:\\\u0007CA\u001eb\u0013\t\u0011GFA\tCCN,7\u000b\u001e:fC6LgnZ*j].L!a\u0018\u001f\u0002\u000fQ\u0014\u0018nZ4feB\u0011a\r[\u0007\u0002O*\u0011Q\u0006M\u0005\u0003S\u001e\u0014q\u0001\u0016:jO\u001e,'/\u0003\u0002ey\u0005aAO]5hO\u0016\u00148\t\\8dWB\u0011Q\u000e]\u0007\u0002]*\u0011qNM\u0001\u0005kRLG.\u0003\u0002r]\n)1\t\\8dW&\u00111\u000eP\u0001\u000b_V$\b/\u001e;N_\u0012,\u0007C\u00014v\u0013\t1xM\u0001\u0006PkR\u0004X\u000f^'pI\u0016L!a\u001d\u001f\u0002\u0019\u0015DHO]1PaRLwN\\:\u0011\t\u0019SX)R\u0005\u0003wF\u00131!T1q\u0003Y!W\r\\3uK\u000eCWmY6q_&tGo\u00148Ti>\u0004\bC\u0001@��\u001b\u0005Y\u0015bAA\u0001\u0017\n9!i\\8mK\u0006t\u0017A\u0002\u001fj]&$h\b\u0006\f\u0002\b\u0005%\u00111BA\u0007\u0003\u001f\t\t\"a\u0005\u0002\u0016\u0005]\u0011\u0011DA\u000e!\tY\u0004\u0001C\u0003?\u0017\u0001\u0007q\bC\u0003E\u0017\u0001\u0007Q\tC\u0003T\u0017\u0001\u0007Q\tC\u0003U\u0017\u0001\u0007Q\u000bC\u0003`\u0017\u0001\u0007\u0001\rC\u0003e\u0017\u0001\u0007Q\rC\u0003l\u0017\u0001\u0007A\u000eC\u0003t\u0017\u0001\u0007A\u000fC\u0003y\u0017\u0001\u0007\u0011\u0010C\u0003}\u0017\u0001\u0007Q0A\u0004t_V\u00148-Z:\u0016\u0005\u0005\u0005\u0002CBA\u0012\u0003[\t\u0019D\u0004\u0003\u0002&\u0005%bb\u0001%\u0002(%\tA*C\u0002\u0002,-\u000bq\u0001]1dW\u0006<W-\u0003\u0003\u00020\u0005E\"aA*fc*\u0019\u00111F&\u0011\u0007m\n)$C\u0002\u000281\u00121CQ1tKN#(/Z1nS:<7k\\;sG\u0016\f1b]8ve\u000e,7o\u0018\u0013fcR!\u0011QHA\"!\rq\u0018qH\u0005\u0004\u0003\u0003Z%\u0001B+oSRD\u0011\"!\u0012\u000e\u0003\u0003\u0005\r!!\t\u0002\u0007a$\u0013'\u0001\u0005t_V\u00148-Z:!Q\rq\u00111\n\t\u0004}\u00065\u0013bAA(\u0017\nAao\u001c7bi&dW-A\u000bsK\u0006$WM\u001d+p\t\u0006$\u0018mU8ve\u000e,W*\u00199\u0016\u0005\u0005U\u0003\u0003CA,\u0003C\n\u0019'a\u001e\u000e\u0005\u0005e#\u0002BA.\u0003;\nq!\\;uC\ndWMC\u0002\u0002`-\u000b!bY8mY\u0016\u001cG/[8o\u0013\rY\u0018\u0011\f\t\u0005\u0003K\n\u0019(\u0004\u0002\u0002h)\u0019Q&!\u001b\u000b\t\u0005-\u0014QN\u0001\u0007e\u0016\fG-\u001a:\u000b\t\u0005=\u0014\u0011O\u0001\u0003mJR1!!\b1\u0013\u0011\t)(a\u001a\u0003!5K7M]8CCR\u001c\u0007NU3bI\u0016\u0014\bC\u0002@\u0002z\u0005u\u00140C\u0002\u0002|-\u0013a\u0001V;qY\u0016\u0014\u0004\u0003BA@\u0003\u0003k!!!\u001c\n\t\u0005\r\u0015Q\u000e\u0002\r\t\u0006$\u0018mU8ve\u000e,gKM\u0001\u0017e\u0016\fG-\u001a:U_\u0012\u000bG/Y*pkJ\u001cW-T1qA\u0005yAO]5hO\u0016\u0014X\t_3dkR|'/\u0006\u0002\u0002\fJA\u0011QRAI\u0003/\u000biJ\u0002\u0004\u0002\u0010\u0002\u0001\u00111\u0012\u0002\ryI,g-\u001b8f[\u0016tGO\u0010\t\u0004}\u0006M\u0015bAAK\u0017\n9\u0001K]8ek\u000e$\bc\u0001@\u0002\u001a&\u0019\u00111T&\u0003\u0019M+'/[1mSj\f'\r\\3\u0011\u0007m\ny*C\u0002\u0002\"2\u0012q\u0002\u0016:jO\u001e,'/\u0012=fGV$xN]\u0001\u0011iJLwmZ3s\u000bb,7-\u001e;pe\u0002\n\u0001c^1uKJl\u0017M]6Ue\u0006\u001c7.\u001a:\u0016\u0005\u0005%\u0006cA\u001e\u0002,&\u0019\u0011Q\u0016\u0017\u0003!]\u000bG/\u001a:nCJ\\GK]1dW\u0016\u0014\u0018\u0001F<bi\u0016\u0014X.\u0019:l)J\f7m[3s?\u0012*\u0017\u000f\u0006\u0003\u0002>\u0005M\u0006\"CA#)\u0005\u0005\t\u0019AAU\u0003E9\u0018\r^3s[\u0006\u00148\u000e\u0016:bG.,'\u000fI\u0001\fY><\u0017nY1m!2\fg.F\u0001V\u0003eI7oQ;se\u0016tGOQ1uG\"\u001cuN\\:ueV\u001cG/\u001a3\u0016\u0003u\fQ$[:DkJ\u0014XM\u001c;CCR\u001c\u0007nQ8ogR\u0014Xo\u0019;fI~#S-\u001d\u000b\u0005\u0003{\t\u0019\r\u0003\u0005\u0002Fa\t\t\u00111\u0001~\u0003iI7oQ;se\u0016tGOQ1uG\"\u001cuN\\:ueV\u001cG/\u001a3!\u0003\u0011\u0019Ho\u001c9\u0015\u0005\u0005u\u0012A\u0005:v]\u0006\u001bG/\u001b<bi\u0016$7\u000b\u001e:fC6$B!!\u0010\u0002P\"1\u0011\u0011[\u000eA\u0002}\nQc\u001d9be.\u001cVm]:j_:4uN]*ue\u0016\fW.\u0001\u000bq_B,H.\u0019;f'R\f'\u000f^(gMN,Go\u001d\u000b\u0005\u0003{\t9\u000e\u0003\u0004\u0002Zr\u0001\raP\u0001\u0019gB\f'o[*fgNLwN\u001c+p%Vt')\u0019;dQ\u0016\u001c\u0018AE5t\u001d\u0016<H)\u0019;b\u0003Z\f\u0017\u000e\\1cY\u0016\f!cY8ogR\u0014Xo\u0019;OKb$()\u0019;dQR\u0019Q0!9\t\r\u0005\rh\u00041\u0001~\u0003Qqw\u000eR1uC\n\u000bGo\u00195fg\u0016s\u0017M\u00197fI\u0006A!/\u001e8CCR\u001c\u0007\u000e\u0006\u0003\u0002>\u0005%\bBBAv?\u0001\u0007q(\u0001\fta\u0006\u00148nU3tg&|g\u000eV8Sk:\u0014\u0015\r^2i\u0003I9\u0018\u000e\u001e5Qe><'/Z:t\u0019>\u001c7.\u001a3\u0016\t\u0005E\u0018q\u001f\u000b\u0005\u0003g\u0014I\u0001\u0005\u0003\u0002v\u0006]H\u0002\u0001\u0003\b\u0003s\u0004#\u0019AA~\u0005\u0005!\u0016\u0003BA\u007f\u0005\u0007\u00012A`A��\u0013\r\u0011\ta\u0013\u0002\b\u001d>$\b.\u001b8h!\rq(QA\u0005\u0004\u0005\u000fY%aA!os\"A!1\u0002\u0011\u0005\u0002\u0004\u0011i!A\u0001g!\u0015q(qBAz\u0013\r\u0011\tb\u0013\u0002\ty\tLh.Y7f}\u00051Ao\u001c&bm\u0006$BAa\u0006\u0003,A1!\u0011\u0004B\u0011\u0005Ki!Aa\u0007\u000b\u0007=\u0014iB\u0003\u0002\u0003 \u0005!!.\u0019<b\u0013\u0011\u0011\u0019Ca\u0007\u0003\u0011=\u0003H/[8oC2\u0004B!!\u001a\u0003(%!!\u0011FA4\u0005\u0019yeMZ:fi\"9!QF\u0011A\u0002\t=\u0012aC:dC2\fw\n\u001d;j_:\u0004RA B\u0019\u0005KI1Aa\rL\u0005\u0019y\u0005\u000f^5p]\u0006\u00112/\u001e9fe\u0012\u001a\b/\u0019:l'\u0016\u001c8/[8o+\u0005y\u0014AE:va\u0016\u0014H\u0005\u001e:jO\u001e,'o\u00117pG.,\u0012\u0001\\\u0001\u0011gV\u0004XM\u001d\u0013pkR\u0004X\u000f^'pI\u0016,\u0012\u0001^\u0001\u000bgV\u0004XM\u001d\u0013tS:\\W#\u00011\u0002'5K7M]8CCR\u001c\u0007.\u0012=fGV$\u0018n\u001c8\u0011\u0005m:3cA\u0014\u0003LA\u0019aP!\u0014\n\u0007\t=3J\u0001\u0004B]f\u0014VM\u001a\u000b\u0003\u0005\u000f\nABQ!U\u0007\"{\u0016\nR0L\u000bf+\"Aa\u0016\u0011\t\te#qL\u0007\u0003\u00057RAA!\u0018\u0003\u001e\u0005!A.\u00198h\u0013\r\u0001&1L\u0001\u000e\u0005\u0006#6\tS0J\t~[U)\u0017\u0011")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/MicroBatchExecution.class */
public class MicroBatchExecution extends StreamExecution {
    private LogicalPlan logicalPlan;
    private final LogicalPlan analyzedPlan;
    private final Map<String, String> extraOptions;
    private volatile Seq<BaseStreamingSource> sources;
    private final scala.collection.mutable.Map<MicroBatchReader, Tuple2<DataSourceV2, Map<String, String>>> org$apache$spark$sql$execution$streaming$MicroBatchExecution$$readerToDataSourceMap;
    private final Product triggerExecutor;
    private WatermarkTracker watermarkTracker;
    private boolean isCurrentBatchConstructed;
    private volatile boolean bitmap$0;

    public static String BATCH_ID_KEY() {
        return MicroBatchExecution$.MODULE$.BATCH_ID_KEY();
    }

    public /* synthetic */ SparkSession org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession() {
        return super.sparkSession();
    }

    private /* synthetic */ Clock super$triggerClock() {
        return super.triggerClock();
    }

    private /* synthetic */ OutputMode super$outputMode() {
        return super.outputMode();
    }

    private /* synthetic */ BaseStreamingSink super$sink() {
        return super.sink();
    }

    @Override // org.apache.spark.sql.execution.streaming.ProgressReporter
    public Seq<BaseStreamingSource> sources() {
        return this.sources;
    }

    public void sources_$eq(Seq<BaseStreamingSource> seq) {
        this.sources = seq;
    }

    public scala.collection.mutable.Map<MicroBatchReader, Tuple2<DataSourceV2, Map<String, String>>> org$apache$spark$sql$execution$streaming$MicroBatchExecution$$readerToDataSourceMap() {
        return this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$readerToDataSourceMap;
    }

    private Product triggerExecutor() {
        return this.triggerExecutor;
    }

    private WatermarkTracker watermarkTracker() {
        return this.watermarkTracker;
    }

    private void watermarkTracker_$eq(WatermarkTracker watermarkTracker) {
        this.watermarkTracker = watermarkTracker;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [org.apache.spark.sql.execution.streaming.MicroBatchExecution] */
    private LogicalPlan logicalPlan$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                Predef$.MODULE$.assert(queryExecutionThread() == Thread.currentThread(), () -> {
                    return new StringBuilder(83).append("logicalPlan must be initialized in QueryExecutionThread ").append("but the current thread was ").append(Thread.currentThread()).toString();
                });
                LongRef create = LongRef.create(0L);
                LogicalPlan transform = this.analyzedPlan.transform(new MicroBatchExecution$$anonfun$1(this, Map$.MODULE$.apply(Nil$.MODULE$), create, super.sparkSession().sqlContext().conf().disabledV2StreamingMicroBatchReaders().split(StringUtils.COMMA_STR), Map$.MODULE$.apply(Nil$.MODULE$)));
                sources_$eq(transform.collect(new MicroBatchExecution$$anonfun$logicalPlan$lzycompute$1(null)));
                uniqueSources_$eq((Seq) sources().distinct());
                this.logicalPlan = transform;
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.logicalPlan;
    }

    @Override // org.apache.spark.sql.execution.streaming.StreamExecution, org.apache.spark.sql.execution.streaming.ProgressReporter
    public LogicalPlan logicalPlan() {
        return !this.bitmap$0 ? logicalPlan$lzycompute() : this.logicalPlan;
    }

    private boolean isCurrentBatchConstructed() {
        return this.isCurrentBatchConstructed;
    }

    private void isCurrentBatchConstructed_$eq(boolean z) {
        this.isCurrentBatchConstructed = z;
    }

    @Override // org.apache.spark.sql.streaming.StreamingQuery
    public void stop() {
        state().set(TERMINATED$.MODULE$);
        if (queryExecutionThread().isAlive()) {
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
            queryExecutionThread().interrupt();
            queryExecutionThread().join();
            super.sparkSession().sparkContext().cancelJobGroup(runId().toString());
        }
        logInfo(() -> {
            return new StringBuilder(18).append("Query ").append(this.prettyIdString()).append(" was stopped").toString();
        });
    }

    @Override // org.apache.spark.sql.execution.streaming.StreamExecution
    public void runActivatedStream(SparkSession sparkSession) {
        boolean streamingNoDataMicroBatchesEnabled = sparkSession.sessionState().conf().streamingNoDataMicroBatchesEnabled();
        triggerExecutor().execute(() -> {
            if (this.isActive()) {
                BooleanRef create = BooleanRef.create(false);
                this.startTrigger();
                this.reportTimeTaken("triggerExecution", () -> {
                    if (this.currentBatchId() < 0) {
                        this.populateStartOffsets(sparkSession);
                        this.logInfo(() -> {
                            return new StringBuilder(20).append("Stream started from ").append(this.committedOffsets()).toString();
                        });
                    }
                    this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$super$sparkSession().sparkContext().setJobDescription(this.getBatchDescriptionString());
                    if (!this.isCurrentBatchConstructed()) {
                        this.isCurrentBatchConstructed_$eq(this.constructNextBatch(streamingNoDataMicroBatchesEnabled));
                    }
                    this.recordTriggerOffsets(this.committedOffsets(), this.availableOffsets());
                    create.elem = this.isNewDataAvailable();
                    StreamingQueryStatus currentStatus = this.currentStatus();
                    this.currentStatus_$eq(currentStatus.copy(currentStatus.copy$default$1(), this.isNewDataAvailable(), currentStatus.copy$default$3()));
                    if (!this.isCurrentBatchConstructed()) {
                        this.updateStatusMessage("Waiting for data to arrive");
                        return;
                    }
                    if (create.elem) {
                        this.updateStatusMessage("Processing new data");
                    } else {
                        this.updateStatusMessage("No new data but cleaning up state");
                    }
                    this.runBatch(sparkSession);
                });
                this.finishTrigger(create.elem);
                this.withProgressLocked(() -> {
                    this.awaitProgressLockCondition().signalAll();
                });
                if (this.isCurrentBatchConstructed()) {
                    this.currentBatchId_$eq(this.currentBatchId() + 1);
                    this.isCurrentBatchConstructed_$eq(false);
                } else {
                    Thread.sleep(this.pollingDelayMs());
                }
            }
            this.updateStatusMessage("Waiting for next trigger");
            return this.isActive();
        });
    }

    private void populateStartOffsets(SparkSession sparkSession) {
        Tuple2 tuple2;
        Tuple2 tuple22;
        BoxedUnit boxedUnit;
        Some latest = offsetLog().getLatest();
        if (!(latest instanceof Some) || (tuple2 = (Tuple2) latest.value()) == null) {
            if (!None$.MODULE$.equals(latest)) {
                throw new MatchError(latest);
            }
            logInfo(() -> {
                return "Starting new streaming query.";
            });
            currentBatchId_$eq(0L);
            watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSession.conf()));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        long _1$mcJ$sp = tuple2._1$mcJ$sp();
        OffsetSeq offsetSeq = (OffsetSeq) tuple2._2();
        currentBatchId_$eq(_1$mcJ$sp);
        isCurrentBatchConstructed_$eq(true);
        availableOffsets_$eq(offsetSeq.toStreamProgress(sources()));
        if (_1$mcJ$sp != 0) {
            committedOffsets_$eq(((OffsetSeq) offsetLog().get(_1$mcJ$sp - 1).getOrElse(() -> {
                throw new IllegalStateException(new StringBuilder(20).append("batch ").append(_1$mcJ$sp - 1).append(" doesn't exist").toString());
            })).toStreamProgress(sources()));
        }
        offsetSeq.metadata().foreach(offsetSeqMetadata -> {
            $anonfun$populateStartOffsets$2(this, sparkSession, offsetSeqMetadata);
            return BoxedUnit.UNIT;
        });
        Some latest2 = commitLog().getLatest();
        if ((latest2 instanceof Some) && (tuple22 = (Tuple2) latest2.value()) != null) {
            long _1$mcJ$sp2 = tuple22._1$mcJ$sp();
            CommitMetadata commitMetadata = (CommitMetadata) tuple22._2();
            if (_1$mcJ$sp == _1$mcJ$sp2) {
                availableOffsets().foreach(tuple23 -> {
                    Dataset<Row> dataset;
                    if (tuple23 != null) {
                        BaseStreamingSource baseStreamingSource = (BaseStreamingSource) tuple23._1();
                        Offset offset = (Offset) tuple23._2();
                        if (baseStreamingSource instanceof Source) {
                            Source source = (Source) baseStreamingSource;
                            if (offset != null) {
                                dataset = source.getBatch(this.committedOffsets().get((BaseStreamingSource) source), offset);
                                return dataset;
                            }
                        }
                    }
                    dataset = BoxedUnit.UNIT;
                    return dataset;
                });
                currentBatchId_$eq(_1$mcJ$sp2 + 1);
                isCurrentBatchConstructed_$eq(false);
                committedOffsets_$eq(committedOffsets().m22610$plus$plus((GenTraversableOnce<Tuple2<BaseStreamingSource, Offset>>) availableOffsets()));
                watermarkTracker().setWatermark(package$.MODULE$.max(watermarkTracker().currentWatermark(), commitMetadata.nextBatchWatermarkMs()));
                boxedUnit = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 == _1$mcJ$sp - 1) {
                availableOffsets().foreach(tuple24 -> {
                    Dataset<Row> dataset;
                    if (tuple24 != null) {
                        BaseStreamingSource baseStreamingSource = (BaseStreamingSource) tuple24._1();
                        Offset offset = (Offset) tuple24._2();
                        if (baseStreamingSource instanceof Source) {
                            Source source = (Source) baseStreamingSource;
                            if (offset != null) {
                                Option<Offset> map = this.committedOffsets().get((BaseStreamingSource) source).map(offset2 -> {
                                    return offset2;
                                });
                                dataset = BoxesRunTime.unboxToBoolean(map.map(offset3 -> {
                                    return BoxesRunTime.boxToBoolean($anonfun$populateStartOffsets$6(offset, offset3));
                                }).getOrElse(() -> {
                                    return true;
                                })) ? source.getBatch(map, offset) : BoxedUnit.UNIT;
                                return dataset;
                            }
                        }
                    }
                    dataset = BoxedUnit.UNIT;
                    return dataset;
                });
                boxedUnit = BoxedUnit.UNIT;
            } else if (_1$mcJ$sp2 < _1$mcJ$sp - 1) {
                logWarning(() -> {
                    return new StringBuilder(79).append("Batch completion log latest batch id is ").append(_1$mcJ$sp2).append(", which is not trailing ").append("batchid ").append(_1$mcJ$sp).append(" by one").toString();
                });
                boxedUnit = BoxedUnit.UNIT;
            } else {
                boxedUnit = BoxedUnit.UNIT;
            }
        } else {
            if (!None$.MODULE$.equals(latest2)) {
                throw new MatchError(latest2);
            }
            logInfo(() -> {
                return "no commit log present";
            });
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        }
        logInfo(() -> {
            return new StringBuilder(65).append("Resuming at batch ").append(this.currentBatchId()).append(" with committed offsets ").append(this.committedOffsets()).append(" and available offsets ").append(this.availableOffsets()).toString();
        });
        BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
    }

    private boolean isNewDataAvailable() {
        return availableOffsets().exists(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$1(this, tuple2));
        });
    }

    private boolean constructNextBatch(boolean z) {
        Object obj = new Object();
        try {
            return BoxesRunTime.unboxToBoolean(withProgressLocked(() -> {
                if (this.isCurrentBatchConstructed()) {
                    throw new NonLocalReturnControl.mcZ.sp(obj, true);
                }
                this.availableOffsets_$eq(this.availableOffsets().m22610$plus$plus((GenTraversableOnce<Tuple2<BaseStreamingSource, Offset>>) ((MapLike) ((TraversableOnce) this.uniqueSources().map(baseStreamingSource -> {
                    Tuple2 tuple2;
                    if (baseStreamingSource instanceof Source) {
                        Source source = (Source) baseStreamingSource;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(source).toString());
                        tuple2 = (Tuple2) this.reportTimeTaken("getOffset", () -> {
                            return new Tuple2(source, source.getOffset());
                        });
                    } else {
                        if (!(baseStreamingSource instanceof MicroBatchReader)) {
                            throw new MatchError(baseStreamingSource);
                        }
                        MicroBatchReader microBatchReader = (MicroBatchReader) baseStreamingSource;
                        this.updateStatusMessage(new StringBuilder(21).append("Getting offsets from ").append(microBatchReader).toString());
                        this.reportTimeTaken("setOffsetRange", () -> {
                            microBatchReader.setOffsetRange(this.toJava(this.availableOffsets().get((BaseStreamingSource) microBatchReader).map(offset -> {
                                return microBatchReader.deserializeOffset(offset.json());
                            })), Optional.empty());
                        });
                        tuple2 = new Tuple2(microBatchReader, Option$.MODULE$.apply((org.apache.spark.sql.sources.v2.reader.streaming.Offset) this.reportTimeTaken("getEndOffset", () -> {
                            return microBatchReader.getEndOffset();
                        })));
                    }
                    return tuple2;
                }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()).filter(tuple2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$7(tuple2));
                })).mapValues(option -> {
                    return (Offset) option.get();
                })));
                OffsetSeqMetadata offsetSeqMetadata = this.offsetSeqMetadata();
                this.offsetSeqMetadata_$eq(offsetSeqMetadata.copy(this.watermarkTracker().currentWatermark(), this.super$triggerClock().getTimeMillis(), offsetSeqMetadata.copy$default$3()));
                boolean z2 = z && Option$.MODULE$.apply(this.lastExecution()).exists(incrementalExecution -> {
                    return BoxesRunTime.boxToBoolean($anonfun$constructNextBatch$9(this, incrementalExecution));
                });
                boolean z3 = this.isNewDataAvailable() || z2;
                this.logTrace(() -> {
                    return new StringBuilder(Opcodes.LREM).append("noDataBatchesEnabled = ").append(z).append(", ").append("lastExecutionRequiresAnotherBatch = ").append(z2).append(", ").append("isNewDataAvailable = ").append(this.isNewDataAvailable()).append(", ").append("shouldConstructNextBatch = ").append(z3).toString();
                });
                if (z3) {
                    this.updateStatusMessage("Writing offsets to log");
                    this.reportTimeTaken("walCommit", () -> {
                        Predef$.MODULE$.assert(this.offsetLog().add(this.currentBatchId(), this.availableOffsets().toOffsetSeq(this.sources(), this.offsetSeqMetadata())), () -> {
                            return new StringBuilder(67).append("Concurrent update to the log. Multiple streaming jobs detected for ").append(this.currentBatchId()).toString();
                        });
                        this.logInfo(() -> {
                            return new StringBuilder(39).append("Committed offsets for batch ").append(this.currentBatchId()).append(". ").append("Metadata ").append(this.offsetSeqMetadata().toString()).toString();
                        });
                        if (this.currentBatchId() != 0) {
                            Option<OffsetSeq> option2 = this.offsetLog().get(this.currentBatchId() - 1);
                            if (!option2.isDefined()) {
                                throw new IllegalStateException(new StringBuilder(20).append("batch ").append(this.currentBatchId() - 1).append(" doesn't exist").toString());
                            }
                            ((OffsetSeq) option2.get()).toStreamProgress(this.sources()).foreach(tuple22 -> {
                                $anonfun$constructNextBatch$14(tuple22);
                                return BoxedUnit.UNIT;
                            });
                        }
                        if (this.minLogEntriesToMaintain() < this.currentBatchId()) {
                            this.offsetLog().purge(this.currentBatchId() - this.minLogEntriesToMaintain());
                            this.commitLog().purge(this.currentBatchId() - this.minLogEntriesToMaintain());
                        }
                    });
                    this.noNewData_$eq(false);
                } else {
                    this.noNewData_$eq(true);
                    this.awaitProgressLockCondition().signalAll();
                }
                return z3;
            }));
        } catch (NonLocalReturnControl e) {
            if (e.key() == obj) {
                return e.value$mcZ$sp();
            }
            throw e;
        }
    }

    private void runBatch(SparkSession sparkSession) {
        LogicalPlan writeToDataSourceV2;
        logDebug(() -> {
            return new StringBuilder(14).append("Running batch ").append(this.currentBatchId()).toString();
        });
        newData_$eq((Map) reportTimeTaken("getBatch", () -> {
            return (Map) this.availableOffsets().flatMap(tuple2 -> {
                Iterable option2Iterable;
                org.apache.spark.sql.sources.v2.reader.streaming.Offset offset;
                if (tuple2 != null) {
                    BaseStreamingSource baseStreamingSource = (BaseStreamingSource) tuple2._1();
                    Offset offset2 = (Offset) tuple2._2();
                    if (baseStreamingSource instanceof Source) {
                        Source source = (Source) baseStreamingSource;
                        if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((BaseStreamingSource) source).map(offset3 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$runBatch$4(offset2, offset3));
                        }).getOrElse(() -> {
                            return true;
                        }))) {
                            Option<Offset> option = this.committedOffsets().get((BaseStreamingSource) source);
                            Dataset<Row> batch = source.getBatch(option, offset2);
                            Predef$.MODULE$.assert(batch.isStreaming(), () -> {
                                return new StringBuilder(67).append("DataFrame returned by getBatch from ").append(source).append(" did not have isStreaming=true\n").append(batch.queryExecution().logical()).toString();
                            });
                            this.logDebug(() -> {
                                return new StringBuilder(27).append("Retrieving data from ").append(source).append(": ").append(option).append(" -> ").append(offset2).toString();
                            });
                            option2Iterable = Option$.MODULE$.option2Iterable(new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(source), batch.logicalPlan())));
                            return option2Iterable;
                        }
                    }
                }
                if (tuple2 != null) {
                    BaseStreamingSource baseStreamingSource2 = (BaseStreamingSource) tuple2._1();
                    Offset offset4 = (Offset) tuple2._2();
                    if (baseStreamingSource2 instanceof MicroBatchReader) {
                        MicroBatchReader microBatchReader = (MicroBatchReader) baseStreamingSource2;
                        if (BoxesRunTime.unboxToBoolean(this.committedOffsets().get((BaseStreamingSource) microBatchReader).map(offset5 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$runBatch$8(offset4, offset5));
                        }).getOrElse(() -> {
                            return true;
                        }))) {
                            Option<org.apache.spark.sql.sources.v2.reader.streaming.Offset> map = this.committedOffsets().get((BaseStreamingSource) microBatchReader).map(offset6 -> {
                                return microBatchReader.deserializeOffset(offset6.json());
                            });
                            if (offset4 instanceof SerializedOffset) {
                                offset = microBatchReader.deserializeOffset(((SerializedOffset) offset4).json());
                            } else {
                                if (!(offset4 instanceof org.apache.spark.sql.sources.v2.reader.streaming.Offset)) {
                                    throw new MatchError(offset4);
                                }
                                offset = (org.apache.spark.sql.sources.v2.reader.streaming.Offset) offset4;
                            }
                            org.apache.spark.sql.sources.v2.reader.streaming.Offset offset7 = offset;
                            microBatchReader.setOffsetRange(this.toJava(map), Optional.of(offset7));
                            this.logDebug(() -> {
                                return new StringBuilder(27).append("Retrieving data from ").append(microBatchReader).append(": ").append(map).append(" -> ").append(offset7).toString();
                            });
                            Tuple2 $minus$greater$extension = microBatchReader instanceof MemoryStream ? Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(MemoryStreamDataSource$.MODULE$), Predef$.MODULE$.Map().empty()) : (Tuple2) this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$readerToDataSourceMap().getOrElse(microBatchReader, () -> {
                                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(FakeDataSourceV2$.MODULE$), Predef$.MODULE$.Map().empty());
                            });
                            if ($minus$greater$extension == null) {
                                throw new MatchError($minus$greater$extension);
                            }
                            Tuple2 tuple2 = new Tuple2((DataSourceV2) $minus$greater$extension._1(), (Map) $minus$greater$extension._2());
                            option2Iterable = Option$.MODULE$.option2Iterable(new Some(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(microBatchReader), new StreamingDataSourceV2Relation(microBatchReader.readSchema().toAttributes(), (DataSourceV2) tuple2._1(), (Map) tuple2._2(), microBatchReader))));
                            return option2Iterable;
                        }
                    }
                }
                option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
                return option2Iterable;
            }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
        }));
        LogicalPlan transformAllExpressions = logicalPlan().transform(new MicroBatchExecution$$anonfun$2(this)).transformAllExpressions(new MicroBatchExecution$$anonfun$3(this));
        BaseStreamingSink sink = super.sink();
        if (sink instanceof Sink) {
            writeToDataSourceV2 = transformAllExpressions;
        } else {
            if (!(sink instanceof StreamWriteSupport)) {
                throw new IllegalArgumentException(new StringBuilder(22).append("unknown sink type for ").append(super.sink()).toString());
            }
            writeToDataSourceV2 = new WriteToDataSourceV2(new MicroBatchWriter(currentBatchId(), ((StreamWriteSupport) sink).createStreamWriter(String.valueOf(runId()), transformAllExpressions.schema(), super.outputMode(), new DataSourceOptions((java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.extraOptions).asJava()))), transformAllExpressions);
        }
        LogicalPlan logicalPlan = writeToDataSourceV2;
        sparkSession.sparkContext().setLocalProperty(MicroBatchExecution$.MODULE$.BATCH_ID_KEY(), BoxesRunTime.boxToLong(currentBatchId()).toString());
        sparkSession.sparkContext().setLocalProperty(StreamExecution$.MODULE$.IS_CONTINUOUS_PROCESSING(), BoxesRunTime.boxToBoolean(false).toString());
        reportTimeTaken("queryPlanning", () -> {
            this.lastExecution_$eq(new IncrementalExecution(sparkSession, logicalPlan, this.super$outputMode(), this.checkpointFile("state"), this.runId(), this.currentBatchId(), this.offsetSeqMetadata()));
            return this.lastExecution().executedPlan();
        });
        Dataset dataset = new Dataset(sparkSession, (QueryExecution) lastExecution(), (Encoder) RowEncoder$.MODULE$.apply(lastExecution().analyzed().schema()));
        reportTimeTaken("addBatch", () -> {
            return SQLExecution$.MODULE$.withNewExecutionId(sparkSession, this.lastExecution(), () -> {
                Object collect;
                BaseStreamingSink super$sink = this.super$sink();
                if (super$sink instanceof Sink) {
                    ((Sink) super$sink).addBatch(this.currentBatchId(), dataset);
                    collect = BoxedUnit.UNIT;
                } else {
                    if (!(super$sink instanceof StreamWriteSupport)) {
                        throw new MatchError(super$sink);
                    }
                    collect = dataset.collect();
                }
                return collect;
            });
        });
        withProgressLocked(() -> {
            this.watermarkTracker().updateWatermark(this.lastExecution().executedPlan());
            this.commitLog().add(this.currentBatchId(), new CommitMetadata(this.watermarkTracker().currentWatermark()));
            this.committedOffsets_$eq(this.committedOffsets().m22610$plus$plus((GenTraversableOnce<Tuple2<BaseStreamingSource, Offset>>) this.availableOffsets()));
        });
        logDebug(() -> {
            return new StringBuilder(16).append("Completed batch ").append(this.currentBatchId()).toString();
        });
    }

    public <T> T withProgressLocked(Function0<T> function0) {
        awaitProgressLock().lock();
        try {
            return (T) function0.apply();
        } finally {
            awaitProgressLock().unlock();
        }
    }

    private Optional<org.apache.spark.sql.sources.v2.reader.streaming.Offset> toJava(Option<org.apache.spark.sql.sources.v2.reader.streaming.Offset> option) {
        return Optional.ofNullable(option.orNull(Predef$.MODULE$.$conforms()));
    }

    public static final /* synthetic */ void $anonfun$populateStartOffsets$2(MicroBatchExecution microBatchExecution, SparkSession sparkSession, OffsetSeqMetadata offsetSeqMetadata) {
        OffsetSeqMetadata$.MODULE$.setSessionConf(offsetSeqMetadata, sparkSession.conf());
        microBatchExecution.offsetSeqMetadata_$eq(OffsetSeqMetadata$.MODULE$.apply(offsetSeqMetadata.batchWatermarkMs(), offsetSeqMetadata.batchTimestampMs(), sparkSession.conf()));
        microBatchExecution.watermarkTracker_$eq(WatermarkTracker$.MODULE$.apply(sparkSession.conf()));
        microBatchExecution.watermarkTracker().setWatermark(offsetSeqMetadata.batchWatermarkMs());
    }

    public static final /* synthetic */ boolean $anonfun$populateStartOffsets$6(Offset offset, Offset offset2) {
        return offset2 != null ? offset2.equals(offset) : offset == null;
    }

    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$2(Offset offset, Offset offset2) {
        return offset2 != null ? !offset2.equals(offset) : offset != null;
    }

    public static final /* synthetic */ boolean $anonfun$isNewDataAvailable$1(MicroBatchExecution microBatchExecution, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        BaseStreamingSource baseStreamingSource = (BaseStreamingSource) tuple2._1();
        Offset offset = (Offset) tuple2._2();
        return BoxesRunTime.unboxToBoolean(microBatchExecution.committedOffsets().get(baseStreamingSource).map(offset2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$isNewDataAvailable$2(offset, offset2));
        }).getOrElse(() -> {
            return true;
        }));
    }

    public static final /* synthetic */ boolean $anonfun$constructNextBatch$7(Tuple2 tuple2) {
        if (tuple2 != null) {
            return ((Option) tuple2._2()).nonEmpty();
        }
        throw new MatchError(tuple2);
    }

    public static final /* synthetic */ boolean $anonfun$constructNextBatch$9(MicroBatchExecution microBatchExecution, IncrementalExecution incrementalExecution) {
        return incrementalExecution.shouldRunAnotherBatch(microBatchExecution.offsetSeqMetadata());
    }

    public static final /* synthetic */ void $anonfun$constructNextBatch$14(Tuple2 tuple2) {
        if (tuple2 != null) {
            BaseStreamingSource baseStreamingSource = (BaseStreamingSource) tuple2._1();
            Offset offset = (Offset) tuple2._2();
            if (baseStreamingSource instanceof Source) {
                ((Source) baseStreamingSource).commit(offset);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 != null) {
            BaseStreamingSource baseStreamingSource2 = (BaseStreamingSource) tuple2._1();
            Offset offset2 = (Offset) tuple2._2();
            if (baseStreamingSource2 instanceof MicroBatchReader) {
                MicroBatchReader microBatchReader = (MicroBatchReader) baseStreamingSource2;
                microBatchReader.commit(microBatchReader.deserializeOffset(offset2.json()));
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        throw new IllegalArgumentException(new StringBuilder(47).append("Unknown source is found at constructNextBatch: ").append((BaseStreamingSource) tuple2._1()).toString());
    }

    public static final /* synthetic */ boolean $anonfun$runBatch$4(Offset offset, Offset offset2) {
        return offset2 != null ? !offset2.equals(offset) : offset != null;
    }

    public static final /* synthetic */ boolean $anonfun$runBatch$8(Offset offset, Offset offset2) {
        return offset2 != null ? !offset2.equals(offset) : offset != null;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MicroBatchExecution(SparkSession sparkSession, String str, String str2, LogicalPlan logicalPlan, BaseStreamingSink baseStreamingSink, Trigger trigger, Clock clock, OutputMode outputMode, Map<String, String> map, boolean z) {
        super(sparkSession, str, str2, logicalPlan, baseStreamingSink, trigger, clock, outputMode, z);
        Serializable oneTimeExecutor;
        this.analyzedPlan = logicalPlan;
        this.extraOptions = map;
        this.sources = Seq$.MODULE$.empty();
        this.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$readerToDataSourceMap = Map$.MODULE$.empty();
        Trigger trigger2 = super.trigger();
        if (trigger2 instanceof ProcessingTime) {
            oneTimeExecutor = new ProcessingTimeExecutor((ProcessingTime) trigger2, super.triggerClock());
        } else {
            if (!OneTimeTrigger$.MODULE$.equals(trigger2)) {
                throw new IllegalStateException(new StringBuilder(25).append("Unknown type of trigger: ").append(super.trigger()).toString());
            }
            oneTimeExecutor = new OneTimeExecutor();
        }
        this.triggerExecutor = oneTimeExecutor;
        this.isCurrentBatchConstructed = false;
    }
}
