package org.apache.spark.sql.execution.streaming.state;

import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.apache.spark.sql.execution.streaming.state.StateStore;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: StateStore.scala */
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/state/StateStore$.class */
public final class StateStore$ implements Logging {
    public static StateStore$ MODULE$;
    private final String MAINTENANCE_INTERVAL_CONFIG;
    private final int MAINTENANCE_INTERVAL_DEFAULT_SECS;

    @GuardedBy("loadedProviders")
    private final HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders;

    @GuardedBy("loadedProviders")
    private StateStore.MaintenanceTask maintenanceTask;

    @GuardedBy("loadedProviders")
    private StateStoreCoordinatorRef _coordRef;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        new StateStore$();
    }

    @Override // org.apache.spark.internal.Logging
    public String logName() {
        String logName;
        logName = logName();
        return logName;
    }

    @Override // org.apache.spark.internal.Logging
    public Logger log() {
        Logger log;
        log = log();
        return log;
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0) {
        logInfo(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0) {
        logDebug(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0) {
        logTrace(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0) {
        logWarning(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0) {
        logError(function0);
    }

    @Override // org.apache.spark.internal.Logging
    public void logInfo(Function0<String> function0, Throwable th) {
        logInfo(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logDebug(Function0<String> function0, Throwable th) {
        logDebug(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logTrace(Function0<String> function0, Throwable th) {
        logTrace(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logWarning(Function0<String> function0, Throwable th) {
        logWarning(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public void logError(Function0<String> function0, Throwable th) {
        logError(function0, th);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean isTraceEnabled() {
        boolean isTraceEnabled;
        isTraceEnabled = isTraceEnabled();
        return isTraceEnabled;
    }

    @Override // org.apache.spark.internal.Logging
    public void initializeLogIfNecessary(boolean z) {
        initializeLogIfNecessary(z);
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        boolean initializeLogIfNecessary;
        initializeLogIfNecessary = initializeLogIfNecessary(z, z2);
        return initializeLogIfNecessary;
    }

    @Override // org.apache.spark.internal.Logging
    public boolean initializeLogIfNecessary$default$2() {
        boolean initializeLogIfNecessary$default$2;
        initializeLogIfNecessary$default$2 = initializeLogIfNecessary$default$2();
        return initializeLogIfNecessary$default$2;
    }

    @Override // org.apache.spark.internal.Logging
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    @Override // org.apache.spark.internal.Logging
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    public String MAINTENANCE_INTERVAL_CONFIG() {
        return this.MAINTENANCE_INTERVAL_CONFIG;
    }

    public int MAINTENANCE_INTERVAL_DEFAULT_SECS() {
        return this.MAINTENANCE_INTERVAL_DEFAULT_SECS;
    }

    private HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders() {
        return this.loadedProviders;
    }

    private StateStore.MaintenanceTask maintenanceTask() {
        return this.maintenanceTask;
    }

    private void maintenanceTask_$eq(StateStore.MaintenanceTask maintenanceTask) {
        this.maintenanceTask = maintenanceTask;
    }

    private StateStoreCoordinatorRef _coordRef() {
        return this._coordRef;
    }

    private void _coordRef_$eq(StateStoreCoordinatorRef stateStoreCoordinatorRef) {
        this._coordRef = stateStoreCoordinatorRef;
    }

    public StateStore get(StateStoreProviderId stateStoreProviderId, StructType structType, StructType structType2, Option<Object> option, long j, StateStoreConf stateStoreConf, Configuration configuration) {
        StateStoreProvider stateStoreProvider;
        Predef$.MODULE$.require(j >= 0);
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            startMaintenanceIfNeeded();
            stateStoreProvider = (StateStoreProvider) loadedProviders().getOrElseUpdate(stateStoreProviderId, () -> {
                return StateStoreProvider$.MODULE$.createAndInit(stateStoreProviderId.storeId(), structType, structType2, option, stateStoreConf, configuration);
            });
            reportActiveStoreInstance(stateStoreProviderId);
        }
        return stateStoreProvider.getStore(j);
    }

    public void unload(StateStoreProviderId stateStoreProviderId) {
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            loadedProviders().remove(stateStoreProviderId).foreach(stateStoreProvider -> {
                stateStoreProvider.close();
                return BoxedUnit.UNIT;
            });
        }
    }

    public boolean isLoaded(StateStoreProviderId stateStoreProviderId) {
        boolean contains;
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            contains = loadedProviders().contains(stateStoreProviderId);
        }
        return contains;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v1, types: [scala.collection.mutable.HashMap] */
    /* JADX WARN: Type inference failed for: r0v11, types: [boolean] */
    /* JADX WARN: Type inference failed for: r0v2, types: [java.lang.Throwable] */
    public boolean isMaintenanceRunning() {
        boolean z;
        boolean z2;
        ?? loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            if (maintenanceTask() != null) {
                loadedProviders = maintenanceTask().isRunning();
                if (loadedProviders != 0) {
                    z = true;
                    z2 = z;
                }
            }
            z = false;
            z2 = z;
        }
        return z2;
    }

    public void stop() {
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            loadedProviders().keySet().foreach(stateStoreProviderId -> {
                $anonfun$stop$1(stateStoreProviderId);
                return BoxedUnit.UNIT;
            });
            loadedProviders().clear();
            _coordRef_$eq(null);
            if (maintenanceTask() != null) {
                maintenanceTask().stop();
                maintenanceTask_$eq(null);
            }
            logInfo(() -> {
                return "StateStore stopped";
            });
        }
    }

    private void startMaintenanceIfNeeded() {
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            SparkEnv sparkEnv = SparkEnv$.MODULE$.get();
            if (sparkEnv != null && !isMaintenanceRunning()) {
                maintenanceTask_$eq(new StateStore.MaintenanceTask(sparkEnv.conf().getTimeAsMs(MAINTENANCE_INTERVAL_CONFIG(), new StringBuilder(1).append(MAINTENANCE_INTERVAL_DEFAULT_SECS()).append("s").toString()), () -> {
                    MODULE$.doMaintenance();
                }, () -> {
                    HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders2 = MODULE$.loadedProviders();
                    synchronized (loadedProviders2) {
                        MODULE$.loadedProviders().clear();
                    }
                }));
                loadedProviders = this;
                loadedProviders.logInfo(() -> {
                    return "State Store maintenance task started";
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doMaintenance() {
        Seq seq;
        logDebug(() -> {
            return "Doing maintenance";
        });
        if (SparkEnv$.MODULE$.get() == null) {
            throw new IllegalStateException("SparkEnv not active, cannot do maintenance on StateStores");
        }
        HashMap<StateStoreProviderId, StateStoreProvider> loadedProviders = loadedProviders();
        synchronized (loadedProviders) {
            seq = loadedProviders().toSeq();
        }
        seq.foreach(tuple2 -> {
            $anonfun$doMaintenance$2(tuple2);
            return BoxedUnit.UNIT;
        });
    }

    private void reportActiveStoreInstance(StateStoreProviderId stateStoreProviderId) {
        if (SparkEnv$.MODULE$.get() != null) {
            String host = SparkEnv$.MODULE$.get().blockManager().blockManagerId().host();
            String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
            coordinatorRef().foreach(stateStoreCoordinatorRef -> {
                stateStoreCoordinatorRef.reportActiveInstance(stateStoreProviderId, host, executorId);
                return BoxedUnit.UNIT;
            });
            logInfo(() -> {
                return new StringBuilder(44).append("Reported that the loaded instance ").append(stateStoreProviderId).append(" is active").toString();
            });
        }
    }

    private boolean verifyIfStoreInstanceActive(StateStoreProviderId stateStoreProviderId) {
        if (SparkEnv$.MODULE$.get() == null) {
            return false;
        }
        String executorId = SparkEnv$.MODULE$.get().blockManager().blockManagerId().executorId();
        boolean unboxToBoolean = BoxesRunTime.unboxToBoolean(coordinatorRef().map(stateStoreCoordinatorRef -> {
            return BoxesRunTime.boxToBoolean($anonfun$verifyIfStoreInstanceActive$1(stateStoreProviderId, executorId, stateStoreCoordinatorRef));
        }).getOrElse(() -> {
            return false;
        }));
        logDebug(() -> {
            return new StringBuilder(49).append("Verified whether the loaded instance ").append(stateStoreProviderId).append(" is active: ").append(unboxToBoolean).toString();
        });
        return unboxToBoolean;
    }

    /* JADX WARN: Code restructure failed: missing block: B:29:0x0051, code lost:
    
        if (r0.equals(r1) != false) goto L20;
     */
    /* JADX WARN: Code restructure failed: missing block: B:32:0x0030, code lost:
    
        if (r0.equals(r1) == false) goto L13;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private scala.Option<org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef> coordinatorRef() {
        /*
            r4 = this;
            r0 = r4
            scala.collection.mutable.HashMap r0 = r0.loadedProviders()
            r1 = r0
            r5 = r1
            monitor-enter(r0)
            org.apache.spark.SparkEnv$ r0 = org.apache.spark.SparkEnv$.MODULE$     // Catch: java.lang.Throwable -> La4
            org.apache.spark.SparkEnv r0 = r0.get()     // Catch: java.lang.Throwable -> La4
            r7 = r0
            r0 = r7
            if (r0 == 0) goto L95
            r0 = r7
            java.lang.String r0 = r0.executorId()     // Catch: java.lang.Throwable -> La4
            org.apache.spark.SparkContext$ r1 = org.apache.spark.SparkContext$.MODULE$     // Catch: java.lang.Throwable -> La4
            java.lang.String r1 = r1.DRIVER_IDENTIFIER()     // Catch: java.lang.Throwable -> La4
            r9 = r1
            r1 = r0
            if (r1 != 0) goto L2b
        L23:
            r0 = r9
            if (r0 == 0) goto L54
            goto L33
        L2b:
            r1 = r9
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> La4
            if (r0 != 0) goto L54
        L33:
            r0 = r7
            java.lang.String r0 = r0.executorId()     // Catch: java.lang.Throwable -> La4
            org.apache.spark.SparkContext$ r1 = org.apache.spark.SparkContext$.MODULE$     // Catch: java.lang.Throwable -> La4
            java.lang.String r1 = r1.LEGACY_DRIVER_IDENTIFIER()     // Catch: java.lang.Throwable -> La4
            r10 = r1
            r1 = r0
            if (r1 != 0) goto L4c
        L44:
            r0 = r10
            if (r0 == 0) goto L54
            goto L58
        L4c:
            r1 = r10
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> La4
            if (r0 == 0) goto L58
        L54:
            r0 = 1
            goto L59
        L58:
            r0 = 0
        L59:
            r8 = r0
            r0 = r8
            if (r0 != 0) goto L67
            r0 = r4
            org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef r0 = r0._coordRef()     // Catch: java.lang.Throwable -> La4
            if (r0 != 0) goto L7e
        L67:
            r0 = r4
            scala.Option<org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$coordinatorRef$1();
            }     // Catch: java.lang.Throwable -> La4
            r0.logDebug(r1)     // Catch: java.lang.Throwable -> La4
            r0 = r4
            org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef$ r1 = org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef$.MODULE$     // Catch: java.lang.Throwable -> La4
            r2 = r7
            org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef r1 = r1.forExecutor(r2)     // Catch: java.lang.Throwable -> La4
            r0._coordRef_$eq(r1)     // Catch: java.lang.Throwable -> La4
            goto L7e
        L7e:
            r0 = r4
            scala.Option<org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef> r1 = () -> { // scala.Function0.apply():java.lang.Object
                return $anonfun$coordinatorRef$2();
            }     // Catch: java.lang.Throwable -> La4
            r0.logInfo(r1)     // Catch: java.lang.Throwable -> La4
            scala.Some r0 = new scala.Some     // Catch: java.lang.Throwable -> La4
            r1 = r0
            r2 = r4
            org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef r2 = r2._coordRef()     // Catch: java.lang.Throwable -> La4
            r1.<init>(r2)     // Catch: java.lang.Throwable -> La4
            goto L9d
        L95:
            r0 = r4
            r1 = 0
            r0._coordRef_$eq(r1)     // Catch: java.lang.Throwable -> La4
            scala.None$ r0 = scala.None$.MODULE$     // Catch: java.lang.Throwable -> La4
        L9d:
            r6 = r0
            r0 = r5
            monitor-exit(r0)
            r0 = r6
            goto La7
        La4:
            r1 = move-exception
            monitor-exit(r1)
            throw r0
        La7:
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.spark.sql.execution.streaming.state.StateStore$.coordinatorRef():scala.Option");
    }

    public static final /* synthetic */ void $anonfun$stop$1(StateStoreProviderId stateStoreProviderId) {
        MODULE$.unload(stateStoreProviderId);
    }

    public static final /* synthetic */ void $anonfun$doMaintenance$2(Tuple2 tuple2) {
        BoxedUnit boxedUnit;
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        StateStoreProviderId stateStoreProviderId = (StateStoreProviderId) tuple2._1();
        StateStoreProvider stateStoreProvider = (StateStoreProvider) tuple2._2();
        try {
            if (MODULE$.verifyIfStoreInstanceActive(stateStoreProviderId)) {
                stateStoreProvider.doMaintenance();
                boxedUnit = BoxedUnit.UNIT;
            } else {
                MODULE$.unload(stateStoreProviderId);
                MODULE$.logInfo(() -> {
                    return new StringBuilder(9).append("Unloaded ").append(stateStoreProvider).toString();
                });
                boxedUnit = BoxedUnit.UNIT;
            }
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            Throwable th2 = (Throwable) unapply.get();
            MODULE$.logWarning(() -> {
                return new StringBuilder(43).append("Error managing ").append(stateStoreProvider).append(", stopping management thread").toString();
            });
            throw th2;
        }
    }

    public static final /* synthetic */ boolean $anonfun$verifyIfStoreInstanceActive$1(StateStoreProviderId stateStoreProviderId, String str, StateStoreCoordinatorRef stateStoreCoordinatorRef) {
        return stateStoreCoordinatorRef.verifyIfInstanceActive(stateStoreProviderId, str);
    }

    private StateStore$() {
        MODULE$ = this;
        org$apache$spark$internal$Logging$$log__$eq(null);
        this.MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval";
        this.MAINTENANCE_INTERVAL_DEFAULT_SECS = 60;
        this.loadedProviders = new HashMap<>();
        this.maintenanceTask = null;
        this._coordRef = null;
    }
}
