/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.runtime;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.elasticsoftware.elasticactors.ActorNode;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.ActorSystem;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.cluster.ActorRefFactory;
import org.elasticsoftware.elasticactors.cluster.ActorRefTools;
import org.elasticsoftware.elasticactors.cluster.ActorShardRef;
import org.elasticsoftware.elasticactors.cluster.BaseDisconnectedActorRef;
import org.elasticsoftware.elasticactors.cluster.ClusterEventListener;
import org.elasticsoftware.elasticactors.cluster.ClusterMessageHandler;
import org.elasticsoftware.elasticactors.cluster.ClusterService;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystems;
import org.elasticsoftware.elasticactors.cluster.LocalClusterActorNodeRef;
import org.elasticsoftware.elasticactors.cluster.RebalancingEventListener;
import org.elasticsoftware.elasticactors.cluster.RemoteActorSystems;
import org.elasticsoftware.elasticactors.cluster.ServiceActorRef;
import org.elasticsoftware.elasticactors.cluster.ShardDistributionStrategy;
import org.elasticsoftware.elasticactors.cluster.ShardDistributor;
import org.elasticsoftware.elasticactors.cluster.messaging.ShardReleasedMessage;
import org.elasticsoftware.elasticactors.cluster.protobuf.Clustering;
import org.elasticsoftware.elasticactors.cluster.strategies.RunningNodeScaleDownStrategy;
import org.elasticsoftware.elasticactors.cluster.strategies.RunningNodeScaleUpStrategy;
import org.elasticsoftware.elasticactors.cluster.strategies.SingleNodeScaleUpStrategy;
import org.elasticsoftware.elasticactors.cluster.strategies.StartingNodeScaleUpStrategy;
import org.elasticsoftware.elasticactors.serialization.MessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;
import org.elasticsoftware.elasticactors.serialization.MessagingSystemDeserializers;
import org.elasticsoftware.elasticactors.serialization.MessagingSystemSerializers;
import org.elasticsoftware.elasticactors.serialization.SerializationFramework;
import org.elasticsoftware.elasticactors.serialization.SystemDeserializers;
import org.elasticsoftware.elasticactors.serialization.SystemSerializers;
import org.elasticsoftware.elasticactors.util.ManifestTools;
import org.elasticsoftware.elasticactors.util.concurrent.DaemonThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public final class ElasticActorsNode
extends PhysicalNode
implements InternalActorSystems,
ActorRefFactory,
ClusterEventListener,
ClusterMessageHandler,
ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(ElasticActorsNode.class);
    private final String clusterName;
    private final SystemSerializers systemSerializers = new MessagingSystemSerializers((InternalActorSystems)this);
    private final SystemDeserializers systemDeserializers;
    private final CountDownLatch waitLatch = new CountDownLatch(1);
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final Cache<Class<? extends ElasticActor>, String> actorStateVersionCache = CacheBuilder.newBuilder().maximumSize(1024L).build();
    private final Cache<String, ActorRef> actorRefCache;
    private final Map<Class<? extends SerializationFramework>, SerializationFramework> serializationFrameworks = new ConcurrentHashMap<Class<? extends SerializationFramework>, SerializationFramework>();
    private ApplicationContext applicationContext;
    private ClusterService clusterService;
    private final LinkedBlockingQueue<ShardReleasedMessage> shardReleasedMessages = new LinkedBlockingQueue();
    private final AtomicReference<List<PhysicalNode>> currentTopology = new AtomicReference<Object>(null);
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DaemonThreadFactory("CLUSTER_SCHEDULER"));
    private final List<RebalancingEventListener> rebalancingEventListeners = new CopyOnWriteArrayList<RebalancingEventListener>();
    private final ActorRefTools actorRefTools;
    private InternalActorSystem internalActorSystem;
    private RemoteActorSystems remoteActorSystems;

    public ElasticActorsNode(String clusterName, String nodeId, InetAddress nodeAddress, Cache<String, ActorRef> actorRefCache) {
        super(nodeId, nodeAddress, true);
        this.clusterName = clusterName;
        this.systemDeserializers = new MessagingSystemDeserializers((InternalActorSystems)this, (ActorRefFactory)this);
        this.actorRefCache = actorRefCache;
        this.actorRefTools = new ActorRefTools((InternalActorSystems)this);
    }

    public ElasticActorsNode(String clusterName, String nodeId, InetAddress nodeAddress, Cache<String, ActorRef> actorRefCache, ActorRefFactory actorRefFactory) {
        super(nodeId, nodeAddress, true);
        this.clusterName = clusterName;
        this.systemDeserializers = new MessagingSystemDeserializers((InternalActorSystems)this, actorRefFactory);
        this.actorRefCache = actorRefCache;
        this.actorRefTools = new ActorRefTools((InternalActorSystems)this);
    }

    @Autowired
    public void setClusterService(ClusterService clusterService) {
        this.clusterService = clusterService;
        clusterService.addEventListener((ClusterEventListener)this);
        clusterService.setClusterMessageHandler((ClusterMessageHandler)this);
    }

    @PostConstruct
    public void init() throws Exception {
        logger.info("Starting up Elastic Actors");
    }

    @PreDestroy
    public void destroy() {
        logger.info("Stopping Elastic Actors");
        this.clusterService.reportPlannedShutdown();
        this.waitLatch.countDown();
    }

    public void onTopologyChanged(List<PhysicalNode> topology) throws Exception {
        List<PhysicalNode> previousTopology = this.currentTopology.getAndSet(topology);
        Object shardDistributionStrategy = previousTopology == null ? (topology.size() == 1 ? new SingleNodeScaleUpStrategy() : new StartingNodeScaleUpStrategy(this.shardReleasedMessages)) : (previousTopology.size() < topology.size() ? new RunningNodeScaleUpStrategy(this.shardReleasedMessages, this.clusterService) : (previousTopology.size() > topology.size() ? new RunningNodeScaleDownStrategy() : new RunningNodeScaleDownStrategy()));
        this.scheduledExecutorService.submit(new RebalancingRunnable((ShardDistributionStrategy)shardDistributionStrategy, topology));
    }

    public void onMasterElected(PhysicalNode masterNode) throws Exception {
    }

    public void handleMessage(byte[] message, String senderToken) {
        try {
            Clustering.ClusterMessage clusterMessage = Clustering.ClusterMessage.parseFrom((byte[])message);
            if (clusterMessage.hasShardReleased()) {
                ShardReleasedMessage shardReleasedMessage = new ShardReleasedMessage(clusterMessage.getShardReleased().getActorSystem(), clusterMessage.getShardReleased().getShardId());
                this.shardReleasedMessages.add(shardReleasedMessage);
            }
        }
        catch (InvalidProtocolBufferException e) {
            logger.error("Exception while deserializing ClusterMessage", (Throwable)e);
        }
    }

    public void join() throws Exception {
        this.clusterService.reportReady();
        try {
            this.waitLatch.await();
        }
        catch (InterruptedException interruptedException) {}
    }

    public ActorRef create(String refSpec) {
        ActorRef actorRef = (ActorRef)this.actorRefCache.getIfPresent((Object)refSpec);
        if (actorRef == null && !((actorRef = this.actorRefTools.parse(refSpec)) instanceof BaseDisconnectedActorRef)) {
            this.actorRefCache.put((Object)refSpec, (Object)actorRef);
        }
        return actorRef;
    }

    public String getClusterName() {
        return this.clusterName;
    }

    public InternalActorSystem get(String name) {
        if (this.internalActorSystem == null) {
            this.internalActorSystem = (InternalActorSystem)this.applicationContext.getBean(InternalActorSystem.class);
        }
        return this.internalActorSystem;
    }

    public ActorSystem getRemote(String clusterName, String actorSystemName) {
        if (this.remoteActorSystems == null) {
            this.remoteActorSystems = (RemoteActorSystems)this.applicationContext.getBean(RemoteActorSystems.class);
        }
        return this.remoteActorSystems.get(clusterName, actorSystemName);
    }

    public ActorSystem getRemote(String actorSystemName) {
        if (this.remoteActorSystems == null) {
            this.remoteActorSystems = (RemoteActorSystems)this.applicationContext.getBean(RemoteActorSystems.class);
        }
        return this.remoteActorSystems.get(actorSystemName);
    }

    public void registerRebalancingEventListener(RebalancingEventListener eventListener) {
        this.rebalancingEventListeners.add(eventListener);
    }

    public <T> MessageSerializer<T> getSystemMessageSerializer(Class<T> messageClass) {
        return this.systemSerializers.get(messageClass);
    }

    public <T> MessageDeserializer<T> getSystemMessageDeserializer(Class<T> messageClass) {
        return this.systemDeserializers.get(messageClass);
    }

    public SerializationFramework getSerializationFramework(Class<? extends SerializationFramework> frameworkClass) {
        return this.serializationFrameworks.computeIfAbsent(frameworkClass, arg_0 -> ((ApplicationContext)this.applicationContext).getBean(arg_0));
    }

    public ActorRef createPersistentActorRef(ActorShard shard, String actorId) {
        String refSpec = ActorShardRef.generateRefSpec((String)this.clusterName, (ActorShard)shard, (String)actorId);
        ActorRef ref = (ActorRef)this.actorRefCache.getIfPresent((Object)refSpec);
        if (ref == null) {
            ref = new ActorShardRef(this.clusterName, shard, actorId, this.get(null));
            this.actorRefCache.put((Object)refSpec, (Object)ref);
        }
        return ref;
    }

    public ActorRef createTempActorRef(ActorNode node, String actorId) {
        String refSpec = LocalClusterActorNodeRef.generateRefSpec((String)this.clusterName, (ActorNode)node, (String)actorId);
        ActorRef ref = (ActorRef)this.actorRefCache.getIfPresent((Object)refSpec);
        if (ref == null) {
            ref = new LocalClusterActorNodeRef(this.get(null), this.clusterName, node, actorId);
            this.actorRefCache.put((Object)refSpec, (Object)ref);
        }
        return ref;
    }

    public ActorRef createServiceActorRef(ActorNode node, String actorId) {
        String refSpec = ServiceActorRef.generateRefSpec((String)this.clusterName, (ActorNode)node, (String)actorId);
        ActorRef ref = (ActorRef)this.actorRefCache.getIfPresent((Object)refSpec);
        if (ref == null) {
            ref = new ServiceActorRef(this.get(null), this.clusterName, node, actorId);
            this.actorRefCache.put((Object)refSpec, (Object)ref);
        }
        return ref;
    }

    public String getActorStateVersion(Class<? extends ElasticActor> actorClass) {
        String version = (String)this.actorStateVersionCache.getIfPresent(actorClass);
        if (version == null) {
            version = ManifestTools.extractActorStateVersion(actorClass);
            this.actorStateVersionCache.put(actorClass, (Object)version);
        }
        return version;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private final class RebalancingRunnable
    implements Runnable {
        private final ShardDistributionStrategy shardDistributionStrategy;
        private final List<PhysicalNode> clusterNodes;

        private RebalancingRunnable(ShardDistributionStrategy shardDistributionStrategy, List<PhysicalNode> clusterNodes) {
            this.shardDistributionStrategy = shardDistributionStrategy;
            this.clusterNodes = clusterNodes;
        }

        @Override
        public void run() {
            if (ElasticActorsNode.this.initialized.compareAndSet(false, true)) {
                ElasticActorsNode.this.applicationContext.getBeansOfType(RemoteActorSystems.class).forEach((s, remoteActorSystems) -> {
                    try {
                        remoteActorSystems.init();
                    }
                    catch (Exception e) {
                        logger.error("IMPORTANT: Initializing Remote ActorSystems failed, ElasticActors cluster is unstable. Please check all nodes", (Throwable)e);
                    }
                });
            }
            for (RebalancingEventListener rebalancingEventListener : ElasticActorsNode.this.rebalancingEventListeners) {
                try {
                    if (this.shardDistributionStrategy instanceof RunningNodeScaleDownStrategy) {
                        rebalancingEventListener.preScaleDown();
                        continue;
                    }
                    rebalancingEventListener.preScaleUp();
                }
                catch (Exception e) {
                    logger.warn("Exception while calling RebalancingEventListener preScaleUp/Down [{}]", (Object)rebalancingEventListener.getClass().getName(), (Object)e);
                }
            }
            ShardDistributor distributor = (ShardDistributor)ElasticActorsNode.this.applicationContext.getBean(ShardDistributor.class);
            InternalActorSystem instance = (InternalActorSystem)ElasticActorsNode.this.applicationContext.getBean(InternalActorSystem.class);
            logger.info("Updating {} nodes for ActorSystem[{}]", (Object)this.clusterNodes.size(), (Object)instance.getName());
            try {
                distributor.updateNodes(this.clusterNodes);
            }
            catch (Exception e) {
                logger.error("IMPORTANT: ActorSystem[{}] failed to update nodes, ElasticActors cluster is unstable. Please check all nodes", (Object)instance.getName(), (Object)e);
            }
            logger.info("Rebalancing {} shards for ActorSystem[{}] using {}", new Object[]{instance.getNumberOfShards(), instance.getName(), this.shardDistributionStrategy.getClass().getSimpleName()});
            try {
                distributor.distributeShards(this.clusterNodes, this.shardDistributionStrategy);
            }
            catch (Exception e) {
                logger.error("IMPORTANT: ActorSystem[{}] failed to (re-)distribute shards,ElasticActors cluster is unstable. Please check all nodes", (Object)instance.getName(), (Object)e);
            }
            for (RebalancingEventListener rebalancingEventListener : ElasticActorsNode.this.rebalancingEventListeners) {
                try {
                    if (this.shardDistributionStrategy instanceof RunningNodeScaleDownStrategy) {
                        rebalancingEventListener.postScaleDown();
                        continue;
                    }
                    rebalancingEventListener.postScaleUp();
                }
                catch (Exception e) {
                    logger.warn("Exception while calling RebalancingEventListener postScaleUp/Down [{}]", (Object)rebalancingEventListener.getClass().getName(), (Object)e);
                }
            }
        }
    }
}

