package io.vlingo.xoom.actors;

import io.vlingo.xoom.actors.ActorFactory;
import io.vlingo.xoom.actors.Definition;
import io.vlingo.xoom.actors.Stage;
import io.vlingo.xoom.cluster.model.Properties;
import io.vlingo.xoom.common.identity.IdentityGeneratorType;
import io.vlingo.xoom.lattice.grid.GridNodeBootstrap;
import io.vlingo.xoom.lattice.grid.application.GridActorControl;
import io.vlingo.xoom.lattice.grid.application.QuorumObserver;
import io.vlingo.xoom.lattice.grid.hashring.HashRing;
import io.vlingo.xoom.lattice.grid.hashring.MurmurSortedMapHashRing;
import io.vlingo.xoom.wire.node.Id;
import java.util.UUID;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/vlingo/xoom/actors/Grid.class */
/**
 * A {@link Stage} that also acts as the {@link GridRuntime} of one grid node.
 *
 * <p>The grid keeps a {@link HashRing} of node ids. For every actor address it
 * asks the ring which node owns the address; actors owned by another node are
 * started there through the {@link GridActorControl.Outbound} channel, and
 * locally held actors are relocated when the ring changes (a node joins, or
 * this node is excluded from the ring).
 *
 * <p>The instance registers itself with its {@link World} under a name that is
 * generated once per class load, see {@link #instance(World)}.
 */
public class Grid extends Stage implements GridRuntime {
    private static final int GRID_STAGE_BUCKETS = 32;
    private static final int GRID_STAGE_INITIAL_CAPACITY = 16384;

    /** Name under which mailboxes are suspended until the owning side resumes them. */
    private static final String RESUME = "GridActor.Resume";

    private static final Logger logger = LoggerFactory.getLogger(Grid.class);

    /** Key used with {@code World.registerDynamic} / {@code World.resolveDynamic}. */
    private static final String INSTANCE_NAME = UUID.randomUUID().toString();

    private final GridNodeBootstrap gridNodeBootstrap;
    private final HashRing<Id> hashRing;
    private Id nodeId;
    private GridActorControl.Outbound outbound;
    private volatile boolean hasQuorum;
    private final long clusterHealthCheckInterval;

    /**
     * Answers the {@code Grid} previously registered with {@code world}.
     *
     * @param world the world the grid was registered with by the constructor
     * @return whatever {@code world.resolveDynamic} yields for this class's instance name
     */
    public static Grid instance(World world) {
        return (Grid) world.resolveDynamic(INSTANCE_NAME, Grid.class);
    }

    /**
     * Starts a new {@code World} and a {@code Grid} on top of it, using
     * {@code Configuration.define()} and {@code Properties.open()}.
     *
     * @param str  passed to {@code World.start} as its first argument
     * @param str2 passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if starting the world or booting the grid node fails
     */
    public static Grid start(String str, String str2) throws Exception {
        return start(str, Configuration.define(), Properties.open(), str2);
    }

    /**
     * Starts a {@code Grid} on an existing world with a random-identity
     * {@link GridAddressFactory} and {@code Properties.open()}.
     *
     * @param world the world hosting the grid
     * @param str   passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if booting the grid node fails
     */
    public static Grid start(World world, String str) throws Exception {
        return start(world, new GridAddressFactory(IdentityGeneratorType.RANDOM), Properties.open(), str);
    }

    /**
     * Starts a new {@code World} with the given configuration and a
     * {@code Grid} on top of it, using {@code Properties.open()}.
     *
     * @param str           passed to {@code World.start} as its first argument
     * @param configuration the world configuration
     * @param str2          passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if starting the world or booting the grid node fails
     */
    public static Grid start(String str, Configuration configuration, String str2) throws Exception {
        return start(str, configuration, Properties.open(), str2);
    }

    /**
     * Starts a new {@code World} and a {@code Grid} on top of it with a
     * random-identity {@link GridAddressFactory}.
     *
     * @param str           passed to {@code World.start} as its first argument
     * @param configuration the world configuration
     * @param properties    the cluster properties
     * @param str2          passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if starting the world or booting the grid node fails
     */
    public static Grid start(String str, Configuration configuration, Properties properties, String str2) throws Exception {
        return start(str, new GridAddressFactory(IdentityGeneratorType.RANDOM), configuration, properties, str2);
    }

