package org.opendaylight.controller.cluster.databroker.actors.dds;

import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.compat.java8.FutureConverters;

/* loaded from: input_file:org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver.class */
abstract class AbstractShardBackendResolver extends BackendInfoResolver<ShardBackendInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractShardBackendResolver.class);
    private static final Timeout CONNECT_TIMEOUT = Timeout.apply(5, TimeUnit.SECONDS);
    private final Function1<ActorRef, ?> connectFunction;
    private final ActorUtils actorUtils;
    private final AtomicLong nextSessionId = new AtomicLong();
    private final Set<Consumer<Long>> staleBackendInfoCallbacks = ConcurrentHashMap.newKeySet();

    /* loaded from: input_file:org/opendaylight/controller/cluster/databroker/actors/dds/AbstractShardBackendResolver$ShardState.class */
    static final class ShardState {
        private final CompletionStage<ShardBackendInfo> stage;
        private ShardBackendInfo result;

        ShardState(CompletionStage<ShardBackendInfo> completionStage) {
            this.stage = (CompletionStage) Objects.requireNonNull(completionStage);
            completionStage.whenComplete(this::onStageResolved);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public CompletionStage<ShardBackendInfo> getStage() {
            return this.stage;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public synchronized ShardBackendInfo getResult() {
            return this.result;
        }

        private synchronized void onStageResolved(ShardBackendInfo shardBackendInfo, Throwable th) {
            if (th == null) {
                this.result = (ShardBackendInfo) Objects.requireNonNull(shardBackendInfo);
            } else {
                AbstractShardBackendResolver.LOG.warn("Failed to resolve shard", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractShardBackendResolver(ClientIdentifier clientIdentifier, ActorUtils actorUtils) {
        this.actorUtils = (ActorUtils) Objects.requireNonNull(actorUtils);
        this.connectFunction = ExplicitAsk.toScala(actorRef -> {
            return new ConnectClientRequest(clientIdentifier, actorRef, ABIVersion.BORON, ABIVersion.current());
        });
    }

    public Registration notifyWhenBackendInfoIsStale(Consumer<Long> consumer) {
        this.staleBackendInfoCallbacks.add(consumer);
        return () -> {
            this.staleBackendInfoCallbacks.remove(consumer);
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyStaleBackendInfoCallbacks(Long l) {
        this.staleBackendInfoCallbacks.forEach(consumer -> {
            consumer.accept(l);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ActorUtils actorUtils() {
        return this.actorUtils;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void flushCache(String str) {
        this.actorUtils.getPrimaryShardInfoCache().remove(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final ShardState resolveBackendInfo(String str, long j) {
        LOG.debug("Resolving cookie {} to shard {}", Long.valueOf(j), str);
        CompletableFuture completableFuture = new CompletableFuture();
        FutureConverters.toJava(this.actorUtils.findPrimaryShardAsync(str)).whenComplete((primaryShardInfo, th) -> {
            if (th == null) {
                connectShard(str, j, primaryShardInfo, completableFuture);
                return;
            }
            LOG.debug("Shard {} failed to resolve", str, th);
            if (th instanceof NoShardLeaderException) {
                completableFuture.completeExceptionally(wrap("Shard has no current leader", th));
                return;
            }
            if (th instanceof NotInitializedException) {
                LOG.info("Shard {} has not initialized yet", str);
                completableFuture.completeExceptionally(th);
            } else if (!(th instanceof PrimaryNotFoundException)) {
                completableFuture.completeExceptionally(th);
            } else {
                LOG.info("Failed to find primary for shard {}", str);
                completableFuture.completeExceptionally(th);
            }
        });
        return new ShardState(completableFuture);
    }

    private static TimeoutException wrap(String str, Throwable th) {
        TimeoutException timeoutException = new TimeoutException(str);
        timeoutException.initCause((Throwable) Objects.requireNonNull(th));
        return timeoutException;
    }

    private void connectShard(String str, long j, PrimaryShardInfo primaryShardInfo, CompletableFuture<ShardBackendInfo> completableFuture) {
        LOG.debug("Shard {} resolved to {}, attempting to connect", str, primaryShardInfo);
        FutureConverters.toJava(ExplicitAsk.ask(primaryShardInfo.getPrimaryShardActor(), this.connectFunction, CONNECT_TIMEOUT)).whenComplete((obj, th) -> {
            onConnectResponse(str, j, completableFuture, obj, th);
        });
    }

    private void onConnectResponse(String str, long j, CompletableFuture<ShardBackendInfo> completableFuture, Object obj, Throwable th) {
        if (th != null) {
            LOG.debug("Connect attempt to {} failed, will retry", str, th);
            completableFuture.completeExceptionally(wrap("Connection attempt failed", th));
        } else if (obj instanceof RequestFailure) {
            Throwable unwrap = ((RequestFailure) obj).getCause().unwrap();
            LOG.debug("Connect attempt to {} failed to process", str, unwrap);
            completableFuture.completeExceptionally(unwrap instanceof NotLeaderException ? wrap("Leader moved during establishment", unwrap) : unwrap);
        } else {
            LOG.debug("Resolved backend information to {}", obj);
            Preconditions.checkArgument(obj instanceof ConnectClientSuccess, "Unhandled response %s", obj);
            ConnectClientSuccess connectClientSuccess = (ConnectClientSuccess) obj;
            completableFuture.complete(new ShardBackendInfo(connectClientSuccess.getBackend(), this.nextSessionId.getAndIncrement(), connectClientSuccess.getVersion(), str, UnsignedLong.fromLongBits(j), connectClientSuccess.getDataTree(), connectClientSuccess.getMaxMessages()));
        }
    }
}
