/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kafka;

import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsoftware.elasticactors.ActorContainerRef;
import org.elasticsoftware.elasticactors.ActorContext;
import org.elasticsoftware.elasticactors.ActorNode;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.ActorSystem;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.NodeKey;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.ShardKey;
import org.elasticsoftware.elasticactors.cache.EvictionListener;
import org.elasticsoftware.elasticactors.cache.NodeActorCacheManager;
import org.elasticsoftware.elasticactors.cache.ShardActorCacheManager;
import org.elasticsoftware.elasticactors.cluster.ActorRefFactory;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEvent;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.ShardDistributionStrategy;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage;
import org.elasticsoftware.elasticactors.kafka.InternalActorContext;
import org.elasticsoftware.elasticactors.kafka.KafkaActorNode;
import org.elasticsoftware.elasticactors.kafka.KafkaActorShard;
import org.elasticsoftware.elasticactors.kafka.KafkaActorSystemInstance;
import org.elasticsoftware.elasticactors.kafka.KafkaTransactionContext;
import org.elasticsoftware.elasticactors.kafka.ManagedActorContainer;
import org.elasticsoftware.elasticactors.kafka.ServiceActorContext;
import org.elasticsoftware.elasticactors.kafka.cluster.ActorLifecycleFunction;
import org.elasticsoftware.elasticactors.kafka.cluster.ApplicationProtocol;
import org.elasticsoftware.elasticactors.kafka.cluster.ReactiveStreamsProtocol;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaActorSystemEventListenerDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaInternalMessageDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaInternalMessageSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaPersistentActorSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaProducerSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaScheduledMessageDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.UUIDDeserializer;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStore;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStoreFactory;
import org.elasticsoftware.elasticactors.kafka.utils.TopicNamesHelper;
import org.elasticsoftware.elasticactors.messaging.DefaultInternalMessage;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.internal.ActorNodeMessage;
import org.elasticsoftware.elasticactors.messaging.internal.CancelScheduledMessageMessage;
import org.elasticsoftware.elasticactors.messaging.internal.CreateActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.DestroyActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.PersistActorMessage;
import org.elasticsoftware.elasticactors.serialization.Deserializer;
import org.elasticsoftware.elasticactors.serialization.Message;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;
import org.elasticsoftware.elasticactors.serialization.SerializationAccessor;
import org.elasticsoftware.elasticactors.serialization.SerializationContext;
import org.elasticsoftware.elasticactors.serialization.Serializer;
import org.elasticsoftware.elasticactors.serialization.internal.ActorRefDeserializer;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageSerializer;
import org.elasticsoftware.elasticactors.serialization.internal.ScheduledMessageDeserializer;
import org.elasticsoftware.elasticactors.state.PersistentActor;
import org.elasticsoftware.elasticactors.util.ClassLoadingHelper;
import org.elasticsoftware.elasticactors.util.ManifestTools;
import org.elasticsoftware.elasticactors.util.SerializationTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaActorThread
extends Thread {
    // NOTE(review): the logger is created for KafkaActorSystemInstance, not for this class, so
    // everything logged here appears under that category - confirm this is intended.
    private static final Logger logger = LoggerFactory.getLogger(KafkaActorSystemInstance.class);
    /** Source of the numeric suffix used in the thread name (see the constructor). */
    private static final AtomicInteger THREAD_ID_SEQUENCE = new AtomicInteger(0);
    // NOTE(review): not referenced in the visible part of the file; its meaning cannot be confirmed from here.
    private static final long DEFAULT_OFFSET_INCREASE = 2L;
    // Sentinel PersistentActor built with all-null arguments; its use is outside the visible part
    // of the file (presumably marks deleted actor state - TODO confirm).
    private static final PersistentActor<ShardKey> TOMBSTONE = new PersistentActor(null, null, null, null, null, null);
    /** Consumer polled by processMessages(); also handed to every queued command. */
    private final KafkaConsumer<UUID, InternalMessage> messageConsumer;
    /** Producer configured with a transactional.id; initTransactions() is called in the constructor. */
    private final KafkaProducer<Object, Object> producer;
    /** Consumer assigned to the persistent-actors topic partitions of the local shards. */
    private final KafkaConsumer<String, byte[]> stateConsumer;
    /** Consumer assigned to the actor-system-event-listeners topic partitions of the local shards. */
    private final KafkaConsumer<String, ActorSystemEventListener> actorSystemEventListenersConsumer;
    /** Consumer polled by updateScheduledMessages(). */
    private final KafkaConsumer<UUID, ScheduledMessage> scheduledMessagesConsumer;
    /** Used as group.id of the message consumer and as the consumer group when committing offsets in a transaction. */
    private final String clusterName;
    private final InternalActorSystem internalActorSystem;
    /** Commands executed by the run loop; each one receives the message consumer and the producer. */
    private final BlockingQueue<BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>>> commands;
    /** Shards marked as newly local by prepareRebalance(); turned into entries of localShards by performRebalance(). */
    private final Set<ShardKey> newLocalShards = new HashSet<ShardKey>();
    /** Shards currently handled by this thread, keyed by shard key. */
    private final Map<ShardKey, ManagedActorShard> localShards = new HashMap<ShardKey, ManagedActorShard>();
    /** Set through assign(KafkaActorNode, boolean); null until then. */
    private ManagedActorNode localActorNode;
    /** All shards registered with this thread through assign(KafkaActorShard), local or not. */
    private final Map<ShardKey, KafkaActorShard> managedShards = new HashMap<ShardKey, KafkaActorShard>();
    private final ShardActorCacheManager shardActorCacheManager;
    private final NodeActorCacheManager nodeActorCacheManager;
    private final Serializer<PersistentActor<ShardKey>, byte[]> stateSerializer;
    private final Deserializer<byte[], PersistentActor<ShardKey>> stateDeserializer;
    /** Factory used by createStateStore(ShardKey). */
    private final PersistentActorStoreFactory persistentActorStoreFactory;
    private final String messagesTopic;
    private final String scheduledMessagesTopic;
    private final String actorSystemEventListenersTopic;
    private final String persistentActorsTopic;
    // Loop flag of run(); only set to false by the command queued in stopRunning().
    private boolean RUNNING = true;
    /** Partition of the node messages topic consumed by this thread (see assignPartitions()). */
    private final Integer nodeTopicPartitionId;
    /** Producer callback that logs a failed send and otherwise does nothing. */
    private final Callback loggingCallback = (metadata, exception) -> {
        if (exception != null) {
            logger.error("Exception while sending message to KafkaProducer", (Throwable)exception);
        }
    };
    // INITIALIZING at construction, REBALANCING while prepareRebalance() runs, ACTIVE once
    // performRebalance() has succeeded; run() only processes messages while ACTIVE.
    private KafkaActorSystemState state = KafkaActorSystemState.INITIALIZING;

    /**
     * Creates the thread together with its transactional producer and its four consumers.
     *
     * <p>The numeric thread id is drawn exactly once from {@code THREAD_ID_SEQUENCE} and used for
     * both the thread name and the node topic partition id, so the two values always agree even
     * when several threads are constructed concurrently.
     *
     * @param clusterName cluster name; used as (prefix of) the consumer group ids
     * @param bootstrapServers Kafka bootstrap servers handed to every client
     * @param nodeId node id; used as prefix of the client ids and of the transactional id
     * @param internalActorSystem actor system used to derive topic names and as serialization accessor
     * @param actorRefFactory factory used by the message and scheduled-message deserializers
     * @param shardActorCacheManager stored for later use
     * @param nodeActorCacheManager stored for later use
     * @param stateSerializer serializer for persistent actor state, also used by the producer
     * @param stateDeserializer deserializer handed to the state store factory
     * @param persistentActorStoreFactory factory for per-shard state stores
     */
    public KafkaActorThread(String clusterName, String bootstrapServers, String nodeId, InternalActorSystem internalActorSystem, ActorRefFactory actorRefFactory, ShardActorCacheManager shardActorCacheManager, NodeActorCacheManager nodeActorCacheManager, Serializer<PersistentActor<ShardKey>, byte[]> stateSerializer, Deserializer<byte[], PersistentActor<ShardKey>> stateDeserializer, PersistentActorStoreFactory persistentActorStoreFactory) {
        this(THREAD_ID_SEQUENCE.getAndIncrement(), clusterName, bootstrapServers, nodeId, internalActorSystem, actorRefFactory, shardActorCacheManager, nodeActorCacheManager, stateSerializer, stateDeserializer, persistentActorStoreFactory);
    }

    /**
     * Does the actual initialization for an already allocated thread id.
     *
     * @param threadId id drawn from {@code THREAD_ID_SEQUENCE}; becomes the thread name suffix and
     *        the node topic partition id
     */
    private KafkaActorThread(int threadId, String clusterName, String bootstrapServers, String nodeId, InternalActorSystem internalActorSystem, ActorRefFactory actorRefFactory, ShardActorCacheManager shardActorCacheManager, NodeActorCacheManager nodeActorCacheManager, Serializer<PersistentActor<ShardKey>, byte[]> stateSerializer, Deserializer<byte[], PersistentActor<ShardKey>> stateDeserializer, PersistentActorStoreFactory persistentActorStoreFactory) {
        super("KafkaActorThread-" + threadId);
        this.persistentActorStoreFactory = persistentActorStoreFactory;
        this.nodeTopicPartitionId = threadId;
        this.clusterName = clusterName;
        this.internalActorSystem = internalActorSystem;
        this.shardActorCacheManager = shardActorCacheManager;
        this.nodeActorCacheManager = nodeActorCacheManager;
        this.stateSerializer = stateSerializer;
        this.stateDeserializer = stateDeserializer;
        this.messagesTopic = TopicNamesHelper.getMessagesTopic(internalActorSystem);
        this.scheduledMessagesTopic = TopicNamesHelper.getScheduledMessagesTopic(internalActorSystem);
        this.actorSystemEventListenersTopic = TopicNamesHelper.getActorsystemEventListenersTopic(internalActorSystem);
        this.persistentActorsTopic = TopicNamesHelper.getPersistentActorsTopic(internalActorSystem);

        // Every client id (and the transactional id) starts with "<nodeId>-<threadName>".
        String clientIdPrefix = nodeId + "-" + this.getName();

        // Consumer for the internal messages of the local shards and of the node topic.
        Map<String, Object> consumerConfig = newConsumerConfig(bootstrapServers, clusterName, clientIdPrefix + "-consumer", "100");
        InternalMessageDeserializer internalMessageDeserializer = new InternalMessageDeserializer(new ActorRefDeserializer(actorRefFactory), (SerializationAccessor)internalActorSystem);
        this.messageConsumer = new KafkaConsumer(consumerConfig, (org.apache.kafka.common.serialization.Deserializer)new UUIDDeserializer(), (org.apache.kafka.common.serialization.Deserializer)new KafkaInternalMessageDeserializer(internalMessageDeserializer));

        // Idempotent, transactional producer shared by all sends of this thread.
        Map<String, Object> producerConfig = new HashMap<>();
        producerConfig.put("retries", Integer.MAX_VALUE);
        producerConfig.put("enable.idempotence", true);
        producerConfig.put("max.in.flight.requests.per.connection", 1);
        producerConfig.put("acks", "all");
        producerConfig.put("bootstrap.servers", bootstrapServers);
        producerConfig.put("client.id", clientIdPrefix + "-producer");
        producerConfig.put("transactional.id", clientIdPrefix + "-producer");
        KafkaProducerSerializer keySerializer = new KafkaProducerSerializer(new KafkaInternalMessageSerializer((Serializer<InternalMessage, byte[]>)InternalMessageSerializer.get()), new KafkaPersistentActorSerializer(stateSerializer));
        KafkaProducerSerializer valueSerializer = new KafkaProducerSerializer(new KafkaInternalMessageSerializer((Serializer<InternalMessage, byte[]>)InternalMessageSerializer.get()), new KafkaPersistentActorSerializer(stateSerializer));
        this.producer = new KafkaProducer(producerConfig, (org.apache.kafka.common.serialization.Serializer)keySerializer, (org.apache.kafka.common.serialization.Serializer)valueSerializer);
        this.producer.initTransactions();

        this.commands = new LinkedBlockingQueue<>();

        // Consumer for the persistent actor state.
        Map<String, Object> stateConsumerConfig = newConsumerConfig(bootstrapServers, clusterName + "-state", clientIdPrefix + "-state-consumer", "1000");
        this.stateConsumer = new KafkaConsumer(stateConsumerConfig, (org.apache.kafka.common.serialization.Deserializer)new StringDeserializer(), (org.apache.kafka.common.serialization.Deserializer)new ByteArrayDeserializer());

        // Consumer for the scheduled messages.
        Map<String, Object> scheduledMessagesConsumerConfig = newConsumerConfig(bootstrapServers, clusterName + "-scheduledMessages", clientIdPrefix + "-scheduledMessages-consumer", "1000");
        KafkaScheduledMessageDeserializer scheduledMessageDeserializer = new KafkaScheduledMessageDeserializer(new ScheduledMessageDeserializer(new ActorRefDeserializer(actorRefFactory)));
        this.scheduledMessagesConsumer = new KafkaConsumer(scheduledMessagesConsumerConfig, (org.apache.kafka.common.serialization.Deserializer)new UUIDDeserializer(), (org.apache.kafka.common.serialization.Deserializer)scheduledMessageDeserializer);

        // Consumer for the actor system event listeners.
        Map<String, Object> actorSystemEventListenersConsumerConfig = newConsumerConfig(bootstrapServers, clusterName + "-actorSystemEventListeners", clientIdPrefix + "-actorSystemEventListeners-consumer", "1000");
        this.actorSystemEventListenersConsumer = new KafkaConsumer(actorSystemEventListenersConsumerConfig, (org.apache.kafka.common.serialization.Deserializer)new StringDeserializer(), (org.apache.kafka.common.serialization.Deserializer)new KafkaActorSystemEventListenerDeserializer());
    }

    /**
     * Builds the configuration shared by all consumers of this thread: manual offset handling,
     * read-committed isolation and an effectively unlimited poll interval.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param groupId value for {@code group.id}
     * @param clientId value for {@code client.id}
     * @param maxPollRecords value for {@code max.poll.records}
     * @return a new, mutable configuration map
     */
    private static Map<String, Object> newConsumerConfig(String bootstrapServers, String groupId, String clientId, String maxPollRecords) {
        Map<String, Object> config = new HashMap<>();
        config.put("max.poll.records", maxPollRecords);
        config.put("auto.offset.reset", "earliest");
        config.put("enable.auto.commit", "false");
        config.put("internal.leave.group.on.close", false);
        config.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        config.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("client.id", clientId);
        return config;
    }

    /**
     * Main loop of the thread.
     *
     * <p>Each iteration first drains the pending commands, then - only while the thread is still
     * running and the state is ACTIVE - processes messages, updates the scheduled messages and
     * fires the ones that are due. When the loop ends, normally or through an exception, all Kafka
     * clients owned by this thread are closed, including the message consumer.
     */
    @Override
    public void run() {
        try {
            while (this.RUNNING) {
                BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> command = this.pollOrWait();
                while (command != null) {
                    command.accept(this.messageConsumer, this.producer);
                    command = this.pollOrWait();
                }
                if (this.RUNNING && this.state == KafkaActorSystemState.ACTIVE) {
                    this.processMessages();
                    this.updateScheduledMessages();
                    this.maybeFireScheduledMessages();
                }
            }
        }
        catch (Exception e) {
            logger.error("FATAL: Exception in KafkaActorThread runLoop", (Throwable)e);
        }
        finally {
            // Close every client independently so that one failing close() cannot leak the others.
            closeResourceQuietly(this.producer, "producer");
            closeResourceQuietly(this.messageConsumer, "messageConsumer");
            closeResourceQuietly(this.stateConsumer, "stateConsumer");
            closeResourceQuietly(this.actorSystemEventListenersConsumer, "actorSystemEventListenersConsumer");
            closeResourceQuietly(this.scheduledMessagesConsumer, "scheduledMessagesConsumer");
        }
    }

    /**
     * Closes the given resource and logs, instead of propagating, any exception thrown by it.
     *
     * @param resource the resource to close
     * @param description name used in the log message
     */
    private static void closeResourceQuietly(AutoCloseable resource, String description) {
        try {
            resource.close();
        }
        catch (Exception e) {
            logger.warn("Exception while closing {}", description, e);
        }
    }

    /**
     * Fetches the next pending command.
     *
     * <p>While the state is ACTIVE this never blocks, so the run loop can go on processing
     * messages. In every other state it waits up to one second for a command to arrive.
     *
     * @return the next command, or {@code null} when none is available (or the wait was interrupted)
     */
    private BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> pollOrWait() {
        if (this.state != KafkaActorSystemState.ACTIVE) {
            try {
                return this.commands.poll(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException ignored) {
                // An interrupted wait is reported the same way as "no command available".
                return null;
            }
        }
        return this.commands.poll();
    }

    /**
     * Polls the message consumer once and handles the returned records inside a single producer
     * transaction that also commits the consumed offsets for the cluster's consumer group.
     *
     * <p>Interrupt and wakeup exceptions are logged as recoverable. Any other exception is logged
     * and terminates the JVM with exit code 1.
     */
    private void processMessages() {
        try {
            ConsumerRecords<UUID, InternalMessage> polled = this.messageConsumer.poll(Duration.ofMillis(1L));
            if (polled.isEmpty()) {
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("messageConsumer has {} records to process", polled.count());
            }
            this.producer.beginTransaction();
            // Makes the producer visible to send()/schedule()/... calls made while handling messages.
            KafkaTransactionContext.setTransactionalProducer(this.producer);
            // Offset to commit per partition: the offset following the last handled record.
            Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
            for (TopicPartition partition : polled.partitions()) {
                polled.records(partition).forEach(record -> {
                    InternalMessage message = record.value();
                    if (logger.isDebugEnabled()) {
                        logger.debug("handling InternalMessage(sender:{}, receiver:{}, type:{})  with offset {} from topicPartition({})", message.getSender(), message.getReceivers().get(0), message.getPayloadClass(), record.offset(), partition);
                    }
                    this.handleInternalMessage(partition, message);
                    nextOffsets.put(partition, new OffsetAndMetadata(record.offset() + 1L));
                });
            }
            this.producer.sendOffsetsToTransaction(nextOffsets, this.clusterName);
            this.producer.commitTransaction();
        }
        catch (InterruptException | WakeupException e) {
            logger.warn("Recoverable exception while polling for Messages", e);
        }
        catch (KafkaException e) {
            logger.error("FATAL: Unrecoverable exception while polling for Messages", (Throwable)e);
            System.exit(1);
        }
        catch (Throwable t) {
            logger.error("Unexpected exception while polling for Messages", t);
            System.exit(1);
        }
    }

    /**
     * Polls the scheduled-messages consumer without blocking and applies the records to the
     * in-memory schedule of the matching local shard: a record with a value adds that message under
     * its fire time, a record without a value removes the message whose id equals the record key.
     * Records of partitions that have no local shard are ignored.
     *
     * <p>Exceptions are logged and never propagated.
     */
    private void updateScheduledMessages() {
        try {
            ConsumerRecords<UUID, ScheduledMessage> polled = this.scheduledMessagesConsumer.poll(Duration.ZERO);
            for (TopicPartition partition : polled.partitions()) {
                ManagedActorShard shard = this.localShards.get(new ShardKey(this.internalActorSystem.getName(), partition.partition()));
                if (shard == null) {
                    continue;
                }
                polled.records(partition).forEach(record -> {
                    ScheduledMessage scheduled = record.value();
                    if (scheduled == null) {
                        // A record without a value cancels the scheduled message identified by the key.
                        shard.scheduledMessages.entries().removeIf(entry -> entry.getValue().getId().equals(record.key()));
                    } else {
                        shard.scheduledMessages.put(scheduled.getFireTime(TimeUnit.MILLISECONDS), scheduled);
                    }
                });
            }
        }
        catch (InterruptException | WakeupException e) {
            logger.warn("Recoverable exception while polling for ScheduledMessages", e);
        }
        catch (KafkaException e) {
            logger.error("FATAL: Unrecoverable exception while polling for ScheduledMessages", (Throwable)e);
        }
        catch (Throwable t) {
            logger.error("Unexpected exception while polling for ScheduledMessages", t);
        }
    }

    /**
     * Fires every scheduled message of the local shards that is due.
     *
     * <p>Within one producer transaction each due message is sent to the messages topic partition
     * of the receiver's shard, and a record with a null value is written to the scheduled-messages
     * topic partition of the sender's shard. After a successful commit the messages are removed
     * from the in-memory schedule.
     *
     * <p>If anything fails while the transaction is still open, the transaction is aborted so the
     * producer can begin a new one later; the messages stay in the in-memory schedule. No abort is
     * attempted after a {@link ProducerFencedException}. Exceptions are logged, never propagated.
     */
    private void maybeFireScheduledMessages() {
        long now = System.currentTimeMillis();
        List<ScheduledMessage> messagesToFire = this.localShards.values().stream()
                .map(managedActorShard -> managedActorShard.getScheduledMessagesThatShouldFire(now))
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        if (messagesToFire.isEmpty()) {
            return;
        }
        // True between a successful beginTransaction() and a successful commitTransaction().
        boolean transactionOpen = false;
        try {
            this.producer.beginTransaction();
            transactionOpen = true;
            for (ScheduledMessage scheduledMessage : messagesToFire) {
                DefaultInternalMessage internalMessage = new DefaultInternalMessage(scheduledMessage.getSender(), scheduledMessage.getReceiver(), ByteBuffer.wrap(scheduledMessage.getMessageBytes()), scheduledMessage.getMessageClass().getName(), false);
                ShardKey destinationKey = ((ActorShard)((ActorContainerRef)scheduledMessage.getReceiver()).getActorContainer()).getKey();
                this.producer.send(new ProducerRecord<Object, Object>(this.messagesTopic, destinationKey.getShardId(), internalMessage.getId(), internalMessage));
                // Null-valued record for the scheduled message on the sender's shard partition.
                ShardKey sourceKey = ((ActorShard)((ActorContainerRef)scheduledMessage.getSender()).getActorContainer()).getKey();
                this.producer.send(new ProducerRecord<Object, Object>(this.scheduledMessagesTopic, sourceKey.getShardId(), scheduledMessage.getId(), null));
            }
            this.producer.commitTransaction();
            transactionOpen = false;
            for (ScheduledMessage scheduledMessage : messagesToFire) {
                ShardKey sourceKey = ((ActorShard)((ActorContainerRef)scheduledMessage.getSender()).getActorContainer()).getKey();
                this.localShards.get(sourceKey).scheduledMessages.remove((Object)scheduledMessage.getFireTime(TimeUnit.MILLISECONDS), scheduledMessage);
            }
        }
        catch (RetriableException e) {
            logger.warn("Recoverable exception while sending ScheduledMessages", (Throwable)e);
        }
        catch (ProducerFencedException e) {
            // A fenced producer cannot abort its transaction, so do not try.
            transactionOpen = false;
            logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", (Throwable)e);
        }
        catch (KafkaException e) {
            logger.error("FATAL: Unrecoverable exception while committing producer transaction", (Throwable)e);
        }
        catch (Throwable t) {
            logger.error("Unexpected exception while processing ScheduledMessages", t);
        }
        finally {
            if (transactionOpen) {
                // Leaving the transaction open would make the next beginTransaction() fail.
                try {
                    this.producer.abortTransaction();
                }
                catch (Exception abortFailure) {
                    logger.error("Exception while aborting producer transaction for ScheduledMessages", (Throwable)abortFailure);
                }
            }
        }
    }

    /**
     * Sends an internal message to the messages topic partition of the given shard, keyed by the
     * message id. The record goes through doSend together with whatever producer
     * KafkaTransactionContext currently returns.
     *
     * @param shard the destination shard; its shard id is the partition
     * @param internalMessage the message to send
     */
    void send(ShardKey shard, InternalMessage internalMessage) {
        ProducerRecord<Object, Object> messageRecord = new ProducerRecord<Object, Object>(this.messagesTopic, shard.getShardId(), internalMessage.getId(), internalMessage);
        KafkaProducer<Object, Object> currentProducer = KafkaTransactionContext.getProducer();
        this.doSend(messageRecord, currentProducer);
    }

    /**
     * Sends a record, either as part of a running transaction or in a transaction of its own.
     *
     * <p>With a non-null {@code transactionalProducer} the record is sent directly on it. Otherwise
     * a command is handed to {@code runCommand} that wraps the send in a dedicated transaction on
     * the producer passed to the command. If that transaction cannot be completed it is aborted, so
     * the producer is not left inside an open transaction; no abort is attempted after a
     * {@link ProducerFencedException}. Exceptions inside the command are logged, never propagated.
     *
     * @param producerRecord the record to send
     * @param transactionalProducer the producer of the surrounding transaction, or {@code null}
     */
    private void doSend(ProducerRecord<Object, Object> producerRecord, KafkaProducer<Object, Object> transactionalProducer) {
        if (transactionalProducer != null) {
            transactionalProducer.send(producerRecord, this.loggingCallback);
            return;
        }
        this.runCommand((kafkaConsumer, kafkaProducer) -> {
            // True between a successful beginTransaction() and a successful commitTransaction().
            boolean transactionOpen = false;
            try {
                kafkaProducer.beginTransaction();
                transactionOpen = true;
                kafkaProducer.send(producerRecord, this.loggingCallback);
                kafkaProducer.commitTransaction();
                transactionOpen = false;
            }
            catch (RetriableException e) {
                logger.warn("Recoverable exception while sending ProducerRecord", (Throwable)e);
            }
            catch (ProducerFencedException e) {
                // A fenced producer cannot abort its transaction, so do not try.
                transactionOpen = false;
                logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", (Throwable)e);
            }
            catch (KafkaException e) {
                logger.error("FATAL: Unrecoverable exception while committing producer transaction", (Throwable)e);
            }
            catch (Throwable t) {
                logger.error("Unexpected exception while sending ProducerRecord", t);
            }
            finally {
                if (transactionOpen) {
                    // Leaving the transaction open would make the next beginTransaction() fail.
                    try {
                        kafkaProducer.abortTransaction();
                    }
                    catch (Exception abortFailure) {
                        logger.error("Exception while aborting producer transaction for ProducerRecord", (Throwable)abortFailure);
                    }
                }
            }
        });
    }

    /**
     * Sends an internal message to the given partition of the node messages topic of a node, keyed
     * by the message id. The record goes through doSend together with whatever producer
     * KafkaTransactionContext currently returns.
     *
     * @param node the destination node; its node id selects the topic
     * @param partition the partition of the node topic
     * @param internalMessage the message to send
     */
    void send(NodeKey node, int partition, InternalMessage internalMessage) {
        String nodeTopic = TopicNamesHelper.getNodeMessagesTopic(this.internalActorSystem, node.getNodeId());
        ProducerRecord<Object, Object> messageRecord = new ProducerRecord<Object, Object>(nodeTopic, partition, internalMessage.getId(), internalMessage);
        KafkaProducer<Object, Object> currentProducer = KafkaTransactionContext.getProducer();
        this.doSend(messageRecord, currentProducer);
    }

    /**
     * Writes a scheduled message to the scheduled-messages topic partition of the given shard,
     * keyed by the id of the scheduled message. The record goes through doSend together with
     * whatever producer KafkaTransactionContext currently returns.
     *
     * @param shard the shard whose shard id is used as partition
     * @param scheduledMessage the message to schedule
     */
    void schedule(ShardKey shard, ScheduledMessage scheduledMessage) {
        ProducerRecord<Object, Object> scheduleRecord = new ProducerRecord<Object, Object>(this.scheduledMessagesTopic, shard.getShardId(), scheduledMessage.getId(), scheduledMessage);
        KafkaProducer<Object, Object> currentProducer = KafkaTransactionContext.getProducer();
        this.doSend(scheduleRecord, currentProducer);
    }

    /**
     * Writes an event listener to the actor-system-event-listeners topic partition of the given
     * shard. The record key is {@code "<event name>:<listener actor id>"}. The record goes through
     * doSend together with whatever producer KafkaTransactionContext currently returns.
     *
     * @param shard the shard whose shard id is used as partition
     * @param event the event the listener is registered for
     * @param listener the listener to store
     */
    void register(ShardKey shard, ActorSystemEvent event, ActorSystemEventListener listener) {
        String listenerKey = String.format("%s:%s", event.name(), listener.getActorId());
        ProducerRecord<Object, Object> listenerRecord = new ProducerRecord<Object, Object>(this.actorSystemEventListenersTopic, shard.getShardId(), listenerKey, listener);
        KafkaProducer<Object, Object> currentProducer = KafkaTransactionContext.getProducer();
        this.doSend(listenerRecord, currentProducer);
    }

    /**
     * Writes a record with a null value for the key {@code "<event name>:<listener actor id>"} to
     * the actor-system-event-listeners topic partition of the given shard. The record goes through
     * doSend together with whatever producer KafkaTransactionContext currently returns.
     *
     * @param shard the shard whose shard id is used as partition
     * @param event the event the listener was registered for
     * @param listener reference to the listening actor
     */
    void deregister(ShardKey shard, ActorSystemEvent event, ActorRef listener) {
        String listenerKey = String.format("%s:%s", event.name(), listener.getActorId());
        ProducerRecord<Object, Object> removalRecord = new ProducerRecord<Object, Object>(this.actorSystemEventListenersTopic, shard.getShardId(), listenerKey, null);
        KafkaProducer<Object, Object> currentProducer = KafkaTransactionContext.getProducer();
        this.doSend(removalRecord, currentProducer);
    }

    /**
     * Hands a command to runCommand that makes the given node the local actor node of this thread.
     *
     * @param node the node to manage
     * @param primary passed on unchanged to the ManagedActorNode that wraps the node
     */
    void assign(KafkaActorNode node, boolean primary) {
        this.runCommand((unusedConsumer, unusedProducer) -> this.localActorNode = new ManagedActorNode(node, primary));
    }

    /**
     * Hands a command to runCommand that registers the given shard in managedShards under its key.
     *
     * @param actorShard the shard to register
     */
    void assign(KafkaActorShard actorShard) {
        this.runCommand((unusedConsumer, unusedProducer) -> {
            ShardKey shardKey = actorShard.getKey();
            this.managedShards.put(shardKey, actorShard);
        });
    }

    /**
     * Hands a command to runCommand that clears the RUNNING flag, which ends the loop in run().
     */
    void stopRunning() {
        this.runCommand((unusedConsumer, unusedProducer) -> this.RUNNING = false);
    }

    /**
     * Hands a command to runCommand that switches this thread to REBALANCING and walks the given
     * shard distribution to update shard ownership. Only shards present in managedShards are
     * considered.
     *
     * <ul>
     * <li>Shard goes to the local node and is not yet owned by it: a wait-for-release is registered
     *     with the distribution strategy, the owning node is updated and the shard is remembered in
     *     newLocalShards.</li>
     * <li>Shard goes to a remote node and is currently owned locally: the owning node is updated,
     *     the local shard instance is destroyed and the release is signalled.</li>
     * <li>All other cases are only logged.</li>
     * </ul>
     *
     * @param shardDistribution the target distribution of shards over nodes
     * @param distributionStrategy strategy used to wait for and to signal shard releases
     * @return a stage that completes with {@code true} when no wait/release call failed, and with
     *         {@code false} otherwise
     */
    CompletionStage<Boolean> prepareRebalance(Multimap<PhysicalNode, ShardKey> shardDistribution, ShardDistributionStrategy distributionStrategy) {
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<Boolean>();
        this.runCommand((kafkaConsumer, kafkaProducer) -> {
            // Becomes false as soon as one wait-for-release or signal-release step fails.
            AtomicBoolean stable = new AtomicBoolean(true);
            // From here on run() stops processing messages until performRebalance() sets ACTIVE again.
            this.state = KafkaActorSystemState.REBALANCING;
            shardDistribution.asMap().forEach((node, value) -> value.forEach(shardKey -> {
                KafkaActorShard actorShard = this.managedShards.get(shardKey);
                // Shards that were never assigned to this thread are skipped.
                if (actorShard != null) {
                    if (node.isLocal()) {
                        if (actorShard.getOwningNode() == null || !actorShard.getOwningNode().equals(node)) {
                            // Only used in the error message below.
                            String owningNodeId = actorShard.getOwningNode() != null ? actorShard.getOwningNode().getId() : "<No Node>";
                            logger.info("I will own {}", shardKey);
                            try {
                                distributionStrategy.registerWaitForRelease((ActorShard)actorShard, node);
                            }
                            catch (Exception e) {
                                logger.error("IMPORTANT: waiting on release of shard {} from node {} failed,  ElasticActors cluster is unstable. Please check all nodes", new Object[]{shardKey, owningNodeId, e});
                                stable.set(false);
                            }
                            finally {
                                // Ownership is taken even when registering the wait failed.
                                actorShard.setOwningNode((PhysicalNode)node);
                                this.newLocalShards.add((ShardKey)shardKey);
                            }
                        } else {
                            logger.info("I already own {}", shardKey);
                        }
                    } else if (actorShard.getOwningNode() == null || actorShard.getOwningNode().isLocal()) {
                        logger.info("{} will own {}", node, shardKey);
                        try {
                            // A shard without an owning node has nothing to release; it is only logged above.
                            if (actorShard.getOwningNode() != null) {
                                actorShard.setOwningNode((PhysicalNode)node);
                                // If the shard is not in localShards, remove() returns null and the resulting
                                // NullPointerException is handled by the catch below; the release is then not signalled.
                                this.localShards.remove(shardKey).destroy();
                                distributionStrategy.signalRelease((ActorShard)actorShard, node);
                            }
                        }
                        catch (Exception e) {
                            logger.error("IMPORTANT: signalling release of shard {} to node {} failed, ElasticActors cluster is unstable. Please check all nodes", new Object[]{shardKey, node, e});
                            stable.set(false);
                        }
                    } else {
                        // Shard moves between two remote nodes (or stays remote): nothing to do locally.
                        logger.info("{} will own {}", node, shardKey);
                    }
                }
            }));
            completableFuture.complete(stable.get());
        });
        return completableFuture;
    }

    /**
     * Hands a command to runCommand that turns the shards collected in newLocalShards into local
     * shards: a state store and a ManagedActorShard are created for each of them, the consumers are
     * re-assigned to the partitions of all local shards, the new shards are initialized and the
     * state is set to ACTIVE.
     *
     * <p>The shard ids are captured before newLocalShards is cleared, so the returned stage reports
     * the shards that were actually added instead of an always-empty set.
     *
     * @return a stage that completes with the ids of the shards that became local in this
     *         rebalance, or completes exceptionally when the rebalance failed
     */
    CompletionStage<Set<Integer>> performRebalance() {
        CompletableFuture<Set<Integer>> completableFuture = new CompletableFuture<Set<Integer>>();
        this.runCommand((kafkaConsumer, kafkaProducer) -> {
            LinkedList<ManagedActorShard> newManagedShards = new LinkedList<ManagedActorShard>();
            try {
                // Snapshot the ids now; newLocalShards is emptied further down.
                Set<Integer> newShardIds = this.newLocalShards.stream().map(ShardKey::getShardId).collect(Collectors.toSet());
                for (ShardKey shardKey : this.newLocalShards) {
                    PersistentActorStore actorStore = this.createStateStore(shardKey);
                    ManagedActorShard managedActorShard = new ManagedActorShard(this.managedShards.get(shardKey), actorStore);
                    this.localShards.put(shardKey, managedActorShard);
                    newManagedShards.add(managedActorShard);
                }
                this.newLocalShards.clear();
                // Always re-assign: shards may also have been removed by prepareRebalance().
                this.assignPartitions();
                if (!newManagedShards.isEmpty()) {
                    this.initializeStateStores(newManagedShards);
                    this.initializeScheduledMessages(newManagedShards);
                    this.initializeAndRunActorSystemEventListeners(newManagedShards);
                }
                this.state = KafkaActorSystemState.ACTIVE;
                completableFuture.complete(newShardIds);
            }
            catch (Exception e) {
                logger.error("FATAL Exception on performRebalance", (Throwable)e);
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    /**
     * Re-assigns the partitions of all four consumers from the current {@code localShards}.
     *
     * <p>Every local shard contributes the partition equal to its shard id on the messages,
     * persistent-actors, scheduled-messages and event-listeners topics. When a local actor node is
     * present, the message consumer additionally gets the node topic partition identified by
     * {@code nodeTopicPartitionId}.
     */
    private void assignPartitions() {
        List<TopicPartition> messagePartitions = new LinkedList<TopicPartition>();
        List<TopicPartition> statePartitions = new LinkedList<TopicPartition>();
        List<TopicPartition> scheduledMessagesPartitions = new LinkedList<TopicPartition>();
        List<TopicPartition> eventListenerPartitions = new LinkedList<TopicPartition>();
        // One pass over the shard keys builds the partition list for each topic.
        for (ShardKey shardKey : this.localShards.keySet()) {
            int partition = shardKey.getShardId();
            messagePartitions.add(new TopicPartition(this.messagesTopic, partition));
            statePartitions.add(new TopicPartition(this.persistentActorsTopic, partition));
            scheduledMessagesPartitions.add(new TopicPartition(this.scheduledMessagesTopic, partition));
            eventListenerPartitions.add(new TopicPartition(this.actorSystemEventListenersTopic, partition));
        }
        if (this.localActorNode != null) {
            String nodeTopic = TopicNamesHelper.getNodeMessagesTopic(this.internalActorSystem, this.localActorNode.actorNode.getKey().getNodeId());
            messagePartitions.add(new TopicPartition(nodeTopic, this.nodeTopicPartitionId.intValue()));
        }
        this.messageConsumer.assign(messagePartitions);
        this.stateConsumer.assign(statePartitions);
        this.scheduledMessagesConsumer.assign(scheduledMessagesPartitions);
        this.actorSystemEventListenersConsumer.assign(eventListenerPartitions);
    }

    /**
     * Creates the actor store for one shard by delegating to {@code persistentActorStoreFactory},
     * passing this thread's {@code stateDeserializer}.
     *
     * @param shardKey key of the shard the store is created for
     * @return the store returned by the factory
     */
    private PersistentActorStore createStateStore(ShardKey shardKey) {
        return this.persistentActorStoreFactory.create(shardKey, this.stateDeserializer);
    }

    /**
     * Fills the actor stores of the given shards by reading the persistent-actors topic with
     * {@code stateConsumer}.
     *
     * <p>The method positions the consumer, polls until every tracked partition has been read up
     * to its recorded end offset, and logs how many records were read and how many actors the
     * stores hold afterwards. Exceptions thrown while polling are logged and the loop continues.
     *
     * <p>NOTE(review): if {@code poll} keeps failing, {@code endOffsets} is never drained and the
     * loop does not terminate — confirm this is acceptable.
     *
     * @param managedActorShards shards whose stores must be populated; nothing happens when empty
     */
    private void initializeStateStores(List<ManagedActorShard> managedActorShards) {
        if (managedActorShards.isEmpty()) {
            return;
        }
        List topicPartitions = managedActorShards.stream().map(managedActorShard -> new TopicPartition(this.persistentActorsTopic, managedActorShard.getKey().getShardId())).collect(Collectors.toList());
        // Move every assigned state partition to its end first, then rewind only the partitions of
        // the given shards, so that only those are replayed from the start.
        this.stateConsumer.seekToEnd((Collection)this.stateConsumer.assignment());
        this.stateConsumer.seekToBeginning(topicPartitions);
        // partition id (== shard id) -> shard, used to route polled records to the right store
        Map<Integer, ManagedActorShard> partitionsToShards = managedActorShards.stream().collect(Collectors.toMap(managedActorShard -> managedActorShard.getKey().getShardId(), managedActorShard -> managedActorShard));
        // Partitions still to be read; partitions with end offset 0 are left out.
        Map<Integer, Long> endOffsets = this.stateConsumer.endOffsets(topicPartitions).entrySet().stream().filter(e -> (Long)e.getValue() > 0L).collect(Collectors.toMap(e -> ((TopicPartition)e.getKey()).partition(), Map.Entry::getValue));
        partitionsToShards.forEach((key, value) -> {
            // A store that already reports an offset resumes right after it instead of from the beginning.
            if (((ManagedActorShard)value).actorStore.getOffset() >= 0L) {
                this.stateConsumer.seek(new TopicPartition(this.persistentActorsTopic, value.getKey().getShardId()), ((ManagedActorShard)value).actorStore.getOffset() + 1L);
                Long endOffset = (Long)endOffsets.get(value.getKey().getShardId());
                // The store is treated as up to date when its offset equals endOffset - 2.
                // NOTE(review): presumably the final offset is taken by a transaction marker — confirm.
                if (endOffset != null && endOffset - 2L == ((ManagedActorShard)value).actorStore.getOffset()) {
                    endOffsets.remove(value.getKey().getShardId());
                }
            }
        });
        ConsumerRecords stateRecords = null;
        int totalCount = 0;
        do {
            try {
                stateRecords = this.stateConsumer.poll(10L);
                totalCount += stateRecords.count();
                stateRecords.iterator().forEachRemaining(consumerRecord -> {
                    Long endOffset;
                    // Records with a null value are not stored. deleteActor publishes such records.
                    // NOTE(review): a null value does not remove an earlier entry for the same key
                    // from the store here — verify that stale entries cannot survive a replay.
                    if (consumerRecord.value() != null) {
                        ((ManagedActorShard)partitionsToShards.get(consumerRecord.partition())).actorStore.put((String)consumerRecord.key(), (byte[])consumerRecord.value(), consumerRecord.offset());
                    }
                    // Partition is complete once the record at endOffset - 2 has been seen.
                    if ((endOffset = (Long)endOffsets.get(consumerRecord.partition())) != null && endOffset - 2L == consumerRecord.offset()) {
                        endOffsets.remove(consumerRecord.partition());
                    }
                });
            }
            catch (InterruptException | WakeupException e2) {
                logger.warn("Recoverable exception while polling PersistentActors state", e2);
            }
            catch (KafkaException e3) {
                logger.error("FATAL: Unrecoverable exception while polling PersistentActors state", (Throwable)e3);
            }
            catch (Throwable t) {
                logger.error("Unexpected exception while populating PersistentActorStores", t);
            }
        // Keep polling while the last poll returned records or some partition has not reached its end.
        } while (stateRecords != null && !stateRecords.isEmpty() || !endOffsets.isEmpty());
        int uniques = managedActorShards.stream().mapToInt(value -> ((ManagedActorShard)value).actorStore.count()).sum();
        logger.info("Loaded {} unique persistent actors from {} entries", (Object)uniques, (Object)totalCount);
    }

    /**
     * Rebuilds the in-memory {@code scheduledMessages} of the given shards by reading the
     * scheduled-messages topic from the beginning with {@code scheduledMessagesConsumer}, then
     * calls {@code commitSync()} on that consumer.
     *
     * <p>Exceptions thrown while polling or committing are logged; polling continues after a
     * failure.
     *
     * <p>NOTE(review): if {@code poll} keeps failing, {@code endOffsets} is never drained and the
     * loop does not terminate — confirm this is acceptable.
     *
     * @param managedActorShards shards whose scheduled messages must be loaded; nothing happens
     *                           when empty
     */
    private void initializeScheduledMessages(List<ManagedActorShard> managedActorShards) {
        if (managedActorShards.isEmpty()) {
            return;
        }
        List topicPartitions = managedActorShards.stream().map(managedActorShard -> new TopicPartition(this.scheduledMessagesTopic, managedActorShard.getKey().getShardId())).collect(Collectors.toList());
        this.scheduledMessagesConsumer.seekToBeginning(topicPartitions);
        // partition id (== shard id) -> shard, used to route polled records
        Map<Integer, ManagedActorShard> partitionsToShards = managedActorShards.stream().collect(Collectors.toMap(managedActorShard -> managedActorShard.getKey().getShardId(), managedActorShard -> managedActorShard));
        // Partitions still to be read; partitions with end offset 0 are left out.
        Map<Integer, Long> endOffsets = this.scheduledMessagesConsumer.endOffsets(topicPartitions).entrySet().stream().filter(e -> (Long)e.getValue() > 0L).collect(Collectors.toMap(e -> ((TopicPartition)e.getKey()).partition(), Map.Entry::getValue));
        ConsumerRecords scheduleMessageRecords = null;
        do {
            try {
                scheduleMessageRecords = this.scheduledMessagesConsumer.poll(10L);
                scheduleMessageRecords.iterator().forEachRemaining(consumerRecord -> {
                    if (consumerRecord.value() != null) {
                        // A record with a value registers the message under its fire time in milliseconds.
                        ((ManagedActorShard)partitionsToShards.get(consumerRecord.partition())).scheduledMessages.put((Object)((ScheduledMessage)consumerRecord.value()).getFireTime(TimeUnit.MILLISECONDS), consumerRecord.value());
                    } else {
                        // A record without a value removes every scheduled message whose id equals the record key.
                        ((ManagedActorShard)partitionsToShards.get(consumerRecord.partition())).scheduledMessages.entries().removeIf(entry -> ((ScheduledMessage)entry.getValue()).getId().equals(consumerRecord.key()));
                    }
                    Long endOffset = (Long)endOffsets.get(consumerRecord.partition());
                    // Partition is complete once the record at endOffset - 2 has been seen.
                    // NOTE(review): presumably the final offset is taken by a transaction marker — confirm.
                    if (endOffset != null && endOffset - 2L == consumerRecord.offset()) {
                        endOffsets.remove(consumerRecord.partition());
                    }
                });
            }
            catch (InterruptException | WakeupException e2) {
                logger.warn("Recoverable exception while polling ScheduledMessages state", e2);
            }
            catch (KafkaException e3) {
                logger.error("FATAL: Unrecoverable exception while polling ScheduledMessages state", (Throwable)e3);
            }
            catch (Throwable t) {
                logger.error("Unexpected exception while populating ScheduledMessage", t);
            }
        // Keep polling while the last poll returned records or some partition has not reached its end.
        } while (scheduleMessageRecords != null && !scheduleMessageRecords.isEmpty() || !endOffsets.isEmpty());
        try {
            this.scheduledMessagesConsumer.commitSync();
        }
        catch (InterruptException | WakeupException e4) {
            logger.warn("Recoverable exception calling commitSync on scheduledMessagesConsumer", e4);
        }
        catch (KafkaException e5) {
            logger.error("FATAL: Unrecoverable exception calling commitSync on scheduledMessagesConsumer", (Throwable)e5);
        }
    }

    /**
     * Reads the actor-system event-listeners topic from the beginning for the given shards and,
     * for every record that carries a listener, delivers the listener's message to the target
     * actor inside its own producer transaction.
     *
     * <p>Exceptions from the transaction and from polling are logged; processing continues.
     *
     * <p>NOTE(review): if {@code poll} keeps failing, {@code endOffsets} is never drained and the
     * loop does not terminate — confirm this is acceptable.
     *
     * @param managedActorShards shards whose event listeners must be run; nothing happens when
     *                           empty
     */
    private void initializeAndRunActorSystemEventListeners(List<ManagedActorShard> managedActorShards) {
        if (managedActorShards.isEmpty()) {
            return;
        }
        List topicPartitions = managedActorShards.stream().map(managedActorShard -> new TopicPartition(this.actorSystemEventListenersTopic, managedActorShard.getKey().getShardId())).collect(Collectors.toList());
        this.actorSystemEventListenersConsumer.seekToBeginning(topicPartitions);
        // partition id (== shard id) -> shard, used to route polled records
        Map<Integer, ManagedActorShard> partitionsToShards = managedActorShards.stream().collect(Collectors.toMap(managedActorShard -> managedActorShard.getKey().getShardId(), managedActorShard -> managedActorShard));
        // Partitions still to be read; partitions with end offset 0 are left out.
        Map<Integer, Long> endOffsets = this.actorSystemEventListenersConsumer.endOffsets(topicPartitions).entrySet().stream().filter(e -> (Long)e.getValue() > 0L).collect(Collectors.toMap(e -> ((TopicPartition)e.getKey()).partition(), Map.Entry::getValue));
        ConsumerRecords consumerRecords = null;
        do {
            try {
                consumerRecords = this.actorSystemEventListenersConsumer.poll(10L);
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    Long endOffset;
                    ActorSystemEventListener eventListener = (ActorSystemEventListener)consumerRecord.value();
                    if (eventListener != null) {
                        ManagedActorShard managedActorShard = (ManagedActorShard)partitionsToShards.get(consumerRecord.partition());
                        ActorRef receiverRef = this.internalActorSystem.actorFor(eventListener.getActorId());
                        // NOTE(review): getPersistentActor can return null when the store has no such
                        // actor; doInActorContext dereferences it — confirm this case cannot occur.
                        PersistentActor<ShardKey> persistentActor = managedActorShard.getPersistentActor(receiverRef);
                        // The message is built with a null sender from the listener's stored bytes and class name.
                        DefaultInternalMessage internalMessage = new DefaultInternalMessage(null, receiverRef, ByteBuffer.wrap(eventListener.getMessageBytes()), eventListener.getMessageClass().getName(), false);
                        // NOTE(review): beginTransaction() sits outside the try below, so an exception
                        // from it leaves the lambda and is handled by the outer catch blocks (or propagates).
                        this.producer.beginTransaction();
                        try {
                            KafkaTransactionContext.setTransactionalProducer(this.producer);
                            this.doInActorContext(ApplicationProtocol::handleMessage, managedActorShard, persistentActor, (InternalMessage)internalMessage);
                            this.producer.commitTransaction();
                        }
                        catch (ProducerFencedException e) {
                            logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", (Throwable)e);
                        }
                        catch (KafkaException e) {
                            // NOTE(review): the transaction is only logged here, not aborted — verify
                            // that the next beginTransaction() can still succeed after a failure.
                            logger.error("FATAL: Unrecoverable exception while comitting producer transaction", (Throwable)e);
                        }
                        catch (Throwable t) {
                            logger.error("Unexpected exception while generating ActorSystemEventListener message", t);
                        }
                        finally {
                            KafkaTransactionContext.clear();
                        }
                    }
                    // Partition is complete once the record at endOffset - 2 has been seen.
                    // NOTE(review): presumably the final offset is taken by a transaction marker — confirm.
                    if ((endOffset = (Long)endOffsets.get(consumerRecord.partition())) != null && endOffset - 2L == consumerRecord.offset()) {
                        endOffsets.remove(consumerRecord.partition());
                    }
                });
            }
            catch (InterruptException | WakeupException e2) {
                logger.warn("Recoverable exception while polling ActorSystemEventListeners state", e2);
            }
            catch (KafkaException e3) {
                logger.error("FATAL: Unrecoverable exception while polling ActorSystemEventListeners state", (Throwable)e3);
            }
        // Keep polling while the last poll returned records or some partition has not reached its end.
        } while (consumerRecords != null && !consumerRecords.isEmpty() || !endOffsets.isEmpty());
    }

    /**
     * Returns the partition id stored in {@code nodeTopicPartitionId}; {@code assignPartitions}
     * uses the same value as the partition of the node messages topic.
     *
     * @return the node topic partition id field as-is
     */
    Integer getNodeTopicPartitionId() {
        return this.nodeTopicPartitionId;
    }

    /**
     * Queues a command that creates an actor in the local actor node.
     *
     * <p>The actor is created without an originating internal message. Any exception raised
     * during creation is logged and not propagated.
     *
     * @param ref           reference the new actor is registered under
     * @param createMessage creation request describing the actor
     */
    void createTempActor(ActorRef ref, CreateActorMessage createMessage) {
        BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> createCommand = (unusedConsumer, unusedProducer) -> {
            try {
                // null: there is no InternalMessage that triggered this creation
                this.createActor(this.localActorNode, createMessage, ref, null);
            }
            catch (Exception creationFailure) {
                logger.error("Exception while creating TempActor", (Throwable)creationFailure);
            }
        };
        this.runCommand(createCommand);
    }

    /**
     * Queues a command that calls {@code postActivate(null)} on every configured service actor
     * that is not yet listed in the local node's {@code initializedActors}.
     *
     * <p>Each service is marked as initialized afterwards even when {@code postActivate} throws;
     * such exceptions are logged only.
     *
     * @return a future completed with {@code null} once the command has walked all services
     */
    CompletableFuture<Void> initializeServiceActors() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        this.runCommand((kafkaConsumer, kafkaProducer) -> {
            Set<String> serviceIds = this.internalActorSystem.getConfiguration().getServices();
            if (serviceIds != null) {
                for (String serviceId : serviceIds) {
                    ActorRef serviceRef = this.internalActorSystem.serviceActorFor(serviceId);
                    ElasticActor serviceActor = this.internalActorSystem.getConfiguration().getService(serviceId);
                    if (this.localActorNode.initializedActors.contains(serviceRef)) {
                        // already activated earlier
                        continue;
                    }
                    InternalActorContext.setContext(new ServiceActorContext(serviceRef, (ActorSystem)this.internalActorSystem));
                    try {
                        serviceActor.postActivate(null);
                    }
                    catch (Exception activationFailure) {
                        logger.error("Exception while handling message for service [{}]", (Object)serviceRef, (Object)activationFailure);
                    }
                    finally {
                        InternalActorContext.clearContext();
                        this.localActorNode.initializedActors.add(serviceRef);
                    }
                }
            }
            completableFuture.complete(null);
        });
        return completableFuture;
    }

    /**
     * Adds a command to the {@code commands} queue.
     *
     * <p>The boolean result of {@code offer} is ignored, so a rejected command is dropped silently.
     *
     * @param command callback taking the message consumer and the producer
     */
    private void runCommand(BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> command) {
        this.commands.offer(command);
    }

    /**
     * Routes a consumed message to the container that owns the partition it came from.
     *
     * <p>Records from the messages topic go to the local shard whose id equals the partition
     * number; records from any other topic go to the local actor node.
     *
     * @param topicPartition topic and partition the message was read from
     * @param im             the consumed message
     */
    private void handleInternalMessage(TopicPartition topicPartition, InternalMessage im) {
        if (!topicPartition.topic().equals(this.messagesTopic)) {
            this.handleInternalMessage(this.localActorNode, im);
            return;
        }
        ShardKey owningShardKey = new ShardKey(this.internalActorSystem.getName(), topicPartition.partition());
        ManagedActorShard owningShard = this.localShards.get(owningShardKey);
        this.handleInternalMessage(owningShard, im);
    }

    /**
     * Dispatches a message to each of its receivers within the given container.
     *
     * <p>When there is more than one receiver, every receiver gets its own copy of the message.
     * A receiver with an actor id is handled as an actor message; a receiver without one is
     * handled as a message for the container itself.
     *
     * @param managedActorContainer container (shard or node) the message was consumed for
     * @param im                    the message to dispatch
     */
    private void handleInternalMessage(ManagedActorContainer managedActorContainer, InternalMessage im) {
        boolean needsCopy = im.getReceivers().size() > 1;
        for (ActorRef receiver : im.getReceivers()) {
            InternalMessage perReceiverMessage = im;
            if (needsCopy) {
                perReceiverMessage = im.copyOf();
            }
            if (receiver.getActorId() == null) {
                this.handleContainerMessage(managedActorContainer, perReceiverMessage);
            } else {
                this.handleActorMessage(managedActorContainer, receiver, perReceiverMessage);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers a message to a single actor or service actor.
     *
     * <ul>
     *   <li>A receiver that resolves to {@code TOMBSTONE} gets the message bounced as undeliverable.</li>
     *   <li>A receiver with no persistent actor is looked up as a service instance; if none exists
     *       the message is bounced as undeliverable.</li>
     *   <li>Otherwise the message is handled in the actor's context, using the reactive-streams
     *       protocol for payload classes in the reactive-streams package and the application
     *       protocol for everything else.</li>
     * </ul>
     *
     * <p>Exceptions thrown by a service instance are logged and not propagated.
     *
     * @param managedActorContainer container that owns the receiver; for service actors it is cast
     *                              to {@code ManagedActorNode}
     * @param receiverRef           reference of the receiving actor
     * @param internalMessage       message to deliver
     */
    private void handleActorMessage(ManagedActorContainer managedActorContainer, ActorRef receiverRef, InternalMessage internalMessage) {
        PersistentActor actor = managedActorContainer.getPersistentActor(receiverRef);
        if (TOMBSTONE == actor) {
            this.sendUndeliverable(internalMessage, receiverRef);
        } else if (actor == null) {
            ElasticActor serviceInstance = this.internalActorSystem.getServiceInstance(receiverRef);
            if (serviceInstance != null) {
                // Just-in-time activation is needed only when the service has NOT been initialized
                // yet. The previous code tested contains() without negation, which skipped the
                // first activation and re-ran postActivate on every later message.
                boolean jitActivationNeeded = !((ManagedActorNode)managedActorContainer).initializedActors.contains(receiverRef);
                InternalActorContext.setContext(new ServiceActorContext(receiverRef, (ActorSystem)this.internalActorSystem));
                try {
                    if (jitActivationNeeded) {
                        ((ManagedActorNode)managedActorContainer).initializedActors.add(receiverRef);
                        serviceInstance.postActivate(null);
                    }
                    Object message = SerializationTools.deserializeMessage((SerializationAccessor)this.internalActorSystem, (InternalMessage)internalMessage);
                    // Exactly one callback per message: an undeliverable message must not also be
                    // handed to onReceive (the previous code fell through and called both).
                    if (internalMessage.isUndeliverable()) {
                        serviceInstance.onUndeliverable(internalMessage.getSender(), message);
                    } else {
                        serviceInstance.onReceive(internalMessage.getSender(), message);
                    }
                }
                catch (Exception e) {
                    logger.error("Exception while handling message for service [{}]", (Object)receiverRef, (Object)e);
                }
                finally {
                    InternalActorContext.clearContext();
                }
            } else {
                this.sendUndeliverable(internalMessage, receiverRef);
            }
        } else if (internalMessage.isUndeliverable()) {
            if (internalMessage.getPayloadClass().startsWith("org.elasticsoftware.elasticactors.messaging.reactivestreams")) {
                this.doInActorContext(ReactiveStreamsProtocol::handleUndeliverableMessage, managedActorContainer, actor, internalMessage);
            } else {
                this.doInActorContext(ApplicationProtocol::handleUndeliverableMessage, managedActorContainer, actor, internalMessage);
            }
        } else if (internalMessage.getPayloadClass().startsWith("org.elasticsoftware.elasticactors.messaging.reactivestreams")) {
            this.doInActorContext(ReactiveStreamsProtocol::handleMessage, managedActorContainer, actor, internalMessage);
        } else {
            this.doInActorContext(ApplicationProtocol::handleMessage, managedActorContainer, actor, internalMessage);
        }
    }

    /**
     * Handles a message addressed to the container itself rather than to an actor.
     *
     * <p>Supported payloads are create-actor, destroy-actor, cancel-scheduled-message,
     * actor-node and persist-actor messages; any other payload is ignored. Every exception,
     * including one from deserialization, is logged and not propagated.
     *
     * @param managedActorContainer container the message was consumed for
     * @param internalMessage       message carrying the serialized payload
     */
    private void handleContainerMessage(ManagedActorContainer managedActorContainer, InternalMessage internalMessage) {
        try {
            Object message = SerializationTools.deserializeMessage((SerializationAccessor)this.internalActorSystem, (InternalMessage)internalMessage);
            if (message instanceof CreateActorMessage) {
                CreateActorMessage createActorMessage = (CreateActorMessage)message;
                boolean alreadyPresent = managedActorContainer.containsKey(createActorMessage.getActorId());
                ActorRef ref = this.internalActorSystem.actorFor(createActorMessage.getActorId());
                if (alreadyPresent) {
                    // existing actor: only look it up through the container, the result is not used
                    managedActorContainer.getPersistentActor(ref);
                } else {
                    this.createActor(managedActorContainer, createActorMessage, ref, internalMessage);
                }
            } else if (message instanceof DestroyActorMessage) {
                this.destroyActor(managedActorContainer, (DestroyActorMessage)message, internalMessage);
            } else if (message instanceof CancelScheduledMessageMessage) {
                this.cancelScheduledMessage((ManagedActorShard)managedActorContainer, (CancelScheduledMessageMessage)message);
            } else if (message instanceof ActorNodeMessage) {
                if (internalMessage.isUndeliverable()) {
                    logger.error("undeliverable ActorNodeMessages are currently not supported");
                } else {
                    ActorNodeMessage actorNodeMessage = (ActorNodeMessage)message;
                    ActorNode actorNode = this.internalActorSystem.getNode(actorNodeMessage.getNodeId());
                    if (actorNode == null) {
                        logger.error("ActorNode with id [{}] is not reachable, discarding message of type [{}] from [{}] for [{}]", new Object[]{actorNodeMessage.getNodeId(), actorNodeMessage.getMessage().getClass().getName(), internalMessage.getSender(), actorNodeMessage.getReceiverRef()});
                    } else if (actorNodeMessage.isUndeliverable()) {
                        // rebuild a message from the wrapped payload and report it as undeliverable via the node
                        InternalMessage originalMessage = this.createInternalMessage(actorNodeMessage.getReceiverRef(), (List<? extends ActorRef>)ImmutableList.of((Object)internalMessage.getSender()), actorNodeMessage.getMessage());
                        actorNode.undeliverableMessage(originalMessage, internalMessage.getSender());
                    } else {
                        actorNode.sendMessage(internalMessage.getSender(), actorNodeMessage.getReceiverRef(), actorNodeMessage.getMessage());
                    }
                }
            } else if (message instanceof PersistActorMessage) {
                this.persistActor(managedActorContainer, ((PersistActorMessage)message).getActorRef());
            }
        }
        catch (Exception handlingFailure) {
            logger.error("Exception while handling InternalMessage for Shard [{}]; senderRef [{}], messageType [{}]", new Object[]{managedActorContainer.getKey(), internalMessage.getSender(), internalMessage.getPayloadClass(), handlingFailure});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a lifecycle handler for one actor with the actor and serialization contexts set up,
     * and persists the actor afterwards when the handler asks for it.
     *
     * <p>An exception thrown by the handler is logged and treated as "no state update". Both
     * contexts are always reset once the handler has been invoked.
     *
     * @param handler               lifecycle function to apply
     * @param managedActorContainer container used to persist the actor
     * @param persistentActor       actor the handler runs for
     * @param internalMessage       message passed to the handler; callers in this file may pass
     *                              {@code null}
     */
    private void doInActorContext(ActorLifecycleFunction handler, ManagedActorContainer managedActorContainer, PersistentActor persistentActor, InternalMessage internalMessage) {
        InternalActorContext.setContext((ActorContext)persistentActor);
        SerializationContext.initialize();
        ElasticActor receiver = this.internalActorSystem.getActorInstance(persistentActor.getSelf(), persistentActor.getActorClass());
        boolean stateChanged = false;
        try {
            stateChanged = handler.apply(this.internalActorSystem, persistentActor, receiver, persistentActor.getSelf(), internalMessage);
        }
        catch (Exception handlerFailure) {
            logger.error("Exception in doInActorContext", (Throwable)handlerFailure);
        }
        finally {
            SerializationContext.reset();
            InternalActorContext.clearContext();
        }
        if (!stateChanged) {
            return;
        }
        managedActorContainer.persistActor(persistentActor);
    }

    /**
     * Reports a message that could not be delivered back to its sender.
     *
     * <p>A message that is itself already an undeliverable notification is only logged as an
     * error. Otherwise it is bounced through the sender's actor container when the sender is an
     * {@code ActorContainerRef}, and logged as a warning when it is not. Exceptions are logged
     * and not propagated.
     *
     * @param internalMessage the message that could not be delivered
     * @param receiverRef     the receiver that could not be reached
     */
    private void sendUndeliverable(InternalMessage internalMessage, ActorRef receiverRef) {
        ActorRef senderRef = internalMessage.getSender();
        try {
            if (internalMessage.isUndeliverable()) {
                logger.error("Receiver for undeliverable message not found: message type '{}' , receiver '{}'", (Object)internalMessage.getPayloadClass(), (Object)receiverRef);
            } else if (senderRef instanceof ActorContainerRef) {
                ActorContainerRef senderContainerRef = (ActorContainerRef)senderRef;
                senderContainerRef.getActorContainer().undeliverableMessage(internalMessage, receiverRef);
            } else {
                logger.warn("Could not send message undeliverable: original message type '{}' , receiver '{}'", (Object)internalMessage.getPayloadClass(), (Object)receiverRef);
            }
        }
        catch (Exception bounceFailure) {
            logger.error("Exception while sending undeliverable message", (Throwable)bounceFailure);
        }
    }

    /**
     * Creates a new persistent actor in the given container.
     *
     * <p>The actor is put into the container's cache, persisted through the container, and then
     * the create-actor lifecycle step is run in the actor's context.
     *
     * @param managedActorContainer container that will own the actor
     * @param createMessage         creation request (actor class, affinity key, initial state)
     * @param ref                   reference of the new actor
     * @param internalMessage       message that triggered the creation; may be {@code null}
     * @throws ClassNotFoundException when the actor class named in the request cannot be loaded
     */
    private void createActor(ManagedActorContainer managedActorContainer, CreateActorMessage createMessage, ActorRef ref, InternalMessage internalMessage) throws ClassNotFoundException {
        Class actorType = ClassLoadingHelper.getClassHelper().forName(createMessage.getActorClass());
        String stateVersion = ManifestTools.extractActorStateVersion((Class)actorType);
        PersistentActor newActor = new PersistentActor(managedActorContainer.getKey(), this.internalActorSystem, stateVersion, ref, createMessage.getAffinityKey(), actorType, createMessage.getInitialState());
        // register in the cache first, then persist, then run the lifecycle callback
        managedActorContainer.getActorCache().put((Object)ref, (Object)newActor);
        managedActorContainer.persistActor(newActor);
        this.doInActorContext(ApplicationProtocol::createActor, managedActorContainer, newActor, internalMessage);
    }

    /**
     * Destroys the actor named in the destroy request, if the container knows it.
     *
     * <p>The cache entry is replaced by {@code TOMBSTONE}, the actor is deleted through the
     * container, and the destroy-actor lifecycle step is run in the actor's context.
     *
     * @param managedActorShard container that owns the actor
     * @param destroyMessage    request naming the actor to destroy
     * @param internalMessage   message that triggered the destruction
     */
    private void destroyActor(ManagedActorContainer managedActorShard, DestroyActorMessage destroyMessage, InternalMessage internalMessage) {
        PersistentActor doomedActor = managedActorShard.getPersistentActor(destroyMessage.getActorRef());
        if (doomedActor == null) {
            // unknown actor: nothing to destroy
            return;
        }
        managedActorShard.getActorCache().put((Object)doomedActor.getSelf(), TOMBSTONE);
        managedActorShard.deleteActor(doomedActor);
        this.doInActorContext(ApplicationProtocol::destroyActor, managedActorShard, doomedActor, internalMessage);
    }

    /**
     * Persists the referenced actor through its container, unless the container has no such
     * actor or the actor resolves to {@code TOMBSTONE}.
     *
     * @param managedActorShard container that owns the actor
     * @param actorRef          reference of the actor to persist
     */
    private void persistActor(ManagedActorContainer managedActorShard, ActorRef actorRef) {
        PersistentActor target = managedActorShard.getPersistentActor(actorRef);
        if (target == null || target == TOMBSTONE) {
            return;
        }
        managedActorShard.persistActor(target);
    }

    /**
     * Publishes a record with a {@code null} value for the cancelled message id on the shard's
     * partition of the scheduled-messages topic, using the producer from
     * {@code KafkaTransactionContext}.
     *
     * @param managedActorShard shard whose partition receives the record
     * @param cancelMessage     request carrying the id of the scheduled message to cancel
     */
    private void cancelScheduledMessage(ManagedActorShard managedActorShard, CancelScheduledMessageMessage cancelMessage) {
        String topic = this.scheduledMessagesTopic;
        Integer partition = Integer.valueOf(managedActorShard.getKey().getShardId());
        ProducerRecord cancellationRecord = new ProducerRecord(topic, partition, (Object)cancelMessage.getMessageId(), null);
        this.doSend((ProducerRecord<Object, Object>)cancellationRecord, KafkaTransactionContext.getProducer());
    }

    /**
     * Serializes a payload and wraps it in a {@code DefaultInternalMessage}.
     *
     * <p>The durable flag and timeout come from the payload class's {@code Message} annotation;
     * without the annotation the message is non-durable with a timeout of {@code -1}.
     *
     * @param from    sender reference
     * @param to      receiver references
     * @param message payload to serialize
     * @return the new internal message, or {@code null} when no serializer is registered for the
     *         payload class (an error is logged in that case)
     * @throws IOException declared by this method; presumably raised by serialization
     */
    private InternalMessage createInternalMessage(ActorRef from, List<? extends ActorRef> to, Object message) throws IOException {
        Class<?> payloadType = message.getClass();
        MessageSerializer serializer = this.internalActorSystem.getSerializer(payloadType);
        if (serializer == null) {
            logger.error("No message serializer found for class: {}. NOT sending message", (Object)payloadType.getSimpleName());
            return null;
        }
        boolean durable = false;
        int timeout = -1;
        Message annotation = payloadType.getAnnotation(Message.class);
        if (annotation != null) {
            durable = annotation.durable();
            timeout = annotation.timeout();
        }
        return new DefaultInternalMessage(from, ImmutableList.copyOf(to), SerializationContext.serialize((MessageSerializer)serializer, (Object)message), payloadType.getName(), durable, timeout);
    }

    /**
     * Container for actors that live on an actor node rather than in a shard.
     *
     * <p>Actors are held only in a cache created by {@code nodeActorCacheManager};
     * {@link #persistActor} does nothing. The class also tracks which service actors have been
     * activated in {@code initializedActors}.
     */
    private final class ManagedActorNode
    implements EvictionListener<PersistentActor<NodeKey>>,
    ManagedActorContainer<NodeKey> {
        // the node this container wraps; supplies the container key
        private final KafkaActorNode actorNode;
        // flag passed in at construction and exposed through isPrimary()
        private final boolean primary;
        // the only storage for this node's actors
        private final Cache<ActorRef, PersistentActor<NodeKey>> actorCache;
        // service actor references for which postActivate has already been triggered
        private final Set<ActorRef> initializedActors = new HashSet<ActorRef>();

        /**
         * Wraps the given node and creates its actor cache, registering this instance as the
         * cache's eviction listener.
         *
         * @param actorNode node to wrap
         * @param primary   value later returned by {@link #isPrimary()}
         */
        private ManagedActorNode(KafkaActorNode actorNode, boolean primary) {
            this.actorNode = actorNode;
            this.actorCache = KafkaActorThread.this.nodeActorCacheManager.create((Object)actorNode.getKey(), (EvictionListener)this);
            this.primary = primary;
        }

        /** Returns the {@code primary} flag given at construction. */
        public boolean isPrimary() {
            return this.primary;
        }

        /**
         * Eviction callback: only logs an error naming the evicted actor and its class.
         *
         * @param value the evicted actor
         */
        public void onEvicted(PersistentActor<NodeKey> value) {
            logger.error("CRITICAL WARNING: Actor [{}] of type [{}] got evicted from the cache. This can lead to issues using temporary actors. Please increase the maximum size of the node actor cache by using the 'ea.nodeCache.maximumSize' property.", (Object)value.getSelf(), (Object)value.getActorClass().getName());
        }

        /**
         * Looks the actor up in the cache only.
         *
         * @param actorRef reference of the actor
         * @return the cached actor, or {@code null} when the cache has no entry
         */
        @Override
        public PersistentActor<NodeKey> getPersistentActor(ActorRef actorRef) {
            return (PersistentActor)this.actorCache.getIfPresent((Object)actorRef);
        }

        /** Intentionally empty: this container does not persist its actors. */
        @Override
        public void persistActor(PersistentActor<NodeKey> persistentActor) {
        }

        /**
         * Removes the actor's entry from the cache.
         *
         * @param persistentActor actor to remove
         */
        @Override
        public void deleteActor(PersistentActor<NodeKey> persistentActor) {
            this.actorCache.invalidate((Object)persistentActor.getSelf());
        }

        /**
         * Reports whether the cache has an entry for the given id.
         *
         * <p>NOTE(review): the cache is declared with {@code ActorRef} keys but is queried here
         * with the {@code String} actor id — verify whether this lookup can ever match.
         *
         * @param actorId id of the actor
         * @return {@code true} when the cache lookup returns a non-null value
         */
        @Override
        public boolean containsKey(String actorId) {
            return this.actorCache.getIfPresent((Object)actorId) != null;
        }

        /** Returns the key of the wrapped node. */
        @Override
        public NodeKey getKey() {
            return this.actorNode.getKey();
        }

        /** Returns the cache that backs this container. */
        @Override
        public Cache<ActorRef, PersistentActor<NodeKey>> getActorCache() {
            return this.actorCache;
        }
    }

    private final class ManagedActorShard
    implements EvictionListener<PersistentActor<ShardKey>>,
    ManagedActorContainer<ShardKey> {
        // the shard this container wraps; supplies the container key
        private final KafkaActorShard actorShard;
        // cache in front of actorStore, created by shardActorCacheManager
        private final Cache<ActorRef, PersistentActor<ShardKey>> actorCache;
        // backing store for this shard's actors
        private final PersistentActorStore actorStore;
        // scheduled messages sorted by their Long key; initializeScheduledMessages uses the fire time in milliseconds as key
        private final TreeMultimap<Long, ScheduledMessage> scheduledMessages;

        /**
         * Wraps the given shard, creates its actor cache with this instance as eviction
         * listener, and starts with an empty set of scheduled messages.
         *
         * @param actorShard shard to wrap
         * @param actorStore store holding the shard's persisted actors
         */
        public ManagedActorShard(KafkaActorShard actorShard, PersistentActorStore actorStore) {
            this.actorShard = actorShard;
            this.actorCache = KafkaActorThread.this.shardActorCacheManager.create((Object)actorShard.getKey(), (EvictionListener)this);
            this.actorStore = actorStore;
            this.scheduledMessages = TreeMultimap.create(Comparator.naturalOrder(), Comparator.naturalOrder());
        }

        /** Returns the key of the wrapped shard. */
        @Override
        public ShardKey getKey() {
            return this.actorShard.getKey();
        }

        /**
         * Eviction callback: runs the passivate lifecycle step for the evicted actor, unless the
         * evicted value is the {@code TOMBSTONE} marker.
         *
         * @param value the evicted cache value
         */
        public void onEvicted(PersistentActor<ShardKey> value) {
            if (TOMBSTONE == value) {
                return;
            }
            KafkaActorThread.this.doInActorContext(ApplicationProtocol::passivateActor, this, value, null);
        }

        /**
         * Releases this shard's resources: first hands the cache to
         * {@code shardActorCacheManager.destroy}, then destroys the actor store.
         */
        public void destroy() {
            KafkaActorThread.this.shardActorCacheManager.destroy(this.actorCache);
            this.actorStore.destroy();
        }

        /**
         * Resolves an actor, preferring the cache and falling back to the actor store.
         *
         * <p>An actor loaded from the store goes through the activate lifecycle step and is then
         * added to the cache.
         *
         * @param actorRef reference of the actor
         * @return the cached or freshly loaded actor, or {@code null} when neither the cache nor
         *         the store knows it
         */
        @Override
        public PersistentActor<ShardKey> getPersistentActor(ActorRef actorRef) {
            PersistentActor<ShardKey> cached = (PersistentActor<ShardKey>)this.actorCache.getIfPresent((Object)actorRef);
            if (cached != null) {
                return cached;
            }
            PersistentActor<ShardKey> loaded = this.actorStore.getPersistentActor(actorRef.getActorId());
            if (loaded == null) {
                return null;
            }
            // activate before caching, as the stored actor is entering memory for the first time
            KafkaActorThread.this.doInActorContext(ApplicationProtocol::activateActor, this, loaded, null);
            this.actorCache.put((Object)actorRef, loaded);
            return loaded;
        }

        /**
         * Reports whether the actor store holds an entry for the referenced actor's id; the
         * cache is not consulted.
         *
         * @param actorRef reference of the actor
         * @return the result of the store's {@code containsKey}
         */
        public boolean actorExists(ActorRef actorRef) {
            return this.actorStore.containsKey(actorRef.getActorId());
        }

        /**
         * Collects every scheduled message whose fire time in milliseconds is strictly before
         * {@code now}.
         *
         * @param now reference time in milliseconds
         * @return the matching messages, in the iteration order of {@code scheduledMessages}
         */
        public List<ScheduledMessage> getScheduledMessagesThatShouldFire(long now) {
            Collection<ScheduledMessage> allScheduled = this.scheduledMessages.values();
            return allScheduled.stream()
                    .filter(candidate -> now > candidate.getFireTime(TimeUnit.MILLISECONDS))
                    .collect(Collectors.toList());
        }

        /**
         * Serializes the actor and sends it to the shard's partition of the persistent-actors
         * topic; the actor store is updated from the producer callback.
         *
         * <p>In the callback: without metadata the exception is logged; with metadata but no
         * offset the store is updated without an offset; with an offset the store is updated
         * directly when it reports itself as concurrent, otherwise through a queued command.
         *
         * @param persistentActor actor to persist
         * @throws SerializationException when serializing the actor state raises an
         *                                {@code IOException}
         */
        @Override
        public void persistActor(PersistentActor<ShardKey> persistentActor) {
            try {
                byte[] serializedActor = (byte[])KafkaActorThread.this.stateSerializer.serialize(persistentActor);
                if (logger.isDebugEnabled()) {
                    logger.debug("Serializing PersistentActor: keySize={}, valueSize={}", (Object)persistentActor.getSelf().getActorId().getBytes(StandardCharsets.UTF_8).length, (Object)serializedActor.length);
                }
                ProducerRecord stateRecord = new ProducerRecord(KafkaActorThread.this.persistentActorsTopic, Integer.valueOf(this.getKey().getShardId()), (Object)persistentActor.getSelf().getActorId(), (Object)serializedActor);
                KafkaActorThread.this.producer.send(stateRecord, (metadata, exception) -> {
                    if (metadata == null) {
                        logger.error("Exception while sending message to KafkaProducer", (Throwable)exception);
                        return;
                    }
                    if (!metadata.hasOffset()) {
                        this.actorStore.put(persistentActor.getSelf().getActorId(), serializedActor);
                        return;
                    }
                    if (this.actorStore.isConcurrent()) {
                        this.actorStore.put(persistentActor.getSelf().getActorId(), serializedActor, metadata.offset());
                    } else {
                        // non-concurrent store: hand the update to the command queue
                        KafkaActorThread.this.runCommand((kafkaConsumer, kafkaProducer) -> this.actorStore.put(persistentActor.getSelf().getActorId(), serializedActor, metadata.offset()));
                    }
                });
            }
            catch (IOException serializationFailure) {
                throw new SerializationException(String.format("Exception while serializing state for actor %s", persistentActor.getSelf().getActorId()), (Throwable)serializationFailure);
            }
        }

        /**
         * Removes the actor from {@code actorStore} and publishes a record with a
         * {@code null} value for its id to the persistent actors topic.
         *
         * <p>The store entry is removed first. The record is then sent through
         * {@code doSend} with the producer obtained from {@code KafkaTransactionContext},
         * to the partition given by this shard's id.
         * NOTE(review): the null value presumably acts as a tombstone for a compacted
         * topic - confirm against the topic configuration.
         *
         * @param persistentActor the actor to delete
         */
        @Override
        public void deleteActor(PersistentActor<ShardKey> persistentActor) {
            final String actorId = persistentActor.getSelf().getActorId();
            this.actorStore.remove(actorId);
            ProducerRecord deletionRecord = new ProducerRecord(
                    KafkaActorThread.this.persistentActorsTopic,
                    Integer.valueOf(this.getKey().getShardId()),
                    (Object)actorId,
                    null);
            KafkaProducer<Object, Object> transactionalProducer = (KafkaProducer<Object, Object>)KafkaTransactionContext.getProducer();
            KafkaActorThread.this.doSend((ProducerRecord<Object, Object>)deletionRecord, transactionalProducer);
        }

        /**
         * Reports whether {@code actorStore} holds an entry under the given actor id.
         *
         * @param actorId id to look up in the store
         * @return {@code true} if the store contains the id
         */
        @Override
        public boolean containsKey(String actorId) {
            final boolean present = this.actorStore.containsKey(actorId);
            return present;
        }

        /**
         * Exposes the cache that maps actor references to their loaded actors.
         *
         * @return the cache instance held by this shard; the same object is returned on
         *         every call, not a copy
         */
        @Override
        public Cache<ActorRef, PersistentActor<ShardKey>> getActorCache() {
            return actorCache;
        }
    }

    private static enum KafkaActorSystemState {
        INITIALIZING,
        ACTIVE,
        REBALANCING;

    }
}