    /**
     * Starts a {@code Grid} on an existing world with a random-identity
     * {@link GridAddressFactory}.
     *
     * @param world      the world hosting the grid
     * @param properties the cluster properties
     * @param str        passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if booting the grid node fails
     */
    public static Grid start(World world, Properties properties, String str) throws Exception {
        return start(world, new GridAddressFactory(IdentityGeneratorType.RANDOM), properties, str);
    }

    /**
     * Starts a new {@code World} and a {@code Grid} on top of it.
     *
     * @param str            passed to {@code World.start} as its first argument
     * @param addressFactory the address factory for the grid stage
     * @param configuration  the world configuration
     * @param properties     the cluster properties
     * @param str2           passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if starting the world or booting the grid node fails
     */
    public static Grid start(String str, AddressFactory addressFactory, Configuration configuration, Properties properties, String str2) throws Exception {
        return new Grid(World.start(str, configuration), addressFactory, properties, str2);
    }

    /**
     * Starts a {@code Grid} on an existing world.
     *
     * @param world          the world hosting the grid
     * @param addressFactory the address factory for the grid stage
     * @param properties     the cluster properties
     * @param str            passed to the {@code Grid} constructor (stage name and bootstrap name)
     * @return the started grid
     * @throws Exception if booting the grid node fails
     */
    public static Grid start(World world, AddressFactory addressFactory, Properties properties, String str) throws Exception {
        return new Grid(world, addressFactory, properties, str);
    }

    /**
     * Creates the grid stage, boots the grid node and registers this instance
     * with {@code world} so that {@link #instance(World)} can find it.
     *
     * @param world          the world hosting the grid
     * @param addressFactory the address factory for the grid stage
     * @param properties     the cluster properties; also supplies the health-check
     *                       interval used while waiting for quorum
     * @param str            used both as the stage name and as the name given to
     *                       {@code GridNodeBootstrap.boot}
     * @throws Exception if booting the grid node fails
     */
    public Grid(World world, AddressFactory addressFactory, Properties properties, String str) throws Exception {
        super(world, addressFactory, str, GRID_STAGE_BUCKETS, GRID_STAGE_INITIAL_CAPACITY);
        // 100 is handed straight to the ring; presumably points per node - TODO confirm.
        this.hashRing = new MurmurSortedMapHashRing(100);
        extenderStartDirectoryScanner();
        this.gridNodeBootstrap = GridNodeBootstrap.boot(this, str, properties, false);
        this.hasQuorum = false;
        this.clusterHealthCheckInterval = properties.clusterHealthCheckInterval();
        world.registerDynamic(INSTANCE_NAME, this);
    }

    /**
     * Answers a wrapper that decorates every mailbox with a {@link GridMailbox}.
     * The node id and outbound channel are read when the wrapper is invoked,
     * not when it is created.
     *
     * @return the mailbox wrapper
     */
    protected ActorFactory.MailboxWrapper mailboxWrapper() {
        return (address, mailbox) ->
                new GridMailbox(mailbox, this.nodeId, address, this.hashRing, this.outbound);
    }

    /** Terminates the world that hosts this grid. */
    public void terminate() {
        world().terminate();
    }

    /** Records that the cluster has quorum, releasing callers blocked on mailbox allocation. */
    @Override
    public void quorumAchieved() {
        this.hasQuorum = true;
    }

    /** Records that the cluster no longer has quorum. */
    @Override
    public void quorumLost() {
        this.hasQuorum = false;
    }

    /**
     * Looks up the local actor registered at {@code address}.
     *
     * @param address the actor address
     * @return the result of the directory lookup
     */
    @Override
    public Actor actorAt(Address address) {
        return this.directory.actorOf(address);
    }

    /**
     * Removes this node from the hash ring and relocates every distributable
     * actor that the ring assigned to this node before the removal.
     */
    @Override
    public void relocateActors() {
        // Snapshot first: ownership has to be judged against the ring as it
        // was while this node was still part of it.
        HashRing<Id> before = this.hashRing.copy();
        this.hashRing.excludeNode(this.nodeId);
        this.directory.addresses().stream()
                .filter(address -> address.isDistributable() && isAssignedTo(before, address, this.nodeId))
                .forEach(address -> {
                    Actor actor = this.directory.actorOf(address);
                    Id newOwner = this.hashRing.nodeOf(address.idString());
                    // The ring yields no owner when nothing is left to take the actor.
                    if (newOwner != null) {
                        relocateActorTo(actor, address, newOwner);
                    }
                });
    }

