package org.elasticsoftware.elasticactors.kafka;

import com.google.common.cache.Cache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.elasticsoftware.elasticactors.Actor;
import org.elasticsoftware.elasticactors.ActorContainer;
import org.elasticsoftware.elasticactors.ActorContainerRef;
import org.elasticsoftware.elasticactors.ActorContextHolder;
import org.elasticsoftware.elasticactors.ActorLifecycleListener;
import org.elasticsoftware.elasticactors.ActorLifecycleListenerRegistry;
import org.elasticsoftware.elasticactors.ActorNode;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorRefGroup;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.ActorState;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.InitialStateProvider;
import org.elasticsoftware.elasticactors.InternalActorSystemConfiguration;
import org.elasticsoftware.elasticactors.ManagedActor;
import org.elasticsoftware.elasticactors.ManagedActorsRegistry;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.ShardKey;
import org.elasticsoftware.elasticactors.SingletonActor;
import org.elasticsoftware.elasticactors.TempActor;
import org.elasticsoftware.elasticactors.cache.NodeActorCacheManager;
import org.elasticsoftware.elasticactors.cache.ShardActorCacheManager;
import org.elasticsoftware.elasticactors.cluster.ActorRefFactory;
import org.elasticsoftware.elasticactors.cluster.ActorRefTools;
import org.elasticsoftware.elasticactors.cluster.ActorShardRef;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEvent;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListenerImpl;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListenerRegistry;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystems;
import org.elasticsoftware.elasticactors.cluster.LocalActorRefGroup;
import org.elasticsoftware.elasticactors.cluster.NodeSelector;
import org.elasticsoftware.elasticactors.cluster.NodeSelectorFactory;
import org.elasticsoftware.elasticactors.cluster.NodeSelectorHasher;
import org.elasticsoftware.elasticactors.cluster.ShardDistributionStrategy;
import org.elasticsoftware.elasticactors.cluster.ShardDistributor;
import org.elasticsoftware.elasticactors.cluster.scheduler.InternalScheduler;
import org.elasticsoftware.elasticactors.kafka.cluster.KafkaInternalActorSystems;
import org.elasticsoftware.elasticactors.kafka.scheduler.KafkaTopicScheduler;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStoreFactory;
import org.elasticsoftware.elasticactors.kafka.utils.TopicHelper;
import org.elasticsoftware.elasticactors.messaging.ActorShardHasher;
import org.elasticsoftware.elasticactors.messaging.Hasher;
import org.elasticsoftware.elasticactors.messaging.internal.ActorType;
import org.elasticsoftware.elasticactors.messaging.internal.CreateActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.DestroyActorMessage;
import org.elasticsoftware.elasticactors.runtime.ElasticActorsNode;
import org.elasticsoftware.elasticactors.scheduler.Scheduler;
import org.elasticsoftware.elasticactors.serialization.Deserializer;
import org.elasticsoftware.elasticactors.serialization.Message;
import org.elasticsoftware.elasticactors.serialization.MessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;
import org.elasticsoftware.elasticactors.serialization.Serializer;
import org.elasticsoftware.elasticactors.state.PersistentActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/kafka/KafkaActorSystemInstance.class */
public final class KafkaActorSystemInstance implements InternalActorSystem, ShardDistributor, ActorSystemEventListenerRegistry {
    private static final Logger logger = LoggerFactory.getLogger(KafkaActorSystemInstance.class);
    private final InternalActorSystemConfiguration configuration;
    private final NodeSelectorFactory nodeSelectorFactory;
    private final KafkaInternalActorSystems cluster;
    private final PhysicalNode localNode;
    private final ActorRefFactory actorRefFactory;
    private final KafkaActorThread[] shardThreads;
    private final KafkaActorShard[] actorShards;
    private final KafkaActorNode localActorNode;
    private final List<KafkaActorNode> activeNodes = new LinkedList();
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicBoolean stable = new AtomicBoolean(false);
    private final ConcurrentMap<Class, ElasticActor> actorInstances = new ConcurrentHashMap();
    private final KafkaTopicScheduler schedulerService = new KafkaTopicScheduler(this);
    private final Hasher actorShardHasher;
    private final Hasher nodeHasher;
    private final ActorLifecycleListenerRegistry actorLifecycleListenerRegistry;
    private final ManagedActorsRegistry managedActorsRegistry;

