/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kafka;

import com.google.common.cache.Cache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.elasticsoftware.elasticactors.Actor;
import org.elasticsoftware.elasticactors.ActorContainer;
import org.elasticsoftware.elasticactors.ActorContainerRef;
import org.elasticsoftware.elasticactors.ActorContextHolder;
import org.elasticsoftware.elasticactors.ActorLifecycleListener;
import org.elasticsoftware.elasticactors.ActorLifecycleListenerRegistry;
import org.elasticsoftware.elasticactors.ActorNode;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorRefGroup;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.ActorState;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.InitialStateProvider;
import org.elasticsoftware.elasticactors.InternalActorSystemConfiguration;
import org.elasticsoftware.elasticactors.ManagedActor;
import org.elasticsoftware.elasticactors.ManagedActorsRegistry;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.ShardKey;
import org.elasticsoftware.elasticactors.SingletonActor;
import org.elasticsoftware.elasticactors.TempActor;
import org.elasticsoftware.elasticactors.cache.NodeActorCacheManager;
import org.elasticsoftware.elasticactors.cache.ShardActorCacheManager;
import org.elasticsoftware.elasticactors.cluster.ActorRefFactory;
import org.elasticsoftware.elasticactors.cluster.ActorRefTools;
import org.elasticsoftware.elasticactors.cluster.ActorShardRef;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEvent;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListenerImpl;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListenerRegistry;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystems;
import org.elasticsoftware.elasticactors.cluster.LocalActorRefGroup;
import org.elasticsoftware.elasticactors.cluster.NodeSelector;
import org.elasticsoftware.elasticactors.cluster.NodeSelectorFactory;
import org.elasticsoftware.elasticactors.cluster.ShardDistributionStrategy;
import org.elasticsoftware.elasticactors.cluster.ShardDistributor;
import org.elasticsoftware.elasticactors.cluster.scheduler.InternalScheduler;
import org.elasticsoftware.elasticactors.kafka.KafkaActorNode;
import org.elasticsoftware.elasticactors.kafka.KafkaActorShard;
import org.elasticsoftware.elasticactors.kafka.KafkaActorThread;
import org.elasticsoftware.elasticactors.kafka.cluster.KafkaInternalActorSystems;
import org.elasticsoftware.elasticactors.kafka.scheduler.KafkaTopicScheduler;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStoreFactory;
import org.elasticsoftware.elasticactors.kafka.utils.TopicHelper;
import org.elasticsoftware.elasticactors.messaging.internal.ActorType;
import org.elasticsoftware.elasticactors.messaging.internal.CreateActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.DestroyActorMessage;
import org.elasticsoftware.elasticactors.runtime.ElasticActorsNode;
import org.elasticsoftware.elasticactors.scheduler.Scheduler;
import org.elasticsoftware.elasticactors.serialization.Deserializer;
import org.elasticsoftware.elasticactors.serialization.Message;
import org.elasticsoftware.elasticactors.serialization.MessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;
import org.elasticsoftware.elasticactors.serialization.SerializationFramework;
import org.elasticsoftware.elasticactors.serialization.Serializer;
import org.elasticsoftware.elasticactors.state.PersistentActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaActorSystemInstance
implements InternalActorSystem,
ShardDistributor,
ActorSystemEventListenerRegistry {
    private static final Logger logger = LoggerFactory.getLogger(KafkaActorSystemInstance.class);
    private final InternalActorSystemConfiguration configuration;
    private final NodeSelectorFactory nodeSelectorFactory;
    private final KafkaInternalActorSystems cluster;
    private final PhysicalNode localNode;
    private final ActorRefFactory actorRefFactory;
    private final KafkaActorThread[] shardThreads;
    private final KafkaActorShard[] actorShards;
    private final KafkaActorNode localActorNode;
    private final List<KafkaActorNode> activeNodes = new LinkedList<KafkaActorNode>();
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicBoolean stable = new AtomicBoolean(false);
    private final ConcurrentMap<Class, ElasticActor> actorInstances = new ConcurrentHashMap<Class, ElasticActor>();
    private final KafkaTopicScheduler schedulerService;
    private final HashFunction hashFunction = Hashing.murmur3_32();
    private final ActorLifecycleListenerRegistry actorLifecycleListenerRegistry;
    private final ManagedActorsRegistry managedActorsRegistry;

    public KafkaActorSystemInstance(ElasticActorsNode node, InternalActorSystemConfiguration configuration, NodeSelectorFactory nodeSelectorFactory, Integer numberOfShardThreads, String bootstrapServers, Cache<String, ActorRef> actorRefCache, ShardActorCacheManager shardActorCacheManager, NodeActorCacheManager nodeActorCacheManager, Serializer<PersistentActor<ShardKey>, byte[]> stateSerializer, Deserializer<byte[], PersistentActor<ShardKey>> stateDeserializer, ActorLifecycleListenerRegistry actorLifecycleListenerRegistry, PersistentActorStoreFactory persistentActorStoreFactory, ManagedActorsRegistry managedActorsRegistry) {
        int i;
        this.actorLifecycleListenerRegistry = actorLifecycleListenerRegistry;
        this.schedulerService = new KafkaTopicScheduler(this);
        this.localNode = node;
        this.cluster = new KafkaInternalActorSystems((InternalActorSystems)node, actorRefCache);
        this.configuration = configuration;
        this.nodeSelectorFactory = nodeSelectorFactory;
        this.actorRefFactory = this.cluster;
        this.actorShards = new KafkaActorShard[configuration.getNumberOfShards()];
        this.shardThreads = new KafkaActorThread[numberOfShardThreads.intValue()];
        try {
            TopicHelper.ensureTopicsExists(bootstrapServers, node.getId(), numberOfShardThreads, this);
        }
        catch (Exception e) {
            throw new RuntimeException("FATAL Exception on ensureTopicsExist", e);
        }
        for (i = 0; i < numberOfShardThreads; ++i) {
            this.shardThreads[i] = new KafkaActorThread(this.cluster.getClusterName(), bootstrapServers, this.localNode.getId(), this, this.actorRefFactory, shardActorCacheManager, nodeActorCacheManager, stateSerializer, stateDeserializer, persistentActorStoreFactory);
        }
        for (i = 0; i < configuration.getNumberOfShards(); ++i) {
            this.actorShards[i] = new KafkaActorShard(new ShardKey(configuration.getName(), i), this.shardThreads[i % numberOfShardThreads], this);
        }
        this.localActorNode = new KafkaActorNode(this.localNode, this.shardThreads[0], this);
        this.activeNodes.add(this.localActorNode);
        for (i = 1; i < this.shardThreads.length; ++i) {
            this.shardThreads[i].assign(this.localActorNode, false);
        }
        this.managedActorsRegistry = managedActorsRegistry;
    }

    @PostConstruct
    public void init() {
        for (KafkaActorThread shardThread : this.shardThreads) {
            shardThread.start();
        }
    }

    @PreDestroy
    public void destroy() {
        for (KafkaActorThread shardThread : this.shardThreads) {
            shardThread.stopRunning();
        }
    }

    public InternalActorSystemConfiguration getConfiguration() {
        return this.configuration;
    }

    public ActorSystemEventListenerRegistry getEventListenerRegistry() {
        return this;
    }

    public ActorRefGroup groupOf(Collection<ActorRef> members) throws IllegalArgumentException {
        for (ActorRef member : members) {
            if (member instanceof ActorShardRef) continue;
            throw new IllegalArgumentException("Only Persistent Actors (annotated with @Actor) of the same ElasticActors cluster are allowed to form a group");
        }
        ImmutableListMultimap.Builder memberMap = ImmutableListMultimap.builder();
        for (ActorRef member : members) {
            memberMap.put((Object)((ActorShardRef)((ActorShardRef)member).getActorContainer().getActorRef()), (Object)member);
        }
        return new LocalActorRefGroup(memberMap.build());
    }

    public ActorRef tempActorFor(String actorId) {
        throw new UnsupportedOperationException("KafkaActorSystemInstance does not support tempActorFor logic because node partition cannot be determined");
    }

    public ElasticActor getActorInstance(ActorRef actorRef, Class<? extends ElasticActor> actorClass) {
        return this.actorInstances.computeIfAbsent(actorClass, k -> {
            try {
                return (ElasticActor)actorClass.newInstance();
            }
            catch (Exception e) {
                logger.error("Exception creating actor instance for actorClass [{}]", (Object)actorClass.getName(), (Object)e);
                return null;
            }
        });
    }

    public ElasticActor getServiceInstance(ActorRef serviceRef) {
        if (ActorRefTools.isService((ActorRef)serviceRef)) {
            return this.configuration.getService(serviceRef.getActorId());
        }
        return null;
    }

    public ActorNode getNode(String nodeId) {
        return this.activeNodes.stream().filter(kafkaActorNode -> kafkaActorNode.getKey().getNodeId().equals(nodeId)).findFirst().orElse(null);
    }

    public ActorNode getNode() {
        return this.localActorNode;
    }

    public <T> MessageSerializer<T> getSerializer(Class<T> messageClass) {
        Message messageAnnotation;
        MessageSerializer messageSerializer = this.cluster.getSystemMessageSerializer(messageClass);
        if (messageSerializer == null && (messageAnnotation = messageClass.getAnnotation(Message.class)) != null) {
            SerializationFramework framework = this.cluster.getSerializationFramework(messageAnnotation.serializationFramework());
            messageSerializer = framework.getSerializer(messageClass);
        }
        return messageSerializer;
    }

    public <T> MessageDeserializer<T> getDeserializer(Class<T> messageClass) {
        Message messageAnnotation;
        MessageDeserializer messageDeserializer = this.cluster.getSystemMessageDeserializer(messageClass);
        if (messageDeserializer == null && (messageAnnotation = messageClass.getAnnotation(Message.class)) != null) {
            SerializationFramework framework = this.cluster.getSerializationFramework(messageAnnotation.serializationFramework());
            messageDeserializer = framework.getDeserializer(messageClass);
        }
        return messageDeserializer;
    }

    public InternalScheduler getInternalScheduler() {
        throw new UnsupportedOperationException();
    }

    public String getName() {
        return this.configuration.getName();
    }

    public <T> ActorRef actorOf(String actorId, Class<T> actorClass) throws Exception {
        if (actorClass.getAnnotation(Actor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @Actor");
        }
        if (actorClass.getAnnotation(SingletonActor.class) != null) {
            throw new IllegalArgumentException("actorClass is annotated with @SingletonActor and will be automatically created by the ActorSystem");
        }
        ManagedActor managedActorAnnotation = actorClass.getAnnotation(ManagedActor.class);
        if (managedActorAnnotation != null && managedActorAnnotation.exclusive()) {
            throw new IllegalArgumentException("actorClass is annotated with @ManagedActor as exclusive and will be automatically created by the ActorSystem");
        }
        return this.actorOf(actorId, actorClass.getName(), null);
    }

    public ActorRef actorOf(String actorId, String actorClassName) throws Exception {
        return this.actorOf(actorId, actorClassName, null);
    }

    public <T> ActorRef actorOf(String actorId, Class<T> actorClass, ActorState initialState) throws Exception {
        if (actorClass.getAnnotation(Actor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @Actor");
        }
        if (actorClass.getAnnotation(SingletonActor.class) != null) {
            throw new IllegalArgumentException("actorClass is annotated with @SingletonActor and will be automatically created by the ActorSystem");
        }
        ManagedActor managedActorAnnotation = actorClass.getAnnotation(ManagedActor.class);
        if (managedActorAnnotation != null && managedActorAnnotation.exclusive()) {
            throw new IllegalArgumentException("actorClass is annotated with @ManagedActor as exclusive and will be automatically created by the ActorSystem");
        }
        return this.actorOf(actorId, actorClass.getName(), initialState);
    }

    public ActorRef actorOf(String actorId, String actorClassName, ActorState initialState) throws Exception {
        return this.actorOf(actorId, actorClassName, initialState, ActorContextHolder.getSelf());
    }

    public ActorRef actorOf(String actorId, String actorClassName, ActorState initialState, ActorRef creator) throws Exception {
        KafkaActorShard shard = this.shardFor(actorId);
        CreateActorMessage createActorMessage = new CreateActorMessage(this.getName(), actorClassName, actorId, initialState);
        shard.sendMessage(creator, shard.getActorRef(), (Object)createActorMessage);
        return this.cluster.createPersistentActorRef(shard, actorId);
    }

    private KafkaActorShard shardFor(String actorId) {
        return this.actorShards[Math.abs(this.hashFunction.hashString((CharSequence)actorId, StandardCharsets.UTF_8).asInt()) % this.actorShards.length];
    }

    public <T> ActorRef tempActorOf(Class<T> actorClass, @Nullable ActorState initialState) throws Exception {
        if (actorClass.getAnnotation(TempActor.class) == null) {
            throw new IllegalArgumentException("actorClass has to be annotated with @TempActor");
        }
        String actorId = UUID.randomUUID().toString();
        ActorRef self = ActorContextHolder.getSelf();
        String affinityKey = self != null ? self.getActorId() : null;
        CreateActorMessage createActorMessage = new CreateActorMessage(this.getName(), actorClass.getName(), actorId, initialState, ActorType.TEMP, affinityKey);
        KafkaActorThread actorThread = affinityKey != null ? this.shardFor(affinityKey).getActorThread() : this.shardFor(actorId).getActorThread();
        ActorRef tempActorRef = this.cluster.createTempActorRef(this.localActorNode, actorThread.getNodeTopicPartitionId(), actorId);
        actorThread.createTempActor(tempActorRef, createActorMessage);
        return tempActorRef;
    }

    public ActorRef actorFor(String actorId) {
        KafkaActorShard shard = this.shardFor(actorId);
        return this.cluster.createPersistentActorRef(shard, actorId);
    }

    public ActorRef serviceActorFor(String actorId) {
        return this.cluster.createServiceActorRef(this.localActorNode, actorId);
    }

    public ActorRef serviceActorFor(String nodeId, String actorId) {
        ActorNode node = this.getNode(nodeId);
        if (node != null) {
            return this.cluster.createServiceActorRef(node, actorId);
        }
        throw new IllegalArgumentException(String.format("Unknown node [%s]", nodeId));
    }

    public Scheduler getScheduler() {
        return this.schedulerService;
    }

    public InternalActorSystems getParent() {
        return this.cluster;
    }

    public void stop(ActorRef actorRef) throws Exception {
        ActorRef sender = ActorContextHolder.getSelf();
        ActorContainer handlingContainer = ((ActorContainerRef)actorRef).getActorContainer();
        handlingContainer.sendMessage(sender, handlingContainer.getActorRef(), (Object)new DestroyActorMessage(actorRef));
    }

    public List<ActorLifecycleListener<?>> getActorLifecycleListeners(Class<? extends ElasticActor> actorClass) {
        return this.actorLifecycleListenerRegistry.getListeners(actorClass);
    }

    public boolean isStable() {
        return this.stable.get();
    }

    public ActorShard getShard(String actorPath) {
        String[] pathElements = actorPath.split("/");
        if (pathElements[1].equals("shards")) {
            return this.getShard(Integer.parseInt(pathElements[2]));
        }
        throw new IllegalArgumentException(String.format("No ActorShard found for actorPath [%s]", actorPath));
    }

    public ActorShard getShard(int shardId) {
        return this.actorShards[shardId];
    }

    public int getNumberOfShards() {
        return this.configuration.getNumberOfShards();
    }

    public void updateNodes(List<PhysicalNode> nodes) throws Exception {
        HashMap<String, PhysicalNode> nodeMap = new HashMap<String, PhysicalNode>();
        for (PhysicalNode node : nodes) {
            nodeMap.put(node.getId(), node);
        }
        Set activeNodeIds = this.activeNodes.stream().map(kafkaActorNode -> kafkaActorNode.getKey().getNodeId()).collect(Collectors.toSet());
        Iterator<KafkaActorNode> nodeIterator = this.activeNodes.iterator();
        while (nodeIterator.hasNext()) {
            KafkaActorNode node = nodeIterator.next();
            if (nodeMap.containsKey(node.getKey().getNodeId())) continue;
            node.destroy();
            nodeIterator.remove();
        }
        nodes.stream().filter(physicalNode -> !activeNodeIds.contains(physicalNode.getId())).forEach(physicalNode -> this.activeNodes.add(new KafkaActorNode((PhysicalNode)physicalNode, this.shardThreads[this.activeNodes.size() % this.shardThreads.length], this)));
    }

    public void distributeShards(List<PhysicalNode> nodes, ShardDistributionStrategy strategy) throws Exception {
        boolean initializing = this.initialized.compareAndSet(false, true);
        if (initializing) {
            logger.info("Initializing ActorSystem [{}]", (Object)this.getName());
        }
        NodeSelector nodeSelector = this.nodeSelectorFactory.create(nodes);
        HashMultimap shardDistribution = HashMultimap.create();
        boolean stable = true;
        for (int i = 0; i < this.configuration.getNumberOfShards(); ++i) {
            ShardKey shardKey = new ShardKey(this.configuration.getName(), i);
            PhysicalNode node = nodeSelector.getPrimary(shardKey.toString());
            shardDistribution.put((Object)node, (Object)shardKey);
        }
        CompletionStage<Boolean> result = null;
        for (ShardKey shardThread : this.shardThreads) {
            result = result == null ? shardThread.prepareRebalance((Multimap<PhysicalNode, ShardKey>)shardDistribution, strategy) : result.thenCombine(shardThread.prepareRebalance((Multimap<PhysicalNode, ShardKey>)shardDistribution, strategy), (b1, b2) -> b1 != false && b2 != false);
        }
        try {
            stable = (Boolean)result.toCompletableFuture().get();
        }
        catch (ExecutionException e) {
            logger.error("FATAL Exception while executing prepareRebalance operation", e.getCause());
            stable = false;
        }
        catch (Exception e) {
            logger.error("Unexpected Exception while executing prepareRebalance operation", (Throwable)e);
            stable = false;
        }
        if (!strategy.waitForReleasedShards(60L, TimeUnit.SECONDS)) {
            stable = false;
        }
        HashSet newLocalShards = new HashSet();
        if (stable) {
            CompletionStage<Set<Integer>> performRebalanceResult = null;
            for (KafkaActorThread shardThread : this.shardThreads) {
                performRebalanceResult = performRebalanceResult == null ? shardThread.performRebalance() : performRebalanceResult.thenCombine(shardThread.performRebalance(), (s1, s2) -> ImmutableSet.builder().addAll((Iterable)s1).addAll((Iterable)s2).build());
            }
            try {
                newLocalShards.addAll((Collection)performRebalanceResult.toCompletableFuture().get());
            }
            catch (ExecutionException e) {
                logger.error("FATAL Exception while executing performRebalance operation", e.getCause());
                stable = false;
            }
            catch (Exception e) {
                logger.error("Unexpected Exception while executing performRebalancer operation", (Throwable)e);
                stable = false;
            }
        }
        this.stable.set(stable);
        CompletableFuture<Object> serviceActorsInitilization = CompletableFuture.completedFuture(null);
        if (initializing) {
            serviceActorsInitilization = this.localActorNode.initializeServiceActors();
        }
        serviceActorsInitilization.thenRun(() -> {
            for (Class actorClass : this.managedActorsRegistry.getSingletonActorClasses()) {
                SingletonActor singletonActor = actorClass.getAnnotation(SingletonActor.class);
                String actorId = singletonActor.value();
                Class initialStateProviderClass = singletonActor.initialStateProvider();
                ActorShardRef actorRef = (ActorShardRef)this.actorFor(actorId);
                ActorShard shard = (ActorShard)actorRef.getActorContainer();
                if (!newLocalShards.contains(shard.getKey().getShardId())) continue;
                this.createManagedActor(shard, actorClass, actorId, initialStateProviderClass);
            }
            for (Class actorClass : this.managedActorsRegistry.getManagedActorClasses()) {
                String[] actorIds;
                ManagedActor managedActor = actorClass.getAnnotation(ManagedActor.class);
                for (String actorId : actorIds = managedActor.value()) {
                    Class initialStateProviderClass = managedActor.initialStateProvider();
                    ActorShardRef actorRef = (ActorShardRef)this.actorFor(actorId);
                    ActorShard shard = (ActorShard)actorRef.getActorContainer();
                    if (!newLocalShards.contains(shard.getKey().getShardId())) continue;
                    this.createManagedActor(shard, actorClass, actorId, initialStateProviderClass);
                }
            }
        });
        logger.info("Cluster shard mapping summary:");
        for (Map.Entry entry : shardDistribution.asMap().entrySet()) {
            logger.info("\t{} has {} shards assigned", entry.getKey(), (Object)((Collection)entry.getValue()).size());
        }
    }

    private void createManagedActor(ActorShard shard, Class<? extends ElasticActor<?>> actorClass, String actorId, Class<? extends InitialStateProvider> initialStateProviderClass) {
        try {
            Class stateClass = actorClass.getAnnotation(Actor.class).stateClass();
            InitialStateProvider initialStateProvider = initialStateProviderClass.newInstance();
            ActorState initialState = initialStateProvider.getInitialState(actorId, stateClass);
            shard.sendMessage(null, shard.getActorRef(), (Object)new CreateActorMessage(this.getName(), actorClass.getName(), actorId, initialState));
        }
        catch (Exception e) {
            logger.error("Could not create default actor state for managed actor {} of type {}", new Object[]{actorId, actorClass.getName(), e});
        }
    }

    public void register(ActorRef receiver, ActorSystemEvent event, Object message) throws IOException {
        if (!(receiver instanceof ActorShardRef)) {
            throw new IllegalArgumentException("ActorRef must be referencing a Persistent Actor (i.e. annotated with @Actor)");
        }
        KafkaActorShard actorShard = (KafkaActorShard)((ActorShardRef)receiver).getActorContainer();
        MessageSerializer<?> serializer = this.getSerializer(message.getClass());
        ByteBuffer serializedMessage = serializer.serialize(message);
        byte[] serializedBytes = new byte[serializedMessage.remaining()];
        serializedMessage.get(serializedBytes);
        actorShard.getActorThread().register(actorShard.getKey(), event, (ActorSystemEventListener)new ActorSystemEventListenerImpl(receiver.getActorId(), message.getClass(), serializedBytes));
    }

    public void deregister(ActorRef receiver, ActorSystemEvent event) {
        if (!(receiver instanceof ActorShardRef)) {
            throw new IllegalArgumentException("ActorRef must be referencing a Persistent Actor (i.e. annotated with @Actor)");
        }
        KafkaActorShard actorShard = (KafkaActorShard)((ActorShardRef)receiver).getActorContainer();
        actorShard.getActorThread().deregister(actorShard.getKey(), event, receiver);
    }
}