    /** @return this grid, viewed as a plain {@link Stage} */
    @Override
    public Stage asStage() {
        return this;
    }

    /**
     * Creates a local actor for {@code address} whose mailbox is suspended
     * under the resume name, so that only {@link Relocatable} messages are let
     * through until it is resumed.
     *
     * @param cls        the protocol the returned proxy implements
     * @param definition the actor definition
     * @param address    the address of the actor
     * @param <T>        the protocol type
     * @return the protocol proxy of the created actor
     */
    <T> T actorThunkFor(Class<T> cls, Definition definition, Address address) {
        Mailbox suspended = allocateMailbox(definition, address, null);
        suspended.suspendExceptFor(RESUME, new Class[]{Relocatable.class});
        return (T) actorProtocolFor(
                cls,
                definition,
                definition.parentOr(this.world.defaultParent()),
                address,
                suspended,
                definition.supervisor(),
                definition.loggerOr(this.world.defaultLogger())).protocolActor();
    }

    /** @return the bootstrap created for this grid node */
    @Override
    public GridNodeBootstrap gridNodeBootstrap() {
        return this.gridNodeBootstrap;
    }

    /** @return the live (not copied) hash ring of this grid */
    @Override
    public HashRing<Id> hashRing() {
        return this.hashRing;
    }

    /**
     * Adds {@code id} to the hash ring and relocates to it every distributable
     * actor that was assigned to this node before and is assigned to
     * {@code id} afterwards. A notification about this node itself is ignored.
     *
     * @param id the node that joined
     */
    @Override
    public void nodeJoined(Id id) {
        if (this.nodeId.equals(id)) {
            return;
        }
        HashRing<Id> before = this.hashRing.copy();
        this.hashRing.includeNode(id);
        this.directory.addresses().stream()
                .filter(address -> address.isDistributable() && shouldRelocateTo(before, address, id))
                .forEach(address -> relocateActorTo(this.directory.actorOf(address), address, id));
    }

    /**
     * Suspends {@code actor} and ships its definition, snapshot and pending
     * messages to node {@code id}. Does nothing when the actor is already
     * suspended for relocation.
     */
    private void relocateActorTo(Actor actor, Address address, Id id) {
        if (GridActorOperations.isSuspendedForRelocation(actor)) {
            return;
        }
        logger.debug("Relocating actor [{}] to [{}]", address, id);
        GridActorOperations.suspendForRelocation(actor);
        this.outbound.relocate(
                id,
                this.nodeId,
                Definition.SerializationProxy.from(actor.definition()),
                address,
                GridActorOperations.supplyRelocationSnapshot(actor),
                GridActorOperations.pending(actor));
        this.outbound.disburse(id);
    }

    /**
     * Tells whether {@code address} moved from this node (per {@code hashRing},
     * the ring before the change) to {@code id} (per the current ring).
     */
    private boolean shouldRelocateTo(HashRing<Id> hashRing, Address address, Id id) {
        return isAssignedTo(hashRing, address, this.nodeId) && isAssignedTo(this.hashRing, address, id);
    }

    /** Tells whether {@code hashRing} assigns {@code address} to node {@code id}. */
    private static boolean isAssignedTo(HashRing<Id> hashRing, Address address, Id id) {
        return id.equals(hashRing.nodeOf(address.idString()));
    }

    /** @return this grid, which observes quorum changes itself */
    @Override
    public QuorumObserver quorumObserver() {
        return this;
    }

    /**
     * Sets the channel used to send start/relocate requests to other nodes.
     *
     * @param outbound the outbound channel
     */
    @Override
    public void setOutbound(GridActorControl.Outbound outbound) {
        this.outbound = outbound;
    }

    /**
     * Sets the id of the node this grid runs on.
     *
     * @param id the local node id
     */
    @Override
    public void setNodeId(Id id) {
        this.nodeId = id;
    }

    /** @return the class loader of the hosting world */
    @Override
    public ClassLoader worldClassLoader() {
        return world().classLoader();
    }