    public KafkaActorSystemInstance(ElasticActorsNode elasticActorsNode, InternalActorSystemConfiguration internalActorSystemConfiguration, NodeSelectorFactory nodeSelectorFactory, Integer num, String str, Cache<String, ActorRef> cache, ShardActorCacheManager shardActorCacheManager, NodeActorCacheManager nodeActorCacheManager, Serializer<PersistentActor<ShardKey>, byte[]> serializer, Deserializer<byte[], PersistentActor<ShardKey>> deserializer, ActorLifecycleListenerRegistry actorLifecycleListenerRegistry, PersistentActorStoreFactory persistentActorStoreFactory, ManagedActorsRegistry managedActorsRegistry) {
        this.actorLifecycleListenerRegistry = actorLifecycleListenerRegistry;
        this.localNode = elasticActorsNode;
        this.cluster = new KafkaInternalActorSystems(elasticActorsNode, cache);
        this.configuration = internalActorSystemConfiguration;
        this.nodeSelectorFactory = nodeSelectorFactory;
        this.actorRefFactory = this.cluster;
        this.actorShards = new KafkaActorShard[internalActorSystemConfiguration.getNumberOfShards()];
        this.shardThreads = new KafkaActorThread[num.intValue()];
        try {
            TopicHelper.ensureTopicsExists(str, elasticActorsNode.getId(), num.intValue(), this);
            for (int i = 0; i < num.intValue(); i++) {
                this.shardThreads[i] = new KafkaActorThread(this.cluster.getClusterName(), str, this.localNode.getId(), this, this.actorRefFactory, shardActorCacheManager, nodeActorCacheManager, serializer, deserializer, persistentActorStoreFactory);
            }
            for (int i2 = 0; i2 < internalActorSystemConfiguration.getNumberOfShards(); i2++) {
                this.actorShards[i2] = new KafkaActorShard(new ShardKey(internalActorSystemConfiguration.getName(), i2), this.shardThreads[i2 % num.intValue()], this);
            }
            this.localActorNode = new KafkaActorNode(this.localNode, this.shardThreads[0], this);
            this.activeNodes.add(this.localActorNode);
            for (int i3 = 1; i3 < this.shardThreads.length; i3++) {
                this.shardThreads[i3].assign(this.localActorNode, false);
            }
            this.managedActorsRegistry = managedActorsRegistry;
            this.actorShardHasher = new ActorShardHasher(Integer.valueOf(internalActorSystemConfiguration.getShardHashSeed()));
            this.nodeHasher = new NodeSelectorHasher(Integer.valueOf(internalActorSystemConfiguration.getShardDistributionHashSeed()));
        } catch (Exception e) {
            throw new RuntimeException("FATAL Exception on ensureTopicsExist", e);
        }
    }

    @PostConstruct
    public void init() {
        for (KafkaActorThread kafkaActorThread : this.shardThreads) {
            kafkaActorThread.start();
        }
    }

    @PreDestroy
    public void destroy() {
        logger.info("Shutting down ActorSystem [{}]", getName());
        for (KafkaActorThread kafkaActorThread : this.shardThreads) {
            kafkaActorThread.stopRunning();
        }
    }

    /* renamed from: getConfiguration, reason: merged with bridge method [inline-methods] */
    public InternalActorSystemConfiguration m1getConfiguration() {
        return this.configuration;
    }

    public ActorSystemEventListenerRegistry getEventListenerRegistry() {
        return this;
    }

    public ActorRefGroup groupOf(Collection<ActorRef> collection) throws IllegalArgumentException {
        Iterator<ActorRef> it = collection.iterator();
        while (it.hasNext()) {
            if (!(it.next() instanceof ActorShardRef)) {
                throw new IllegalArgumentException("Only Persistent Actors (annotated with @Actor) of the same ElasticActors cluster are allowed to form a group");
            }
        }
        ImmutableListMultimap.Builder builder = ImmutableListMultimap.builder();
        Iterator<ActorRef> it2 = collection.iterator();
        while (it2.hasNext()) {
            ActorShardRef actorShardRef = (ActorRef) it2.next();
            builder.put(actorShardRef.getActorContainer().getActorRef(), actorShardRef);
        }
        return new LocalActorRefGroup(builder.build());
    }

    public ActorRef tempActorFor(String str) {
        throw new UnsupportedOperationException("KafkaActorSystemInstance does not support tempActorFor logic because node partition cannot be determined");
    }

    public ElasticActor getActorInstance(ActorRef actorRef, Class<? extends ElasticActor> cls) {
        return this.actorInstances.computeIfAbsent(cls, cls2 -> {
            try {
                return (ElasticActor) cls.newInstance();
            } catch (Exception e) {
                logger.error("Exception creating actor instance for actorClass [{}]", cls.getName(), e);
                return null;
            }
        });
    }

