package org.apache.spark.sql.execution.streaming.continuous;

import org.apache.spark.internal.Logging;
import org.apache.spark.rpc.RpcAddress;
import org.apache.spark.rpc.RpcCallContext;
import org.apache.spark.rpc.RpcEndpoint;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.rpc.RpcEnv;
import org.apache.spark.rpc.ThreadSafeRpcEndpoint;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.slf4j.Logger;
import scala.Function0;
import scala.PartialFunction;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashSet;
import scala.collection.mutable.HashSet$;
import scala.collection.mutable.Iterable$;
import scala.collection.mutable.Map;
import scala.collection.mutable.Map$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: EpochCoordinator.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005%g!B\u0001\u0003\u0001\t\u0001\"\u0001E#q_\u000eD7i\\8sI&t\u0017\r^8s\u0015\t\u0019A!\u0001\u0006d_:$\u0018N\\;pkNT!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003%)\u00070Z2vi&|gN\u0003\u0002\n\u0015\u0005\u00191/\u001d7\u000b\u0005-a\u0011!B:qCJ\\'BA\u0007\u000f\u0003\u0019\t\u0007/Y2iK*\tq\"A\u0002pe\u001e\u001cB\u0001A\t\u0018;A\u0011!#F\u0007\u0002')\tA#A\u0003tG\u0006d\u0017-\u0003\u0002\u0017'\t1\u0011I\\=SK\u001a\u0004\"\u0001G\u000e\u000e\u0003eQ!A\u0007\u0006\u0002\u0007I\u00048-\u0003\u0002\u001d3\t)B\u000b\u001b:fC\u0012\u001c\u0016MZ3Sa\u000e,e\u000e\u001a9pS:$\bC\u0001\u0010\"\u001b\u0005y\"B\u0001\u0011\u000b\u0003!Ig\u000e^3s]\u0006d\u0017B\u0001\u0012 \u0005\u001daunZ4j]\u001eD\u0001\u0002\n\u0001\u0003\u0002\u0003\u0006IAJ\u0001\u0007oJLG/\u001a:\u0004\u0001A\u0011qEL\u0007\u0002Q)\u0011Q!\u000b\u0006\u0003I)R!a\u000b\u0017\u0002\u0005Y\u0014$BA\u0017\t\u0003\u001d\u0019x.\u001e:dKNL!a\f\u0015\u0003\u0019M#(/Z1n/JLG/\u001a:\t\u0011E\u0002!\u0011!Q\u0001\nI\naA]3bI\u0016\u0014\bCA\u001a7\u001b\u0005!$BA\u00036\u0015\t\t$&\u0003\u00028i\t\u00012i\u001c8uS:,x.^:SK\u0006$WM\u001d\u0005\ts\u0001\u0011\t\u0011)A\u0005u\u0005)\u0011/^3ssB\u00111\bP\u0007\u0002\u0005%\u0011QH\u0001\u0002\u0014\u0007>tG/\u001b8v_V\u001cX\t_3dkRLwN\u001c\u0005\t\u007f\u0001\u0011\t\u0011)A\u0005\u0001\u0006Q1\u000f^1si\u0016\u0003xn\u00195\u0011\u0005I\t\u0015B\u0001\"\u0014\u0005\u0011auN\\4\t\u0011\u0011\u0003!\u0011!Q\u0001\n\u0015\u000bqa]3tg&|g\u000e\u0005\u0002G\u000f6\t\u0001\"\u0003\u0002I\u0011\ta1\u000b]1sWN+7o]5p]\"A!\n\u0001BC\u0002\u0013\u00053*\u0001\u0004sa\u000e,eN^\u000b\u0002\u0019B\u0011\u0001$T\u0005\u0003\u001df\u0011aA\u00159d\u000b:4\b\u0002\u0003)\u0001\u0005\u0003\u0005\u000b\u0011\u0002'\u0002\u000fI\u00048-\u00128wA!)!\u000b\u0001C\u0001'\u00061A(\u001b8jiz\"r\u0001V+W/bK&\f\u0005\u0002<\u0001!)A%\u0015a\u0001M!)\u0011'\u0015a\u0001e!)\u0011(\u0015a\u0001u!)q(\u0015a\u0001\u0001\")A)\u0015a\u0001\u000
b\")!*\u0015a\u0001\u0019\"9A\f\u0001a\u0001\n\u0013i\u0016AE9vKJLxK]5uKN\u001cFo\u001c9qK\u0012,\u0012A\u0018\t\u0003%}K!\u0001Y\n\u0003\u000f\t{w\u000e\\3b]\"9!\r\u0001a\u0001\n\u0013\u0019\u0017AF9vKJLxK]5uKN\u001cFo\u001c9qK\u0012|F%Z9\u0015\u0005\u0011<\u0007C\u0001\nf\u0013\t17C\u0001\u0003V]&$\bb\u00025b\u0003\u0003\u0005\rAX\u0001\u0004q\u0012\n\u0004B\u00026\u0001A\u0003&a,A\nrk\u0016\u0014\u0018p\u0016:ji\u0016\u001c8\u000b^8qa\u0016$\u0007\u0005C\u0005m\u0001\u0001\u0007\t\u0019!C\u0005[\u0006\u0019b.^7SK\u0006$WM\u001d)beRLG/[8ogV\ta\u000e\u0005\u0002\u0013_&\u0011\u0001o\u0005\u0002\u0004\u0013:$\b\"\u0003:\u0001\u0001\u0004\u0005\r\u0011\"\u0003t\u0003]qW/\u001c*fC\u0012,'\u000fU1si&$\u0018n\u001c8t?\u0012*\u0017\u000f\u0006\u0002ei\"9\u0001.]A\u0001\u0002\u0004q\u0007B\u0002<\u0001A\u0003&a.\u0001\u000bok6\u0014V-\u00193feB\u000b'\u000f^5uS>t7\u000f\t\u0005\nq\u0002\u0001\r\u00111A\u0005\n5\f1C\\;n/JLG/\u001a:QCJ$\u0018\u000e^5p]ND\u0011B\u001f\u0001A\u0002\u0003\u0007I\u0011B>\u0002/9,Xn\u0016:ji\u0016\u0014\b+\u0019:uSRLwN\\:`I\u0015\fHC\u00013}\u0011\u001dA\u00170!AA\u00029DaA 
\u0001!B\u0013q\u0017\u0001\u00068v[^\u0013\u0018\u000e^3s!\u0006\u0014H/\u001b;j_:\u001c\b\u0005C\u0005\u0002\u0002\u0001\u0001\r\u0011\"\u0003\u0002\u0004\u0005\u00112-\u001e:sK:$HI]5wKJ,\u0005o\\2i+\u0005\u0001\u0005\"CA\u0004\u0001\u0001\u0007I\u0011BA\u0005\u0003Y\u0019WO\u001d:f]R$%/\u001b<fe\u0016\u0003xn\u00195`I\u0015\fHc\u00013\u0002\f!A\u0001.!\u0002\u0002\u0002\u0003\u0007\u0001\tC\u0004\u0002\u0010\u0001\u0001\u000b\u0015\u0002!\u0002'\r,(O]3oi\u0012\u0013\u0018N^3s\u000bB|7\r\u001b\u0011\t\u0013\u0005M\u0001A1A\u0005\n\u0005U\u0011\u0001\u00059beRLG/[8o\u0007>lW.\u001b;t+\t\t9\u0002\u0005\u0005\u0002\u001a\u0005\r\u0012qEA\u0017\u001b\t\tYB\u0003\u0003\u0002\u001e\u0005}\u0011aB7vi\u0006\u0014G.\u001a\u0006\u0004\u0003C\u0019\u0012AC2pY2,7\r^5p]&!\u0011QEA\u000e\u0005\ri\u0015\r\u001d\t\u0006%\u0005%\u0002I\\\u0005\u0004\u0003W\u0019\"A\u0002+va2,'\u0007\u0005\u0003\u00020\u0005ER\"A\u0015\n\u0007\u0005M\u0012FA\nXe&$XM]\"p[6LG/T3tg\u0006<W\r\u0003\u0005\u00028\u0001\u0001\u000b\u0011BA\f\u0003E\u0001\u0018M\u001d;ji&|gnQ8n[&$8\u000f\t\u0005\n\u0003w\u0001!\u0019!C\u0005\u0003{\t\u0001\u0003]1si&$\u0018n\u001c8PM\u001a\u001cX\r^:\u0016\u0005\u0005}\u0002\u0003CA\r\u0003G\t9#!\u0011\u0011\u0007M\n\u0019%C\u0002\u0002FQ\u0012q\u0002U1si&$\u0018n\u001c8PM\u001a\u001cX\r\u001e\u0005\t\u0003\u0013\u0002\u0001\u0015!\u0003\u0002@\u0005\t\u0002/\u0019:uSRLwN\\(gMN,Go\u001d\u0011\t\u0013\u00055\u0003\u00011A\u0005\n\u0005\r\u0011A\u00057bgR\u001cu.\\7jiR,G-\u00129pG\"D\u0011\"!\u0015\u0001\u0001\u0004%I!a\u0015\u0002-1\f7\u000f^\"p[6LG\u000f^3e\u000bB|7\r[0%KF$2\u0001ZA+\u0011!A\u0017qJA\u0001\u0002\u0004\u0001\u0005bBA-\u0001\u0001\u0006K\u0001Q\u0001\u0014Y\u0006\u001cHoQ8n[&$H/\u001a3Fa>\u001c\u0007\u000e\t\u0005\n\u0003;\u0002!\u0019!C\u0005\u0003?\n!$\u001a9pG\"\u001cx+Y5uS:<Gk\u001c\"f\u0007>lW.\u001b;uK\u0012,\"!!\u0019\u0011\u000b\u0005e\u00111\r!\n\t\u0005\u0015\u00141\u0004\u0002\b\u0011\u0006\u001c\bnU3u\u0011!\tI\u0007\u0001Q\u0001\n\u0005\u0005\u0
014aG3q_\u000eD7oV1ji&tw\rV8CK\u000e{W.\\5ui\u0016$\u0007\u0005C\u0004\u0002n\u0001!I!a\u001c\u0002+I,7o\u001c7wK\u000e{W.\\5ug\u0006#X\t]8dQR!\u0011\u0011OA<!\r\u0011\u00121O\u0005\u0004\u0003k\u001a\"AB!osZ\u000bG\u000eC\u0004\u0002z\u0005-\u0004\u0019\u0001!\u0002\u000b\u0015\u0004xn\u00195\t\u000f\u0005u\u0004\u0001\"\u0003\u0002��\u0005ab-\u001b8e!\u0006\u0014H/\u001b;j_:\u001cu.\\7jiN4uN]#q_\u000eDG\u0003BAA\u00033\u0003b!a!\u0002\u0014\u00065b\u0002BAC\u0003\u001fsA!a\"\u0002\u000e6\u0011\u0011\u0011\u0012\u0006\u0004\u0003\u0017+\u0013A\u0002\u001fs_>$h(C\u0001\u0015\u0013\r\t\tjE\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t)*a&\u0003\u0011%#XM]1cY\u0016T1!!%\u0014\u0011\u001d\tI(a\u001fA\u0002\u0001Cq!!(\u0001\t\u0013\ty*A\u0006d_6l\u0017\u000e^#q_\u000eDG#\u00023\u0002\"\u0006\r\u0006bBA=\u00037\u0003\r\u0001\u0011\u0005\t\u0003K\u000bY\n1\u0001\u0002\u0002\u0006AQ.Z:tC\u001e,7\u000fC\u0004\u0002*\u0002!\t%a+\u0002\u000fI,7-Z5wKV\u0011\u0011Q\u0016\t\u0007%\u0005=\u00161\u00173\n\u0007\u0005E6CA\bQCJ$\u0018.\u00197Gk:\u001cG/[8o!\r\u0011\u0012QW\u0005\u0004\u0003o\u001b\"aA!os\"9\u00111\u0018\u0001\u0005B\u0005u\u0016a\u0004:fG\u0016Lg/Z!oIJ+\u0007\u000f\\=\u0015\t\u00055\u0016q\u0018\u0005\t\u0003\u0003\fI\f1\u0001\u0002D\u000691m\u001c8uKb$\bc\u0001\r\u0002F&\u0019\u0011qY\r\u0003\u001dI\u00038mQ1mY\u000e{g\u000e^3yi\u0002")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.class */
/**
 * Decompiled Java view of the Scala class {@code EpochCoordinator}.
 *
 * <p>The class is an RPC endpoint ({@link ThreadSafeRpcEndpoint}) with {@link Logging} mixed in.
 * It keeps per-epoch bookkeeping (writer commit messages, reader partition offsets, the last
 * committed epoch and a set of epochs waiting for their predecessor) and, once an epoch is
 * complete, commits it through the {@link StreamWriter} and the {@code ContinuousExecution}
 * query (see {@link #org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch(long)}).
 *
 * <p>Message handling itself lives in compiler-generated closure classes
 * ({@code EpochCoordinator$$anonfun$receive$1}, {@code EpochCoordinator$$anonfun$receiveAndReply$1})
 * that are not part of this file.
 *
 * <p>NOTE(review): the long {@code org$apache$...$$name} identifiers look like Scala compiler
 * name expansion for members accessed from those closure classes. Do not rename them or change
 * their visibility without checking the generated closures.
 */
public class EpochCoordinator implements ThreadSafeRpcEndpoint, Logging {
    /** Sink writer; receives {@code commit(epoch, messages)} in {@link #commitEpoch}. */
    private final StreamWriter writer;
    /** Reader passed to the constructor; not used in this file (presumably read by the closure classes — confirm). */
    public final ContinuousReader org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$reader;
    /** Owning query; receives {@code commit(epoch)} in {@link #commitEpoch} after the writer commit. */
    public final ContinuousExecution org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$query;
    /** RPC environment returned by {@link #rpcEnv()}. */
    private final RpcEnv rpcEnv;
    /** Initialised to {@code false} by the constructor; only read/written through its accessors here. */
    private boolean org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped;
    /** Expected number of reader partitions; not assigned by the constructor (set via its setter). */
    private int org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions;
    /** Expected number of writer partitions; not assigned by the constructor (set via its setter). */
    private int org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions;
    /** Initialised to the constructor's {@code long} argument. */
    private long org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch;
    /** Commit messages keyed by a pair (presumably (epoch, partition id) — confirm against the closure classes). */
    private final Map<Tuple2<Object, Object>, WriterCommitMessage> org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits;
    /** Partition offsets keyed by a pair (presumably (epoch, partition id) — confirm against the closure classes). */
    private final Map<Tuple2<Object, Object>, PartitionOffset> org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets;
    /** Last epoch that was committed; starts at the constructor's {@code long} argument minus one. */
    private long org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch;
    /** Epochs (boxed longs) that are complete but whose predecessor has not been committed yet. */
    private final HashSet<Object> epochsWaitingToBeCommitted;
    /** Backing logger slot required by the {@link Logging} trait; reset to {@code null} in the constructor. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // ---------------------------------------------------------------------
    // Logging trait forwarders: each method delegates to the trait's
    // implementation class (Logging.Cclass), passing this instance.
    // ---------------------------------------------------------------------

    /** Returns the logger slot used by the {@link Logging} trait (may be {@code null}). */
    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Stores the logger slot used by the {@link Logging} trait. */
    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** Delegates to {@code Logging.Cclass.logName}. */
    @Override // org.apache.spark.internal.Logging
    public String logName() {
        return Logging.Cclass.logName(this);
    }

    /** Delegates to {@code Logging.Cclass.log}. */
    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        return Logging.Cclass.log(this);
    }

    /** Delegates to {@code Logging.Cclass.logInfo}; the message is supplied as a {@link Function0}. */
    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        Logging.Cclass.logInfo(this, function0);
    }

    /** Delegates to {@code Logging.Cclass.logDebug}; the message is supplied as a {@link Function0}. */
    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        Logging.Cclass.logDebug(this, function0);
    }

    /** Delegates to {@code Logging.Cclass.logTrace}; the message is supplied as a {@link Function0}. */
    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        Logging.Cclass.logTrace(this, function0);
    }

    /** Delegates to {@code Logging.Cclass.logWarning}; the message is supplied as a {@link Function0}. */
    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        Logging.Cclass.logWarning(this, function0);
    }

    /** Delegates to {@code Logging.Cclass.logError}; the message is supplied as a {@link Function0}. */
    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        Logging.Cclass.logError(this, function0);
    }

    /** Delegates to {@code Logging.Cclass.logInfo} with an accompanying {@link Throwable}. */
    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.Cclass.logInfo(this, function0, th);
    }

    /** Delegates to {@code Logging.Cclass.logDebug} with an accompanying {@link Throwable}. */
    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.Cclass.logDebug(this, function0, th);
    }

    /** Delegates to {@code Logging.Cclass.logTrace} with an accompanying {@link Throwable}. */
    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.Cclass.logTrace(this, function0, th);
    }

    /** Delegates to {@code Logging.Cclass.logWarning} with an accompanying {@link Throwable}. */
    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.Cclass.logWarning(this, function0, th);
    }

    /** Delegates to {@code Logging.Cclass.logError} with an accompanying {@link Throwable}. */
    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        Logging.Cclass.logError(this, function0, th);
    }

    /** Delegates to {@code Logging.Cclass.isTraceEnabled}. */
    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        return Logging.Cclass.isTraceEnabled(this);
    }

    /** Delegates to the one-argument {@code Logging.Cclass.initializeLogIfNecessary}. */
    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        Logging.Cclass.initializeLogIfNecessary(this, z);
    }

    /** Delegates to the two-argument {@code Logging.Cclass.initializeLogIfNecessary} and returns its result. */
    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.Cclass.initializeLogIfNecessary(this, z, z2);
    }

    /** Scala default-argument accessor for the second parameter of {@code initializeLogIfNecessary}. */
    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.Cclass.initializeLogIfNecessary$default$2(this);
    }

    // ---------------------------------------------------------------------
    // RpcEndpoint trait forwarders: each method delegates to the trait's
    // implementation class (RpcEndpoint.Cclass), passing this instance.
    // ---------------------------------------------------------------------

    /** Delegates to {@code RpcEndpoint.Cclass.self}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public final RpcEndpointRef self() {
        return RpcEndpoint.Cclass.self(this);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onError}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onError(Throwable th) {
        RpcEndpoint.Cclass.onError(this, th);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onConnected}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onConnected(RpcAddress rpcAddress) {
        RpcEndpoint.Cclass.onConnected(this, rpcAddress);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onDisconnected}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onDisconnected(RpcAddress rpcAddress) {
        RpcEndpoint.Cclass.onDisconnected(this, rpcAddress);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onNetworkError}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onNetworkError(Throwable th, RpcAddress rpcAddress) {
        RpcEndpoint.Cclass.onNetworkError(this, th, rpcAddress);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onStart}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onStart() {
        RpcEndpoint.Cclass.onStart(this);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.onStop}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public void onStop() {
        RpcEndpoint.Cclass.onStop(this);
    }

    /** Delegates to {@code RpcEndpoint.Cclass.stop}. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public final void stop() {
        RpcEndpoint.Cclass.stop(this);
    }

    /** Returns the {@link RpcEnv} supplied to the constructor. */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public RpcEnv rpcEnv() {
        return this.rpcEnv;
    }

    // ---------------------------------------------------------------------
    // State accessors. Plain getters/setters over the fields above; the
    // public ones are presumably called from the closure classes — confirm.
    // ---------------------------------------------------------------------

    /** Getter for the {@code queryWritesStopped} flag. */
    public boolean org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped;
    }

    /** Setter for the {@code queryWritesStopped} flag. */
    public void org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped_$eq(boolean z) {
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped = z;
    }

    /** Getter for the expected reader partition count. */
    public int org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions;
    }

    /** Setter for the expected reader partition count. */
    public void org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions_$eq(int i) {
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions = i;
    }

    /** Getter for the expected writer partition count (private: only used inside this class). */
    private int org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions;
    }

    /** Setter for the expected writer partition count. */
    public void org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions_$eq(int i) {
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions = i;
    }

    /** Getter for the current driver epoch. */
    public long org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch;
    }

    /** Setter for the current driver epoch. */
    public void org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch_$eq(long j) {
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch = j;
    }

    /** Returns the mutable map of writer commit messages (the live map, not a copy). */
    public Map<Tuple2<Object, Object>, WriterCommitMessage> org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits;
    }

    /** Returns the mutable map of reader partition offsets (the live map, not a copy). */
    public Map<Tuple2<Object, Object>, PartitionOffset> org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets;
    }

    /** Getter for the last committed epoch. */
    public long org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch() {
        return this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch;
    }

    /** Setter for the last committed epoch (private: only advanced by the commit logic below). */
    private void org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch_$eq(long j) {
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch = j;
    }

    /** Returns the set of complete epochs that are waiting for an earlier epoch to be committed. */
    private HashSet<Object> epochsWaitingToBeCommitted() {
        return this.epochsWaitingToBeCommitted;
    }

    /**
     * Commits epoch {@code j} if it is complete and next in sequence, then drains any directly
     * following epochs that were parked in {@link #epochsWaitingToBeCommitted()}.
     *
     * <p>Steps, as visible in the body:
     * <ol>
     *   <li>Collect the commit messages and the partition offsets for {@code j}.</li>
     *   <li>If either count differs from the expected writer/reader partition count, do nothing.</li>
     *   <li>If the last committed epoch is not {@code j - 1}, log at debug level and park {@code j}
     *       in the waiting set.</li>
     *   <li>Otherwise commit {@code j}, then keep committing {@code lastCommitted + 1} while that
     *       epoch is in the waiting set, and finally run the clean-up closures over both maps.</li>
     * </ol>
     *
     * @param j the epoch to resolve
     * @return {@link BoxedUnit#UNIT} when nothing was parked, or the boxed boolean result of
     *         {@code HashSet.add} when {@code j} was parked; the mixed return type appears to be
     *         an artifact of the Scala source — callers presumably ignore it (confirm)
     */
    public Object org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch(long j) {
        Iterable<WriterCommitMessage> findPartitionCommitsForEpoch = findPartitionCommitsForEpoch(j);
        // The selection predicate lives in EpochCoordinator$$anonfun$1 (not visible here);
        // presumably it keeps the offsets recorded for epoch j — TODO confirm.
        scala.collection.mutable.Iterable iterable = (scala.collection.mutable.Iterable) org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets().collect(new EpochCoordinator$$anonfun$1(this, j), Iterable$.MODULE$.canBuildFrom());
        // Epoch is incomplete until both counts match the expected partition counts.
        if (findPartitionCommitsForEpoch.size() != org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numWriterPartitions() || iterable.size() != org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$numReaderPartitions()) {
            return BoxedUnit.UNIT;
        }
        // Epochs are committed strictly in order: park this one if its predecessor is not done.
        if (org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch() != j - 1) {
            logDebug(new EpochCoordinator$$anonfun$org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch$1(this, j));
            return BoxesRunTime.boxToBoolean(epochsWaitingToBeCommitted().add(BoxesRunTime.boxToLong(j)));
        }
        commitEpoch(j, findPartitionCommitsForEpoch);
        org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch_$eq(j);
        // Local copy of the last committed epoch; advanced together with the field in the loop.
        long org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch = org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch();
        while (true) {
            long j2 = org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch + 1;
            if (!epochsWaitingToBeCommitted().contains(BoxesRunTime.boxToLong(j2))) {
                // No further parked successor: run the clean-up closures ($2/$3 and $4/$5, not
                // visible here) over the keys of both maps. Presumably they drop entries for
                // epochs older than the last committed one — TODO confirm against the closures.
                ((IterableLike) org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits().keys().filter(new EpochCoordinator$$anonfun$org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch$2(this))).foreach(new EpochCoordinator$$anonfun$org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch$3(this));
                ((IterableLike) org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets().keys().filter(new EpochCoordinator$$anonfun$org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch$4(this))).foreach(new EpochCoordinator$$anonfun$org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$resolveCommitsAtEpoch$5(this));
                return BoxedUnit.UNIT;
            }
            // The next epoch was parked earlier: commit it now and un-park it.
            commitEpoch(j2, findPartitionCommitsForEpoch(j2));
            epochsWaitingToBeCommitted().remove(BoxesRunTime.boxToLong(j2));
            org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch_$eq(j2);
            org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch = j2;
        }
    }

    /**
     * Collects commit messages from the partition-commit map using the partial function
     * {@code EpochCoordinator$$anonfun$findPartitionCommitsForEpoch$1}, which is constructed with
     * the given epoch (presumably it selects the entries recorded for that epoch — confirm).
     *
     * @param j the epoch passed to the selecting closure
     * @return a newly built collection of the selected commit messages
     */
    private Iterable<WriterCommitMessage> findPartitionCommitsForEpoch(long j) {
        return (Iterable) org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits().collect(new EpochCoordinator$$anonfun$findPartitionCommitsForEpoch$1(this, j), Iterable$.MODULE$.canBuildFrom());
    }

    /**
     * Logs at debug level, then commits the epoch to the writer followed by the query.
     *
     * @param j        the epoch being committed
     * @param iterable the commit messages handed to the writer (converted to an array first)
     */
    private void commitEpoch(long j, Iterable<WriterCommitMessage> iterable) {
        logDebug(new EpochCoordinator$$anonfun$commitEpoch$1(this, j));
        // Writer first, query second: the query is only told about the epoch after the sink commit call returns.
        this.writer.commit(j, (WriterCommitMessage[]) iterable.toArray(ClassTag$.MODULE$.apply(WriterCommitMessage.class)));
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$query.commit(j);
    }

    /**
     * Returns the handler for one-way messages. The handling logic is in the generated class
     * {@code EpochCoordinator$$anonfun$receive$1}, which is not part of this file.
     */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public PartialFunction<Object, BoxedUnit> receive() {
        return new EpochCoordinator$$anonfun$receive$1(this);
    }

    /**
     * Returns the handler for request/reply messages. The handling logic is in the generated class
     * {@code EpochCoordinator$$anonfun$receiveAndReply$1}, which is not part of this file.
     *
     * @param rpcCallContext the call context handed to the generated handler
     */
    @Override // org.apache.spark.rpc.RpcEndpoint
    public PartialFunction<Object, BoxedUnit> receiveAndReply(RpcCallContext rpcCallContext) {
        return new EpochCoordinator$$anonfun$receiveAndReply$1(this, rpcCallContext);
    }

    /**
     * Creates a coordinator with empty bookkeeping.
     *
     * @param streamWriter        writer that receives epoch commits
     * @param continuousReader    reader stored for later use (not used in this file)
     * @param continuousExecution query that is notified after each writer commit
     * @param j                   initial value of the current driver epoch; the last committed
     *                            epoch starts at {@code j - 1}
     * @param sparkSession        not referenced in this constructor body
     * @param rpcEnv              RPC environment exposed through {@link #rpcEnv()}
     */
    public EpochCoordinator(StreamWriter streamWriter, ContinuousReader continuousReader, ContinuousExecution continuousExecution, long j, SparkSession sparkSession, RpcEnv rpcEnv) {
        this.writer = streamWriter;
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$reader = continuousReader;
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$query = continuousExecution;
        this.rpcEnv = rpcEnv;
        // Trait initialisers generated by the Scala compiler; keep them before the state setup.
        RpcEndpoint.Cclass.$init$(this);
        org$apache$spark$internal$Logging$$log__$eq(null);
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$queryWritesStopped = false;
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$currentDriverEpoch = j;
        // Both maps start empty (Map.apply over Nil).
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionCommits = Map$.MODULE$.apply(Nil$.MODULE$);
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$partitionOffsets = Map$.MODULE$.apply(Nil$.MODULE$);
        // Makes epoch j the first one that passes the "predecessor committed" check.
        this.org$apache$spark$sql$execution$streaming$continuous$EpochCoordinator$$lastCommittedEpoch = j - 1;
        this.epochsWaitingToBeCommitted = HashSet$.MODULE$.empty();
    }
}