    /**
     * Creates the actor protocol, first asking the owning node to start the
     * actor when the hash ring assigns the address to a different node.
     *
     * <p>The start request goes to the node resolved <em>after</em> any quorum
     * wait, i.e. the same node the mailbox decision is based on.
     *
     * <p>NOTE(review): {@code Logger} resolves to the imported
     * {@code org.slf4j.Logger} in this file; confirm that this is the type the
     * superclass method declares.
     *
     * @param cls        the protocol type
     * @param definition the actor definition
     * @param actor      the parent actor
     * @param address    the actor address, or {@code null} to generate a unique one
     * @param mailbox    the mailbox to use when the actor is local
     * @param supervisor the supervisor
     * @param logger2    the logger
     * @param <T>        the protocol type
     * @return the result of the superclass implementation
     */
    protected <T> Stage.ActorProtocolActor<T> actorProtocolFor(Class<T> cls, Definition definition, Actor actor, Address address, Mailbox mailbox, Supervisor supervisor, Logger logger2) {
        Address resolved = address == null ? addressFactory().unique() : address;
        Mailbox routed = maybeRemoteMailbox(resolved, definition, mailbox, owner ->
                this.outbound.start(owner, this.nodeId, cls, resolved, Definition.SerializationProxy.from(definition)));
        return super.actorProtocolFor(cls, definition, actor, resolved, routed, supervisor, logger2);
    }

    /**
     * Multi-protocol variant of
     * {@link #actorProtocolFor(Class, Definition, Actor, Address, Mailbox, Supervisor, Logger)}.
     * A remote start request carries only the first protocol of {@code clsArr}.
     *
     * @param clsArr     the protocol types; element 0 is used for a remote start
     * @param definition the actor definition
     * @param actor      the parent actor
     * @param address    the actor address, or {@code null} to generate a unique one
     * @param mailbox    the mailbox to use when the actor is local
     * @param supervisor the supervisor
     * @param logger2    the logger
     * @return the result of the superclass implementation
     */
    protected Stage.ActorProtocolActor<Object>[] actorProtocolFor(Class<?>[] clsArr, Definition definition, Actor actor, Address address, Mailbox mailbox, Supervisor supervisor, Logger logger2) {
        Address resolved = address == null ? addressFactory().unique() : address;
        Mailbox routed = maybeRemoteMailbox(resolved, definition, mailbox, owner ->
                this.outbound.start(owner, this.nodeId, clsArr[0], resolved, Definition.SerializationProxy.from(definition)));
        return super.actorProtocolFor(clsArr, definition, actor, resolved, routed, supervisor, logger2);
    }

    /**
     * Decides which mailbox an actor at {@code address} gets.
     *
     * <p>For a distributable address the call blocks, polling every
     * {@code clusterHealthCheckInterval} milliseconds, until quorum is
     * achieved. Then the ring is consulted: if it names no owner, or names
     * this node, {@code mailbox} is returned unchanged. Otherwise
     * {@code startOnOwner} is invoked with the owning node and a freshly
     * allocated mailbox, suspended under the resume name, is returned.
     *
     * @param address      the actor address
     * @param definition   the actor definition
     * @param mailbox      the mailbox to use for a locally owned actor
     * @param startOnOwner callback that asks the given remote node to start the actor
     * @return the mailbox to use
     * @throws RuntimeException wrapping {@link InterruptedException} when the
     *                          thread is interrupted while waiting; the interrupt
     *                          flag is restored before throwing
     */
    private Mailbox maybeRemoteMailbox(Address address, Definition definition, Mailbox mailbox, Consumer<Id> startOnOwner) {
        while (!this.hasQuorum && address.isDistributable()) {
            logger.debug("Mailbox allocation waiting for cluster quorum...");
            try {
                Thread.sleep(this.clusterHealthCheckInterval);
            } catch (InterruptedException e) {
                // Keep the interruption visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        Id owner = this.hashRing.nodeOf(address.idString());
        if (owner == null || owner.equals(this.nodeId)) {
            return mailbox;
        }
        startOnOwner.accept(owner);
        Mailbox remoteMailbox = allocateMailbox(definition, address, mailbox);
        if (!remoteMailbox.isSuspendedFor(RESUME)) {
            remoteMailbox.suspendExceptFor(RESUME, new Class[]{Relocatable.class});
        }
        return remoteMailbox;
    }
}