    public ElasticActor getServiceInstance(ActorRef actorRef) {
        if (ActorRefTools.isService(actorRef)) {
            return this.configuration.getService(actorRef.getActorId());
        }
        return null;
    }

    public ActorNode getNode(String str) {
        return this.activeNodes.stream().filter(kafkaActorNode -> {
            return kafkaActorNode.getKey().getNodeId().equals(str);
        }).findFirst().orElse(null);
    }

    public ActorNode getNode() {
        return this.localActorNode;
    }

    public <T> MessageSerializer<T> getSerializer(Class<T> cls) {
        Message annotation;
        MessageSerializer<T> systemMessageSerializer = this.cluster.getSystemMessageSerializer(cls);
        if (systemMessageSerializer == null && (annotation = cls.getAnnotation(Message.class)) != null) {
            systemMessageSerializer = this.cluster.getSerializationFramework(annotation.serializationFramework()).getSerializer(cls);
        }
        return systemMessageSerializer;
    }

    public <T> MessageDeserializer<T> getDeserializer(Class<T> cls) {
        Message annotation;
        MessageDeserializer<T> systemMessageDeserializer = this.cluster.getSystemMessageDeserializer(cls);
        if (systemMessageDeserializer == null && (annotation = cls.getAnnotation(Message.class)) != null) {
            systemMessageDeserializer = this.cluster.getSerializationFramework(annotation.serializationFramework()).getDeserializer(cls);
        }
        return systemMessageDeserializer;
    }

    public InternalScheduler getInternalScheduler() {
        throw new UnsupportedOperationException();
    }

    public String getName() {
        return this.configuration.getName();
    }

    public <T> ActorRef actorOf(String str, Class<T> cls) throws Exception {
        if (cls.getAnnotation(Actor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @Actor");
        }
        if (cls.getAnnotation(SingletonActor.class) != null) {
            throw new IllegalArgumentException("actorClass is annotated with @SingletonActor and will be automatically created by the ActorSystem");
        }
        ManagedActor annotation = cls.getAnnotation(ManagedActor.class);
        if (annotation == null || !annotation.exclusive()) {
            return actorOf(str, cls.getName(), (ActorState) null);
        }
        throw new IllegalArgumentException("actorClass is annotated with @ManagedActor as exclusive and will be automatically created by the ActorSystem");
    }

    public ActorRef actorOf(String str, String str2) throws Exception {
        return actorOf(str, str2, (ActorState) null);
    }

    public <T> ActorRef actorOf(String str, Class<T> cls, ActorState actorState) throws Exception {
        if (cls.getAnnotation(Actor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @Actor");
        }
        if (cls.getAnnotation(SingletonActor.class) != null) {
            throw new IllegalArgumentException("actorClass is annotated with @SingletonActor and will be automatically created by the ActorSystem");
        }
        ManagedActor annotation = cls.getAnnotation(ManagedActor.class);
        if (annotation == null || !annotation.exclusive()) {
            return actorOf(str, cls.getName(), actorState);
        }
        throw new IllegalArgumentException("actorClass is annotated with @ManagedActor as exclusive and will be automatically created by the ActorSystem");
    }

    public ActorRef actorOf(String str, String str2, ActorState actorState) throws Exception {
        return actorOf(str, str2, actorState, ActorContextHolder.getSelf());
    }

    public ActorRef actorOf(String str, String str2, ActorState actorState, ActorRef actorRef) throws Exception {
        KafkaActorShard shardFor = shardFor(str);
        shardFor.sendMessage(actorRef, shardFor.getActorRef(), new CreateActorMessage(getName(), str2, str, actorState));
        return this.cluster.createPersistentActorRef(shardFor, str);
    }

    private KafkaActorShard shardFor(String str) {
        return this.actorShards[this.actorShardHasher.hashStringToInt(str) % this.actorShards.length];
    }

    public <T> ActorRef tempActorOf(Class<T> cls, @Nullable ActorState actorState) throws Exception {
        if (cls.getAnnotation(TempActor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @TempActor");
        }
        String uuid = UUID.randomUUID().toString();
        ActorRef self = ActorContextHolder.getSelf();
        String actorId = self != null ? self.getActorId() : null;
        CreateActorMessage createActorMessage = new CreateActorMessage(getName(), cls.getName(), uuid, actorState, ActorType.TEMP, actorId);
        KafkaActorThread actorThread = actorId != null ? shardFor(actorId).getActorThread() : shardFor(uuid).getActorThread();
        ActorRef createTempActorRef = this.cluster.createTempActorRef(this.localActorNode, actorThread.getNodeTopicPartitionId().intValue(), uuid);
        actorThread.createTempActor(createTempActorRef, createActorMessage);
        return createTempActorRef;
    }

    public ActorRef actorFor(String str) {
        return this.cluster.createPersistentActorRef(shardFor(str), str);
    }

    public ActorRef serviceActorFor(String str) {
        return this.cluster.createServiceActorRef(this.localActorNode, str);
    }

    public ActorRef serviceActorFor(String str, String str2) {
        ActorNode node = getNode(str);
        if (node != null) {
            return this.cluster.createServiceActorRef(node, str2);
        }
        throw new IllegalArgumentException(String.format("Unknown node [%s]", str));
    }

    public Scheduler getScheduler() {
        return this.schedulerService;
    }

    /* renamed from: getParent, reason: merged with bridge method [inline-methods] */
    public InternalActorSystems m2getParent() {
        return this.cluster;
    }

    public void stop(ActorRef actorRef) throws Exception {
        ActorRef self = ActorContextHolder.getSelf();
        ActorContainer actorContainer = ((ActorContainerRef) actorRef).getActorContainer();
        actorContainer.sendMessage(self, actorContainer.getActorRef(), new DestroyActorMessage(actorRef));
    }

    public List<ActorLifecycleListener<?>> getActorLifecycleListeners(Class<? extends ElasticActor> cls) {
        return this.actorLifecycleListenerRegistry.getListeners(cls);
    }

    public int getQueuesPerNode() {
        return 1;
    }

    public int getQueuesPerShard() {
        return 1;
    }

    public int getShardHashSeed() {
        return 0;
    }

    public int getMultiQueueHashSeed() {
        return 53;
    }

    public boolean isStable() {
        return this.stable.get();
    }

    public ActorShard getShard(String str) {
        String[] split = str.split("/");
        if (split[1].equals("shards")) {
            return getShard(Integer.parseInt(split[2]));
        }
        throw new IllegalArgumentException(String.format("No ActorShard found for actorPath [%s]", str));
    }

    public ActorShard getShard(int i) {
        return this.actorShards[i];
    }

    public int getNumberOfShards() {
        return this.configuration.getNumberOfShards();
    }

    public void updateNodes(List<PhysicalNode> list) throws Exception {
        HashMap hashMap = new HashMap();
        for (PhysicalNode physicalNode : list) {
            hashMap.put(physicalNode.getId(), physicalNode);
        }
        Set set = (Set) this.activeNodes.stream().map(kafkaActorNode -> {
            return kafkaActorNode.getKey().getNodeId();
        }).collect(Collectors.toSet());
        Iterator<KafkaActorNode> it = this.activeNodes.iterator();
        while (it.hasNext()) {
            KafkaActorNode next = it.next();
            if (!hashMap.containsKey(next.getKey().getNodeId())) {
                next.destroy();
                it.remove();
            }
        }
        list.stream().filter(physicalNode2 -> {
            return !set.contains(physicalNode2.getId());
        }).forEach(physicalNode3 -> {
            this.activeNodes.add(new KafkaActorNode(physicalNode3, this.shardThreads[this.activeNodes.size() % this.shardThreads.length], this));
        });
    }

    public void distributeShards(List<PhysicalNode> list, ShardDistributionStrategy shardDistributionStrategy) throws Exception {
        boolean z;
        boolean compareAndSet = this.initialized.compareAndSet(false, true);
        if (compareAndSet) {
            logger.info("Initializing ActorSystem [{}]", getName());
        }
        NodeSelector create = this.nodeSelectorFactory.create(this.nodeHasher, list);
        Multimap<PhysicalNode, ShardKey> create2 = HashMultimap.create();
        for (int i = 0; i < this.configuration.getNumberOfShards(); i++) {
            ShardKey shardKey = new ShardKey(this.configuration.getName(), i);
            create2.put(create.getPrimary(shardKey.toString()), shardKey);
        }
        CompletionStage<Boolean> completionStage = null;
        for (KafkaActorThread kafkaActorThread : this.shardThreads) {
            completionStage = completionStage == null ? kafkaActorThread.prepareRebalance(create2, shardDistributionStrategy) : completionStage.thenCombine(kafkaActorThread.prepareRebalance(create2, shardDistributionStrategy), (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
            });
        }
        try {
            z = completionStage.toCompletableFuture().get().booleanValue();
        } catch (ExecutionException e) {
            logger.error("FATAL Exception while executing prepareRebalance operation", e.getCause());
            z = false;
        } catch (Exception e2) {
            logger.error("Unexpected Exception while executing prepareRebalance operation", e2);
            z = false;
        }
        if (!shardDistributionStrategy.waitForReleasedShards(60L, TimeUnit.SECONDS)) {
            z = false;
        }
        HashSet hashSet = new HashSet();
        if (z) {
            CompletionStage<Set<Integer>> completionStage2 = null;
            for (KafkaActorThread kafkaActorThread2 : this.shardThreads) {
                completionStage2 = completionStage2 == null ? kafkaActorThread2.performRebalance() : completionStage2.thenCombine(kafkaActorThread2.performRebalance(), (set, set2) -> {
                    return ImmutableSet.builder().addAll(set).addAll(set2).build();
                });
            }
            try {
                hashSet.addAll(completionStage2.toCompletableFuture().get());
            } catch (ExecutionException e3) {
                logger.error("FATAL Exception while executing performRebalance operation", e3.getCause());
                z = false;
            } catch (Exception e4) {
                logger.error("Unexpected Exception while executing performRebalancer operation", e4);
                z = false;
            }
        }
        this.stable.set(z);
        CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
        if (compareAndSet) {
            completedFuture = this.localActorNode.initializeServiceActors();
        }
        completedFuture.thenRun(() -> {
            for (Class<? extends ElasticActor<?>> cls : this.managedActorsRegistry.getSingletonActorClasses()) {
                SingletonActor annotation = cls.getAnnotation(SingletonActor.class);
                String value = annotation.value();
                Class<? extends InitialStateProvider> initialStateProvider = annotation.initialStateProvider();
                ActorShard actorShard = (ActorShard) actorFor(value).getActorContainer();
                if (hashSet.contains(Integer.valueOf(actorShard.getKey().getShardId()))) {
                    createManagedActor(actorShard, cls, value, initialStateProvider);
                }
            }
            for (Class<? extends ElasticActor<?>> cls2 : this.managedActorsRegistry.getManagedActorClasses()) {
                ManagedActor annotation2 = cls2.getAnnotation(ManagedActor.class);
                for (String str : annotation2.value()) {
                    Class<? extends InitialStateProvider> initialStateProvider2 = annotation2.initialStateProvider();
                    ActorShard actorShard2 = (ActorShard) actorFor(str).getActorContainer();
                    if (hashSet.contains(Integer.valueOf(actorShard2.getKey().getShardId()))) {
                        createManagedActor(actorShard2, cls2, str, initialStateProvider2);
                    }
                }
            }
        });
        logger.info("Cluster shard mapping summary:");
        if (logger.isInfoEnabled()) {
            create2.asMap().forEach((physicalNode, collection) -> {
                logger.info("\t{} has {} shards assigned", physicalNode, Integer.valueOf(collection.size()));
            });
        }
    }

    private void createManagedActor(ActorShard actorShard, Class<? extends ElasticActor<?>> cls, String str, Class<? extends InitialStateProvider> cls2) {
        try {
            actorShard.sendMessage((ActorRef) null, actorShard.getActorRef(), new CreateActorMessage(getName(), cls.getName(), str, cls2.newInstance().getInitialState(str, cls.getAnnotation(Actor.class).stateClass())));
        } catch (Exception e) {
            logger.error("Could not create default actor state for managed actor {} of type {}", new Object[]{str, cls.getName(), e});
        }
    }

    public void register(ActorRef actorRef, ActorSystemEvent actorSystemEvent, Object obj) throws IOException {
        if (!(actorRef instanceof ActorShardRef)) {
            throw new IllegalArgumentException("ActorRef must be referencing a Persistent Actor (i.e. annotated with @Actor)");
        }
        KafkaActorShard actorContainer = ((ActorShardRef) actorRef).getActorContainer();
        ByteBuffer serialize = getSerializer(obj.getClass()).serialize(obj);
        byte[] bArr = new byte[serialize.remaining()];
        serialize.get(bArr);
        actorContainer.getActorThread().register(actorContainer.getKey(), actorSystemEvent, new ActorSystemEventListenerImpl(actorRef.getActorId(), obj.getClass(), bArr));
    }

    public void deregister(ActorRef actorRef, ActorSystemEvent actorSystemEvent) {
        if (!(actorRef instanceof ActorShardRef)) {
            throw new IllegalArgumentException("ActorRef must be referencing a Persistent Actor (i.e. annotated with @Actor)");
        }
        KafkaActorShard actorContainer = ((ActorShardRef) actorRef).getActorContainer();
        actorContainer.getActorThread().deregister(actorContainer.getKey(), actorSystemEvent, actorRef);
    }
}
