package org.elasticsoftware.elasticactors.kafka;

import com.google.common.cache.Cache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsoftware.elasticactors.ActorContainerRef;
import org.elasticsoftware.elasticactors.ActorNode;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorState;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.NodeKey;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.ShardKey;
import org.elasticsoftware.elasticactors.cache.EvictionListener;
import org.elasticsoftware.elasticactors.cache.NodeActorCacheManager;
import org.elasticsoftware.elasticactors.cache.ShardActorCacheManager;
import org.elasticsoftware.elasticactors.cluster.ActorRefFactory;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEvent;
import org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.ShardDistributionStrategy;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage;
import org.elasticsoftware.elasticactors.kafka.cluster.ActorLifecycleFunction;
import org.elasticsoftware.elasticactors.kafka.cluster.ApplicationProtocol;
import org.elasticsoftware.elasticactors.kafka.cluster.ReactiveStreamsProtocol;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaActorSystemEventListenerDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaInternalMessageDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaInternalMessageSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaPersistentActorSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaProducerSerializer;
import org.elasticsoftware.elasticactors.kafka.serialization.KafkaScheduledMessageDeserializer;
import org.elasticsoftware.elasticactors.kafka.serialization.UUIDDeserializer;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStore;
import org.elasticsoftware.elasticactors.kafka.state.PersistentActorStoreFactory;
import org.elasticsoftware.elasticactors.kafka.utils.TopicNamesHelper;
import org.elasticsoftware.elasticactors.messaging.DefaultInternalMessage;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.InternalMessageFactory;
import org.elasticsoftware.elasticactors.messaging.internal.ActorNodeMessage;
import org.elasticsoftware.elasticactors.messaging.internal.CancelScheduledMessageMessage;
import org.elasticsoftware.elasticactors.messaging.internal.CreateActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.DestroyActorMessage;
import org.elasticsoftware.elasticactors.messaging.internal.PersistActorMessage;
import org.elasticsoftware.elasticactors.serialization.Deserializer;
import org.elasticsoftware.elasticactors.serialization.SerializationContext;
import org.elasticsoftware.elasticactors.serialization.Serializer;
import org.elasticsoftware.elasticactors.serialization.internal.ActorRefDeserializer;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.internal.InternalMessageSerializer;
import org.elasticsoftware.elasticactors.serialization.internal.ScheduledMessageDeserializer;
import org.elasticsoftware.elasticactors.state.PersistentActor;
import org.elasticsoftware.elasticactors.tracing.Traceable;
import org.elasticsoftware.elasticactors.util.ClassLoadingHelper;
import org.elasticsoftware.elasticactors.util.ManifestTools;
import org.elasticsoftware.elasticactors.util.SerializationTools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/kafka/KafkaActorThread.class */
public final class KafkaActorThread extends Thread {
    private static final long DEFAULT_OFFSET_INCREASE = 2;
    private final KafkaConsumer<UUID, InternalMessage> messageConsumer;
    private final KafkaProducer<Object, Object> producer;
    private final KafkaConsumer<String, byte[]> stateConsumer;
    private final KafkaConsumer<String, ActorSystemEventListener> actorSystemEventListenersConsumer;
    private final KafkaConsumer<UUID, ScheduledMessage> scheduledMessagesConsumer;
    private final String clusterName;
    private final InternalActorSystem internalActorSystem;
    private final BlockingQueue<BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>>> commands;
    private final Set<ShardKey> newLocalShards;
    private final Map<ShardKey, ManagedActorShard> localShards;
    private ManagedActorNode localActorNode;
    private final Map<ShardKey, KafkaActorShard> managedShards;
    private final ShardActorCacheManager shardActorCacheManager;
    private final NodeActorCacheManager nodeActorCacheManager;
    private final Serializer<PersistentActor<ShardKey>, byte[]> stateSerializer;
    private final Deserializer<byte[], PersistentActor<ShardKey>> stateDeserializer;
    private final PersistentActorStoreFactory persistentActorStoreFactory;
    private final String messagesTopic;
    private final String scheduledMessagesTopic;
    private final String actorSystemEventListenersTopic;
    private final String persistentActorsTopic;
    private boolean RUNNING;
    private final Integer nodeTopicPartitionId;
    private final Callback loggingCallback;
    private KafkaActorSystemState state;
    private static final Logger logger = LoggerFactory.getLogger(KafkaActorSystemInstance.class);
    private static final AtomicInteger THREAD_ID_SEQUENCE = new AtomicInteger(0);
    private static final PersistentActor<ShardKey> TOMBSTONE = new PersistentActor<>((Object) null, (InternalActorSystem) null, (String) null, (ActorRef) null, (Class) null, (ActorState) null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/kafka/KafkaActorThread$KafkaActorSystemState.class */
    public enum KafkaActorSystemState {
        INITIALIZING,
        ACTIVE,
        REBALANCING
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/kafka/KafkaActorThread$ManagedActorNode.class */
    public final class ManagedActorNode implements EvictionListener<PersistentActor<NodeKey>>, ManagedActorContainer<NodeKey> {
        private final KafkaActorNode actorNode;
        private final boolean primary;
        private final Cache<ActorRef, PersistentActor<NodeKey>> actorCache;
        private final Set<ActorRef> initializedActors;

        private ManagedActorNode(KafkaActorNode kafkaActorNode, boolean z) {
            this.initializedActors = new HashSet();
            this.actorNode = kafkaActorNode;
            this.actorCache = KafkaActorThread.this.nodeActorCacheManager.create(kafkaActorNode.getKey(), this);
            this.primary = z;
        }

        public boolean isPrimary() {
            return this.primary;
        }

        public void onEvicted(PersistentActor<NodeKey> persistentActor) {
            Traceable traceable = persistentActor.getState() instanceof Traceable ? (Traceable) persistentActor.getState() : null;
            boolean z = (traceable == null || (traceable.getTraceContext() == null && traceable.getCreationContext() == null)) ? false : true;
            Logger logger = KafkaActorThread.logger;
            Object[] objArr = new Object[5];
            objArr[0] = persistentActor.getSelf();
            objArr[1] = persistentActor.getActorClass().getName();
            objArr[2] = z ? " Temporary Actor created with the following contexts in scope:" : "";
            objArr[3] = z ? toLoggableString(traceable.getCreationContext()) : "";
            objArr[4] = z ? toLoggableString(traceable.getTraceContext()) : "";
            logger.error("CRITICAL WARNING: Actor [{}] of type [{}] got evicted from the cache. This can lead to issues using temporary actors. Please increase the maximum size of the node actor cache by using the 'ea.nodeCache.maximumSize' property.{}{}{}", objArr);
        }

        private String toLoggableString(@Nullable Object obj) {
            return obj != null ? " " + obj + "." : "";
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public PersistentActor<NodeKey> getPersistentActor(ActorRef actorRef) {
            return (PersistentActor) this.actorCache.getIfPresent(actorRef);
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public void persistActor(PersistentActor<NodeKey> persistentActor) {
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public void deleteActor(PersistentActor<NodeKey> persistentActor) {
            this.actorCache.invalidate(persistentActor.getSelf());
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public boolean containsKey(ActorRef actorRef) {
            return this.actorCache.getIfPresent(actorRef) != null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public NodeKey getKey() {
            return this.actorNode.getKey();
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public Cache<ActorRef, PersistentActor<NodeKey>> getActorCache() {
            return this.actorCache;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsoftware/elasticactors/kafka/KafkaActorThread$ManagedActorShard.class */
    public final class ManagedActorShard implements EvictionListener<PersistentActor<ShardKey>>, ManagedActorContainer<ShardKey> {
        private final KafkaActorShard actorShard;
        private final Cache<ActorRef, PersistentActor<ShardKey>> actorCache;
        private final PersistentActorStore actorStore;
        private final TreeMultimap<Long, ScheduledMessage> scheduledMessages = TreeMultimap.create(Comparator.naturalOrder(), Comparator.naturalOrder());

        public ManagedActorShard(KafkaActorShard kafkaActorShard, PersistentActorStore persistentActorStore) {
            this.actorShard = kafkaActorShard;
            this.actorCache = KafkaActorThread.this.shardActorCacheManager.create(kafkaActorShard.getKey(), this);
            this.actorStore = persistentActorStore;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public ShardKey getKey() {
            return this.actorShard.getKey();
        }

        public void onEvicted(PersistentActor<ShardKey> persistentActor) {
            if (KafkaActorThread.TOMBSTONE != persistentActor) {
                KafkaActorThread.this.doInActorContext(ApplicationProtocol::passivateActor, this, persistentActor, null);
            }
        }

        public void destroy() {
            KafkaActorThread.this.shardActorCacheManager.destroy(this.actorCache);
            this.actorStore.destroy();
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public PersistentActor<ShardKey> getPersistentActor(ActorRef actorRef) {
            PersistentActor<ShardKey> persistentActor = (PersistentActor) this.actorCache.getIfPresent(actorRef);
            if (persistentActor != null) {
                return persistentActor;
            }
            PersistentActor<ShardKey> persistentActor2 = this.actorStore.getPersistentActor(actorRef.getActorId());
            if (persistentActor2 == null) {
                return null;
            }
            KafkaActorThread.this.doInActorContext(ApplicationProtocol::activateActor, this, persistentActor2, null);
            this.actorCache.put(actorRef, persistentActor2);
            return persistentActor2;
        }

        public boolean actorExists(ActorRef actorRef) {
            return this.actorStore.containsKey(actorRef.getActorId());
        }

        public List<ScheduledMessage> getScheduledMessagesThatShouldFire(long j) {
            return (List) this.scheduledMessages.values().stream().filter(scheduledMessage -> {
                return scheduledMessage.getFireTime(TimeUnit.MILLISECONDS) < j;
            }).collect(Collectors.toList());
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public void persistActor(PersistentActor<ShardKey> persistentActor) {
            try {
                byte[] bArr = (byte[]) KafkaActorThread.this.stateSerializer.serialize(persistentActor);
                if (KafkaActorThread.logger.isDebugEnabled()) {
                    KafkaActorThread.logger.debug("Serializing PersistentActor: keySize={}, valueSize={}", Integer.valueOf(persistentActor.getSelf().getActorId().getBytes(StandardCharsets.UTF_8).length), Integer.valueOf(bArr.length));
                }
                KafkaActorThread.this.producer.send(new ProducerRecord(KafkaActorThread.this.persistentActorsTopic, Integer.valueOf(getKey().getShardId()), persistentActor.getSelf().getActorId(), bArr), (recordMetadata, exc) -> {
                    if (recordMetadata == null) {
                        KafkaActorThread.logger.error("Exception while sending message to KafkaProducer", exc);
                        return;
                    }
                    if (!recordMetadata.hasOffset()) {
                        this.actorStore.put(persistentActor.getSelf().getActorId(), bArr);
                    } else if (this.actorStore.isConcurrent()) {
                        this.actorStore.put(persistentActor.getSelf().getActorId(), bArr, recordMetadata.offset());
                    } else {
                        KafkaActorThread.this.runCommand((kafkaConsumer, kafkaProducer) -> {
                            this.actorStore.put(persistentActor.getSelf().getActorId(), bArr, recordMetadata.offset());
                        });
                    }
                });
            } catch (IOException e) {
                throw new SerializationException(String.format("Exception while serializing state for actor %s", persistentActor.getSelf().getActorId()), e);
            }
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public void deleteActor(PersistentActor<ShardKey> persistentActor) {
            this.actorStore.remove(persistentActor.getSelf().getActorId());
            KafkaActorThread.this.doSend(new ProducerRecord(KafkaActorThread.this.persistentActorsTopic, Integer.valueOf(getKey().getShardId()), persistentActor.getSelf().getActorId(), (Object) null), KafkaTransactionContext.getProducer());
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public boolean containsKey(ActorRef actorRef) {
            return actorExists(actorRef);
        }

        @Override // org.elasticsoftware.elasticactors.kafka.ManagedActorContainer
        public Cache<ActorRef, PersistentActor<ShardKey>> getActorCache() {
            return this.actorCache;
        }
    }

    public KafkaActorThread(String str, String str2, String str3, InternalActorSystem internalActorSystem, ActorRefFactory actorRefFactory, ShardActorCacheManager shardActorCacheManager, NodeActorCacheManager nodeActorCacheManager, Serializer<PersistentActor<ShardKey>, byte[]> serializer, Deserializer<byte[], PersistentActor<ShardKey>> deserializer, PersistentActorStoreFactory persistentActorStoreFactory) {
        super("KafkaActorThread-" + THREAD_ID_SEQUENCE.getAndIncrement());
        this.newLocalShards = new HashSet();
        this.localShards = new HashMap();
        this.managedShards = new HashMap();
        this.RUNNING = true;
        this.loggingCallback = (recordMetadata, exc) -> {
            if (exc != null) {
                logger.error("Exception while sending message to KafkaProducer", exc);
            }
        };
        this.state = KafkaActorSystemState.INITIALIZING;
        this.persistentActorStoreFactory = persistentActorStoreFactory;
        this.nodeTopicPartitionId = Integer.valueOf(THREAD_ID_SEQUENCE.get() - 1);
        this.clusterName = str;
        this.internalActorSystem = internalActorSystem;
        this.shardActorCacheManager = shardActorCacheManager;
        this.nodeActorCacheManager = nodeActorCacheManager;
        this.stateSerializer = serializer;
        this.stateDeserializer = deserializer;
        this.messagesTopic = TopicNamesHelper.getMessagesTopic(internalActorSystem);
        this.scheduledMessagesTopic = TopicNamesHelper.getScheduledMessagesTopic(internalActorSystem);
        this.actorSystemEventListenersTopic = TopicNamesHelper.getActorsystemEventListenersTopic(internalActorSystem);
        this.persistentActorsTopic = TopicNamesHelper.getPersistentActorsTopic(internalActorSystem);
        HashMap hashMap = new HashMap();
        hashMap.put("max.poll.records", "100");
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("enable.auto.commit", "false");
        hashMap.put("internal.leave.group.on.close", false);
        hashMap.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        hashMap.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        hashMap.put("bootstrap.servers", str2);
        hashMap.put("group.id", str);
        hashMap.put("client.id", str3 + "-" + getName() + "-consumer");
        this.messageConsumer = new KafkaConsumer<>(hashMap, new UUIDDeserializer(), new KafkaInternalMessageDeserializer(new InternalMessageDeserializer(new ActorRefDeserializer(actorRefFactory), internalActorSystem)));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("retries", Integer.MAX_VALUE);
        hashMap2.put("enable.idempotence", true);
        hashMap2.put("max.in.flight.requests.per.connection", 1);
        hashMap2.put("acks", "all");
        hashMap2.put("bootstrap.servers", str2);
        hashMap2.put("client.id", str3 + "-" + getName() + "-producer");
        hashMap2.put("transactional.id", str3 + "-" + getName() + "-producer");
        this.producer = new KafkaProducer<>(hashMap2, new KafkaProducerSerializer(new KafkaInternalMessageSerializer(InternalMessageSerializer.get()), new KafkaPersistentActorSerializer(serializer)), new KafkaProducerSerializer(new KafkaInternalMessageSerializer(InternalMessageSerializer.get()), new KafkaPersistentActorSerializer(serializer)));
        this.producer.initTransactions();
        this.commands = new LinkedBlockingQueue();
        HashMap hashMap3 = new HashMap();
        hashMap3.put("max.poll.records", "1000");
        hashMap3.put("auto.offset.reset", "earliest");
        hashMap3.put("enable.auto.commit", "false");
        hashMap3.put("internal.leave.group.on.close", false);
        hashMap3.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        hashMap3.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        hashMap3.put("bootstrap.servers", str2);
        hashMap3.put("group.id", str + "-state");
        hashMap3.put("client.id", str3 + "-" + getName() + "-state-consumer");
        this.stateConsumer = new KafkaConsumer<>(hashMap3, new StringDeserializer(), new ByteArrayDeserializer());
        HashMap hashMap4 = new HashMap();
        hashMap4.put("max.poll.records", "1000");
        hashMap4.put("auto.offset.reset", "earliest");
        hashMap4.put("enable.auto.commit", "false");
        hashMap4.put("internal.leave.group.on.close", false);
        hashMap4.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        hashMap4.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        hashMap4.put("bootstrap.servers", str2);
        hashMap4.put("group.id", str + "-scheduledMessages");
        hashMap4.put("client.id", str3 + "-" + getName() + "-scheduledMessages-consumer");
        this.scheduledMessagesConsumer = new KafkaConsumer<>(hashMap4, new UUIDDeserializer(), new KafkaScheduledMessageDeserializer(new ScheduledMessageDeserializer(new ActorRefDeserializer(actorRefFactory))));
        HashMap hashMap5 = new HashMap();
        hashMap5.put("max.poll.records", "1000");
        hashMap5.put("auto.offset.reset", "earliest");
        hashMap5.put("enable.auto.commit", "false");
        hashMap5.put("internal.leave.group.on.close", false);
        hashMap5.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        hashMap5.put("isolation.level", IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT));
        hashMap5.put("bootstrap.servers", str2);
        hashMap5.put("group.id", str + "-actorSystemEventListeners");
        hashMap5.put("client.id", str3 + "-" + getName() + "-actorSystemEventListeners-consumer");
        this.actorSystemEventListenersConsumer = new KafkaConsumer<>(hashMap5, new StringDeserializer(), new KafkaActorSystemEventListenerDeserializer());
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x002b, code lost:
    
        if (r4.RUNNING == false) goto L29;
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0035, code lost:
    
        if (r4.state != org.elasticsoftware.elasticactors.kafka.KafkaActorThread.KafkaActorSystemState.ACTIVE) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x0038, code lost:
    
        processMessages();
        updateScheduledMessages();
        maybeFireScheduledMessages();
     */
    /* JADX WARN: Code restructure failed: missing block: B:5:0x000d, code lost:
    
        if (r5 != null) goto L6;
     */
    /* JADX WARN: Code restructure failed: missing block: B:6:0x0010, code lost:
    
        r5.accept(r4.messageConsumer, r4.producer);
        r5 = pollOrWait();
     */
    /* JADX WARN: Code restructure failed: missing block: B:7:0x0024, code lost:
    
        if (r5 != null) goto L33;
     */
    @Override // java.lang.Thread, java.lang.Runnable
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void run() {
        /*
            r4 = this;
        L0:
            r0 = r4
            boolean r0 = r0.RUNNING     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            if (r0 == 0) goto L47
            r0 = r4
            java.util.function.BiConsumer r0 = r0.pollOrWait()     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r5 = r0
            r0 = r5
            if (r0 == 0) goto L27
        L10:
            r0 = r5
            r1 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.util.UUID, org.elasticsoftware.elasticactors.messaging.InternalMessage> r1 = r1.messageConsumer     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r2 = r4
            org.apache.kafka.clients.producer.KafkaProducer<java.lang.Object, java.lang.Object> r2 = r2.producer     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r0.accept(r1, r2)     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r0 = r4
            java.util.function.BiConsumer r0 = r0.pollOrWait()     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r5 = r0
            r0 = r5
            if (r0 != 0) goto L10
        L27:
            r0 = r4
            boolean r0 = r0.RUNNING     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            if (r0 == 0) goto L0
            r0 = r4
            org.elasticsoftware.elasticactors.kafka.KafkaActorThread$KafkaActorSystemState r0 = r0.state     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            org.elasticsoftware.elasticactors.kafka.KafkaActorThread$KafkaActorSystemState r1 = org.elasticsoftware.elasticactors.kafka.KafkaActorThread.KafkaActorSystemState.ACTIVE     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            if (r0 != r1) goto L0
            r0 = r4
            r0.processMessages()     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r0 = r4
            r0.updateScheduledMessages()     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            r0 = r4
            r0.maybeFireScheduledMessages()     // Catch: java.lang.Exception -> L66 java.lang.Throwable -> L91
            goto L0
        L47:
            r0 = r4
            org.apache.kafka.clients.producer.KafkaProducer<java.lang.Object, java.lang.Object> r0 = r0.producer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, byte[]> r0 = r0.stateConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener> r0 = r0.actorSystemEventListenersConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.util.UUID, org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage> r0 = r0.scheduledMessagesConsumer
            r0.close()
            goto Lb0
        L66:
            r6 = move-exception
            org.slf4j.Logger r0 = org.elasticsoftware.elasticactors.kafka.KafkaActorThread.logger     // Catch: java.lang.Throwable -> L91
            java.lang.String r1 = "FATAL: Exception in KafkaActorThread runLoop"
            r2 = r6
            r0.error(r1, r2)     // Catch: java.lang.Throwable -> L91
            r0 = r4
            org.apache.kafka.clients.producer.KafkaProducer<java.lang.Object, java.lang.Object> r0 = r0.producer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, byte[]> r0 = r0.stateConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener> r0 = r0.actorSystemEventListenersConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.util.UUID, org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage> r0 = r0.scheduledMessagesConsumer
            r0.close()
            goto Lb0
        L91:
            r7 = move-exception
            r0 = r4
            org.apache.kafka.clients.producer.KafkaProducer<java.lang.Object, java.lang.Object> r0 = r0.producer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, byte[]> r0 = r0.stateConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.lang.String, org.elasticsoftware.elasticactors.cluster.ActorSystemEventListener> r0 = r0.actorSystemEventListenersConsumer
            r0.close()
            r0 = r4
            org.apache.kafka.clients.consumer.KafkaConsumer<java.util.UUID, org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage> r0 = r0.scheduledMessagesConsumer
            r0.close()
            r0 = r7
            throw r0
        Lb0:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.elasticsoftware.elasticactors.kafka.KafkaActorThread.run():void");
    }

    private BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> pollOrWait() {
        if (this.state == KafkaActorSystemState.ACTIVE) {
            return this.commands.poll();
        }
        try {
            return this.commands.poll(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return null;
        }
    }

    private void processMessages() {
        try {
            ConsumerRecords poll = this.messageConsumer.poll(Duration.ofMillis(1L));
            if (!poll.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("messageConsumer has {} records to process", Integer.valueOf(poll.count()));
                }
                this.producer.beginTransaction();
                KafkaTransactionContext.setTransactionalProducer(this.producer);
                HashMap hashMap = new HashMap();
                poll.partitions().forEach(topicPartition -> {
                    poll.records(topicPartition).forEach(consumerRecord -> {
                        if (logger.isDebugEnabled()) {
                            logger.debug("handling InternalMessage(sender:{}, receiver:{}, type:{})  with offset {} from topicPartition({})", new Object[]{((InternalMessage) consumerRecord.value()).getSender(), ((InternalMessage) consumerRecord.value()).getReceivers().get(0), ((InternalMessage) consumerRecord.value()).getPayloadClass(), Long.valueOf(consumerRecord.offset()), topicPartition});
                        }
                        handleInternalMessage(topicPartition, (InternalMessage) consumerRecord.value());
                        hashMap.put(topicPartition, new OffsetAndMetadata(consumerRecord.offset() + 1));
                    });
                });
                this.producer.sendOffsetsToTransaction(hashMap, this.clusterName);
                this.producer.commitTransaction();
            }
        } catch (KafkaException e) {
            logger.error("FATAL: Unrecoverable exception while polling for Messages", e);
            System.exit(1);
        } catch (WakeupException | InterruptException e2) {
            logger.warn("Recoverable exception while polling for Messages", e2);
        } catch (Throwable th) {
            logger.error("Unexpected exception while polling for Messages", th);
            System.exit(1);
        }
    }

    private void updateScheduledMessages() {
        try {
            ConsumerRecords poll = this.scheduledMessagesConsumer.poll(Duration.ZERO);
            if (!poll.isEmpty()) {
                poll.partitions().forEach(topicPartition -> {
                    poll.records(topicPartition).forEach(consumerRecord -> {
                        ManagedActorShard managedActorShard = this.localShards.get(new ShardKey(this.internalActorSystem.getName(), topicPartition.partition()));
                        if (managedActorShard != null) {
                            if (consumerRecord.value() != null) {
                                managedActorShard.scheduledMessages.put(Long.valueOf(((ScheduledMessage) consumerRecord.value()).getFireTime(TimeUnit.MILLISECONDS)), consumerRecord.value());
                            } else {
                                managedActorShard.scheduledMessages.entries().removeIf(entry -> {
                                    return ((ScheduledMessage) entry.getValue()).getId().equals(consumerRecord.key());
                                });
                            }
                        }
                    });
                });
            }
        } catch (WakeupException | InterruptException e) {
            logger.warn("Recoverable exception while polling for ScheduledMessages", e);
        } catch (KafkaException e2) {
            logger.error("FATAL: Unrecoverable exception while polling for ScheduledMessages", e2);
        } catch (Throwable th) {
            logger.error("Unexpected exception while polling for ScheduledMessages", th);
        }
    }

    private void maybeFireScheduledMessages() {
        List list = (List) this.localShards.values().stream().map(managedActorShard -> {
            return managedActorShard.getScheduledMessagesThatShouldFire(System.currentTimeMillis());
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        try {
            this.producer.beginTransaction();
            list.forEach(scheduledMessage -> {
                DefaultInternalMessage defaultInternalMessage = new DefaultInternalMessage(scheduledMessage.getSender(), scheduledMessage.getReceiver(), scheduledMessage.getMessageBytes(), scheduledMessage.getMessageClass().getName(), scheduledMessage.getMessageQueueAffinityKey(), false);
                this.producer.send(new ProducerRecord(this.messagesTopic, Integer.valueOf(scheduledMessage.getReceiver().getActorContainer().getKey().getShardId()), defaultInternalMessage.getId(), defaultInternalMessage));
                this.producer.send(new ProducerRecord(this.scheduledMessagesTopic, Integer.valueOf(scheduledMessage.getSender().getActorContainer().getKey().getShardId()), scheduledMessage.getId(), (Object) null));
            });
            this.producer.commitTransaction();
            list.forEach(scheduledMessage2 -> {
                this.localShards.get(scheduledMessage2.getSender().getActorContainer().getKey()).scheduledMessages.remove(Long.valueOf(scheduledMessage2.getFireTime(TimeUnit.MILLISECONDS)), scheduledMessage2);
            });
        } catch (ProducerFencedException e) {
            logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", e);
        } catch (KafkaException e2) {
            logger.error("FATAL: Unrecoverable exception while committing producer transaction", e2);
        } catch (RetriableException e3) {
            logger.warn("Recoverable exception while sending ScheduledMessages", e3);
        } catch (Throwable th) {
            logger.error("Unexpected exception while processing ScheduledMessages", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void send(ShardKey shardKey, InternalMessage internalMessage) {
        doSend(new ProducerRecord<>(this.messagesTopic, Integer.valueOf(shardKey.getShardId()), internalMessage.getId(), internalMessage), KafkaTransactionContext.getProducer());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doSend(ProducerRecord<Object, Object> producerRecord, KafkaProducer<Object, Object> kafkaProducer) {
        if (kafkaProducer == null) {
            runCommand((kafkaConsumer, kafkaProducer2) -> {
                try {
                    kafkaProducer2.beginTransaction();
                    kafkaProducer2.send(producerRecord, this.loggingCallback);
                    kafkaProducer2.commitTransaction();
                } catch (RetriableException e) {
                    logger.warn("Recoverable exception while sending ProducerRecord", e);
                } catch (KafkaException e2) {
                    logger.error("FATAL: Unrecoverable exception while committing producer transaction", e2);
                } catch (ProducerFencedException e3) {
                    logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", e3);
                } catch (Throwable th) {
                    logger.error("Unexpected exception while sending ProducerRecord", th);
                }
            });
        } else {
            kafkaProducer.send(producerRecord, this.loggingCallback);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void send(NodeKey nodeKey, int i, InternalMessage internalMessage) {
        doSend(new ProducerRecord<>(TopicNamesHelper.getNodeMessagesTopic(this.internalActorSystem, nodeKey.getNodeId()), Integer.valueOf(i), internalMessage.getId(), internalMessage), KafkaTransactionContext.getProducer());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void schedule(ShardKey shardKey, ScheduledMessage scheduledMessage) {
        doSend(new ProducerRecord<>(this.scheduledMessagesTopic, Integer.valueOf(shardKey.getShardId()), scheduledMessage.getId(), scheduledMessage), KafkaTransactionContext.getProducer());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void register(ShardKey shardKey, ActorSystemEvent actorSystemEvent, ActorSystemEventListener actorSystemEventListener) {
        doSend(new ProducerRecord<>(this.actorSystemEventListenersTopic, Integer.valueOf(shardKey.getShardId()), String.format("%s:%s", actorSystemEvent.name(), actorSystemEventListener.getActorId()), actorSystemEventListener), KafkaTransactionContext.getProducer());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deregister(ShardKey shardKey, ActorSystemEvent actorSystemEvent, ActorRef actorRef) {
        doSend(new ProducerRecord<>(this.actorSystemEventListenersTopic, Integer.valueOf(shardKey.getShardId()), String.format("%s:%s", actorSystemEvent.name(), actorRef.getActorId()), (Object) null), KafkaTransactionContext.getProducer());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assign(KafkaActorNode kafkaActorNode, boolean z) {
        runCommand((kafkaConsumer, kafkaProducer) -> {
            this.localActorNode = new ManagedActorNode(kafkaActorNode, z);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void assign(KafkaActorShard kafkaActorShard) {
        runCommand((kafkaConsumer, kafkaProducer) -> {
            this.managedShards.put(kafkaActorShard.getKey(), kafkaActorShard);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopRunning() {
        runCommand((kafkaConsumer, kafkaProducer) -> {
            this.RUNNING = false;
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<Boolean> prepareRebalance(Multimap<PhysicalNode, ShardKey> multimap, ShardDistributionStrategy shardDistributionStrategy) {
        CompletableFuture completableFuture = new CompletableFuture();
        runCommand((kafkaConsumer, kafkaProducer) -> {
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            this.state = KafkaActorSystemState.REBALANCING;
            multimap.forEach((physicalNode, shardKey) -> {
                KafkaActorShard kafkaActorShard = this.managedShards.get(shardKey);
                if (kafkaActorShard != null) {
                    if (!physicalNode.isLocal()) {
                        if (kafkaActorShard.getOwningNode() != null && !kafkaActorShard.getOwningNode().isLocal()) {
                            logger.info("{} will own {}", physicalNode, shardKey);
                            return;
                        }
                        logger.info("{} will own {}", physicalNode, shardKey);
                        try {
                            if (kafkaActorShard.getOwningNode() != null) {
                                kafkaActorShard.setOwningNode(physicalNode);
                                this.localShards.remove(shardKey).destroy();
                                shardDistributionStrategy.signalRelease(kafkaActorShard, physicalNode);
                            }
                            return;
                        } catch (Exception e) {
                            logger.error("IMPORTANT: signalling release of shard {} to node {} failed, ElasticActors cluster is unstable. Please check all nodes", new Object[]{shardKey, physicalNode, e});
                            atomicBoolean.set(false);
                            return;
                        }
                    }
                    if (kafkaActorShard.getOwningNode() != null && kafkaActorShard.getOwningNode().equals(physicalNode)) {
                        logger.info("I already own {}", shardKey);
                        return;
                    }
                    String id = kafkaActorShard.getOwningNode() != null ? kafkaActorShard.getOwningNode().getId() : "<No Node>";
                    logger.info("I will own {}", shardKey);
                    try {
                        try {
                            shardDistributionStrategy.registerWaitForRelease(kafkaActorShard, physicalNode);
                            kafkaActorShard.setOwningNode(physicalNode);
                            this.newLocalShards.add(shardKey);
                        } catch (Exception e2) {
                            logger.error("IMPORTANT: waiting on release of shard {} from node {} failed,  ElasticActors cluster is unstable. Please check all nodes", new Object[]{shardKey, id, e2});
                            atomicBoolean.set(false);
                            kafkaActorShard.setOwningNode(physicalNode);
                            this.newLocalShards.add(shardKey);
                        }
                    } catch (Throwable th) {
                        kafkaActorShard.setOwningNode(physicalNode);
                        this.newLocalShards.add(shardKey);
                        throw th;
                    }
                }
            });
            completableFuture.complete(Boolean.valueOf(atomicBoolean.get()));
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionStage<Set<Integer>> performRebalance() {
        CompletableFuture completableFuture = new CompletableFuture();
        runCommand((kafkaConsumer, kafkaProducer) -> {
            LinkedList linkedList = new LinkedList();
            try {
                this.newLocalShards.forEach(shardKey -> {
                    ManagedActorShard managedActorShard = new ManagedActorShard(this.managedShards.get(shardKey), createStateStore(shardKey));
                    this.localShards.put(shardKey, managedActorShard);
                    linkedList.add(managedActorShard);
                });
                this.newLocalShards.clear();
                assignPartitions();
                if (!linkedList.isEmpty()) {
                    initializeStateStores(linkedList);
                    initializeScheduledMessages(linkedList);
                    initializeAndRunActorSystemEventListeners(linkedList);
                }
                this.state = KafkaActorSystemState.ACTIVE;
                completableFuture.complete(this.newLocalShards.stream().map((v0) -> {
                    return v0.getShardId();
                }).collect(Collectors.toSet()));
            } catch (Exception e) {
                logger.error("FATAL Exception on performRebalance", e);
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    private void assignPartitions() {
        List list = (List) this.localShards.keySet().stream().map(shardKey -> {
            return new TopicPartition(this.messagesTopic, shardKey.getShardId());
        }).collect(Collectors.toList());
        if (this.localActorNode != null) {
            list.add(new TopicPartition(TopicNamesHelper.getNodeMessagesTopic(this.internalActorSystem, this.localActorNode.actorNode.getKey().getNodeId()), this.nodeTopicPartitionId.intValue()));
        }
        this.messageConsumer.assign(list);
        this.stateConsumer.assign((List) this.localShards.keySet().stream().map(shardKey2 -> {
            return new TopicPartition(this.persistentActorsTopic, shardKey2.getShardId());
        }).collect(Collectors.toList()));
        this.scheduledMessagesConsumer.assign((List) this.localShards.keySet().stream().map(shardKey3 -> {
            return new TopicPartition(this.scheduledMessagesTopic, shardKey3.getShardId());
        }).collect(Collectors.toList()));
        this.actorSystemEventListenersConsumer.assign((List) this.localShards.keySet().stream().map(shardKey4 -> {
            return new TopicPartition(this.actorSystemEventListenersTopic, shardKey4.getShardId());
        }).collect(Collectors.toList()));
    }

    private PersistentActorStore createStateStore(ShardKey shardKey) {
        return this.persistentActorStoreFactory.create(shardKey, this.stateDeserializer);
    }

    private void initializeStateStores(List<ManagedActorShard> list) {
        if (list.isEmpty()) {
            return;
        }
        List list2 = (List) list.stream().map(managedActorShard -> {
            return new TopicPartition(this.persistentActorsTopic, managedActorShard.getKey().getShardId());
        }).collect(Collectors.toList());
        this.stateConsumer.seekToEnd(this.stateConsumer.assignment());
        this.stateConsumer.seekToBeginning(list2);
        Map map = (Map) list.stream().collect(Collectors.toMap(managedActorShard2 -> {
            return Integer.valueOf(managedActorShard2.getKey().getShardId());
        }, managedActorShard3 -> {
            return managedActorShard3;
        }));
        Map map2 = (Map) this.stateConsumer.endOffsets(list2).entrySet().stream().filter(entry -> {
            return ((Long) entry.getValue()).longValue() > 0;
        }).collect(Collectors.toMap(entry2 -> {
            return Integer.valueOf(((TopicPartition) entry2.getKey()).partition());
        }, (v0) -> {
            return v0.getValue();
        }));
        map.forEach((num, managedActorShard4) -> {
            if (managedActorShard4.actorStore.getOffset() >= 0) {
                this.stateConsumer.seek(new TopicPartition(this.persistentActorsTopic, managedActorShard4.getKey().getShardId()), managedActorShard4.actorStore.getOffset() + 1);
                Long l = (Long) map2.get(Integer.valueOf(managedActorShard4.getKey().getShardId()));
                if (l == null || l.longValue() - DEFAULT_OFFSET_INCREASE != managedActorShard4.actorStore.getOffset()) {
                    return;
                }
                map2.remove(Integer.valueOf(managedActorShard4.getKey().getShardId()));
            }
        });
        ConsumerRecords consumerRecords = null;
        int i = 0;
        while (true) {
            try {
                consumerRecords = this.stateConsumer.poll(10L);
                i += consumerRecords.count();
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    if (consumerRecord.value() != null) {
                        ((ManagedActorShard) map.get(Integer.valueOf(consumerRecord.partition()))).actorStore.put((String) consumerRecord.key(), (byte[]) consumerRecord.value(), consumerRecord.offset());
                    }
                    Long l = (Long) map2.get(Integer.valueOf(consumerRecord.partition()));
                    if (l == null || l.longValue() - DEFAULT_OFFSET_INCREASE != consumerRecord.offset()) {
                        return;
                    }
                    map2.remove(Integer.valueOf(consumerRecord.partition()));
                });
            } catch (KafkaException e) {
                logger.error("FATAL: Unrecoverable exception while polling PersistentActors state", e);
            } catch (WakeupException | InterruptException e2) {
                logger.warn("Recoverable exception while polling PersistentActors state", e2);
            } catch (Throwable th) {
                logger.error("Unexpected exception while populating PersistentActorStores", th);
            }
            if (consumerRecords == null || consumerRecords.isEmpty()) {
                if (map2.isEmpty()) {
                    logger.info("Loaded {} unique persistent actors from {} entries", Integer.valueOf(list.stream().mapToInt(managedActorShard5 -> {
                        return managedActorShard5.actorStore.count();
                    }).sum()), Integer.valueOf(i));
                    return;
                }
            }
        }
    }

    private void initializeScheduledMessages(List<ManagedActorShard> list) {
        if (list.isEmpty()) {
            return;
        }
        List list2 = (List) list.stream().map(managedActorShard -> {
            return new TopicPartition(this.scheduledMessagesTopic, managedActorShard.getKey().getShardId());
        }).collect(Collectors.toList());
        this.scheduledMessagesConsumer.seekToBeginning(list2);
        Map map = (Map) list.stream().collect(Collectors.toMap(managedActorShard2 -> {
            return Integer.valueOf(managedActorShard2.getKey().getShardId());
        }, managedActorShard3 -> {
            return managedActorShard3;
        }));
        Map map2 = (Map) this.scheduledMessagesConsumer.endOffsets(list2).entrySet().stream().filter(entry -> {
            return ((Long) entry.getValue()).longValue() > 0;
        }).collect(Collectors.toMap(entry2 -> {
            return Integer.valueOf(((TopicPartition) entry2.getKey()).partition());
        }, (v0) -> {
            return v0.getValue();
        }));
        ConsumerRecords consumerRecords = null;
        while (true) {
            try {
                consumerRecords = this.scheduledMessagesConsumer.poll(10L);
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    if (consumerRecord.value() != null) {
                        ((ManagedActorShard) map.get(Integer.valueOf(consumerRecord.partition()))).scheduledMessages.put(Long.valueOf(((ScheduledMessage) consumerRecord.value()).getFireTime(TimeUnit.MILLISECONDS)), consumerRecord.value());
                    } else {
                        ((ManagedActorShard) map.get(Integer.valueOf(consumerRecord.partition()))).scheduledMessages.entries().removeIf(entry3 -> {
                            return ((ScheduledMessage) entry3.getValue()).getId().equals(consumerRecord.key());
                        });
                    }
                    Long l = (Long) map2.get(Integer.valueOf(consumerRecord.partition()));
                    if (l == null || l.longValue() - DEFAULT_OFFSET_INCREASE != consumerRecord.offset()) {
                        return;
                    }
                    map2.remove(Integer.valueOf(consumerRecord.partition()));
                });
            } catch (KafkaException e) {
                logger.error("FATAL: Unrecoverable exception while polling ScheduledMessages state", e);
            } catch (WakeupException | InterruptException e2) {
                logger.warn("Recoverable exception while polling ScheduledMessages state", e2);
            } catch (Throwable th) {
                logger.error("Unexpected exception while populating ScheduledMessage", th);
            }
            if (consumerRecords == null || consumerRecords.isEmpty()) {
                if (map2.isEmpty()) {
                    try {
                        this.scheduledMessagesConsumer.commitSync();
                        return;
                    } catch (KafkaException e3) {
                        logger.error("FATAL: Unrecoverable exception calling commitSync on scheduledMessagesConsumer", e3);
                        return;
                    } catch (WakeupException | InterruptException e4) {
                        logger.warn("Recoverable exception calling commitSync on scheduledMessagesConsumer", e4);
                        return;
                    }
                }
            }
        }
    }

    private void initializeAndRunActorSystemEventListeners(List<ManagedActorShard> list) {
        if (list.isEmpty()) {
            return;
        }
        List list2 = (List) list.stream().map(managedActorShard -> {
            return new TopicPartition(this.actorSystemEventListenersTopic, managedActorShard.getKey().getShardId());
        }).collect(Collectors.toList());
        this.actorSystemEventListenersConsumer.seekToBeginning(list2);
        Map map = (Map) list.stream().collect(Collectors.toMap(managedActorShard2 -> {
            return Integer.valueOf(managedActorShard2.getKey().getShardId());
        }, managedActorShard3 -> {
            return managedActorShard3;
        }));
        Map map2 = (Map) this.actorSystemEventListenersConsumer.endOffsets(list2).entrySet().stream().filter(entry -> {
            return ((Long) entry.getValue()).longValue() > 0;
        }).collect(Collectors.toMap(entry2 -> {
            return Integer.valueOf(((TopicPartition) entry2.getKey()).partition());
        }, (v0) -> {
            return v0.getValue();
        }));
        ConsumerRecords consumerRecords = null;
        while (true) {
            try {
                consumerRecords = this.actorSystemEventListenersConsumer.poll(10L);
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    ActorSystemEventListener actorSystemEventListener = (ActorSystemEventListener) consumerRecord.value();
                    if (actorSystemEventListener != null) {
                        ManagedActorShard managedActorShard4 = (ManagedActorShard) map.get(Integer.valueOf(consumerRecord.partition()));
                        ActorRef actorFor = this.internalActorSystem.actorFor(actorSystemEventListener.getActorId());
                        PersistentActor<ShardKey> persistentActor = managedActorShard4.getPersistentActor(actorFor);
                        DefaultInternalMessage defaultInternalMessage = new DefaultInternalMessage((ActorRef) null, actorFor, actorSystemEventListener.getMessageBytes(), actorSystemEventListener.getMessageClass().getName(), actorSystemEventListener.getMessageQueueAffinityKey(), false);
                        this.producer.beginTransaction();
                        try {
                            try {
                                try {
                                    KafkaTransactionContext.setTransactionalProducer(this.producer);
                                    doInActorContext(ApplicationProtocol::handleMessage, managedActorShard4, persistentActor, defaultInternalMessage);
                                    this.producer.commitTransaction();
                                    KafkaTransactionContext.clear();
                                } catch (KafkaException e) {
                                    logger.error("FATAL: Unrecoverable exception while comitting producer transaction", e);
                                    KafkaTransactionContext.clear();
                                }
                            } catch (ProducerFencedException e2) {
                                logger.error("FATAL: ProducerFenced while committing transaction, another Node seems to be handling the same shards", e2);
                                KafkaTransactionContext.clear();
                            } catch (Throwable th) {
                                logger.error("Unexpected exception while generating ActorSystemEventListener message", th);
                                KafkaTransactionContext.clear();
                            }
                        } catch (Throwable th2) {
                            KafkaTransactionContext.clear();
                            throw th2;
                        }
                    }
                    Long l = (Long) map2.get(Integer.valueOf(consumerRecord.partition()));
                    if (l == null || l.longValue() - DEFAULT_OFFSET_INCREASE != consumerRecord.offset()) {
                        return;
                    }
                    map2.remove(Integer.valueOf(consumerRecord.partition()));
                });
            } catch (KafkaException e) {
                logger.error("FATAL: Unrecoverable exception while polling ActorSystemEventListeners state", e);
            } catch (WakeupException | InterruptException e2) {
                logger.warn("Recoverable exception while polling ActorSystemEventListeners state", e2);
            }
            if (consumerRecords == null || consumerRecords.isEmpty()) {
                if (map2.isEmpty()) {
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Integer getNodeTopicPartitionId() {
        return this.nodeTopicPartitionId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createTempActor(ActorRef actorRef, CreateActorMessage createActorMessage) {
        runCommand((kafkaConsumer, kafkaProducer) -> {
            try {
                createActor(this.localActorNode, createActorMessage, actorRef, null);
            } catch (Exception e) {
                logger.error("Exception while creating TempActor", e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> initializeServiceActors() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        runCommand((kafkaConsumer, kafkaProducer) -> {
            Set services = this.internalActorSystem.getConfiguration().getServices();
            if (services != null && !services.isEmpty()) {
                services.forEach(str -> {
                    ActorRef serviceActorFor = this.internalActorSystem.serviceActorFor(str);
                    ElasticActor service = this.internalActorSystem.getConfiguration().getService(str);
                    if (this.localActorNode.initializedActors.contains(serviceActorFor)) {
                        return;
                    }
                    InternalActorContext.setContext(new ServiceActorContext(serviceActorFor, this.internalActorSystem));
                    try {
                        try {
                            service.postActivate((String) null);
                            InternalActorContext.clearContext();
                            this.localActorNode.initializedActors.add(serviceActorFor);
                        } catch (Exception e) {
                            logger.error("Exception while handling message for service [{}]", serviceActorFor, e);
                            InternalActorContext.clearContext();
                            this.localActorNode.initializedActors.add(serviceActorFor);
                        }
                    } catch (Throwable th) {
                        InternalActorContext.clearContext();
                        this.localActorNode.initializedActors.add(serviceActorFor);
                        throw th;
                    }
                });
            }
            completableFuture.complete(null);
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void runCommand(BiConsumer<KafkaConsumer<UUID, InternalMessage>, KafkaProducer<Object, Object>> biConsumer) {
        this.commands.offer(biConsumer);
    }

    private void handleInternalMessage(TopicPartition topicPartition, InternalMessage internalMessage) {
        if (topicPartition.topic().equals(this.messagesTopic)) {
            handleInternalMessage(this.localShards.get(new ShardKey(this.internalActorSystem.getName(), topicPartition.partition())), internalMessage);
        } else {
            handleInternalMessage(this.localActorNode, internalMessage);
        }
    }

    private void handleInternalMessage(ManagedActorContainer managedActorContainer, InternalMessage internalMessage) {
        internalMessage.getReceivers().forEach(actorRef -> {
            if (actorRef.getActorId() != null) {
                handleActorMessage(managedActorContainer, actorRef, internalMessage);
            } else {
                handleContainerMessage(managedActorContainer, internalMessage);
            }
        });
    }

    private void handleActorMessage(ManagedActorContainer managedActorContainer, ActorRef actorRef, InternalMessage internalMessage) {
        PersistentActor<ShardKey> persistentActor = managedActorContainer.getPersistentActor(actorRef);
        if (TOMBSTONE == persistentActor) {
            sendUndeliverable(internalMessage, actorRef);
            return;
        }
        if (persistentActor != null) {
            if (internalMessage.isUndeliverable()) {
                if (internalMessage.getPayloadClass().startsWith("org.elasticsoftware.elasticactors.messaging.reactivestreams")) {
                    doInActorContext(ReactiveStreamsProtocol::handleUndeliverableMessage, managedActorContainer, persistentActor, internalMessage);
                    return;
                } else {
                    doInActorContext(ApplicationProtocol::handleUndeliverableMessage, managedActorContainer, persistentActor, internalMessage);
                    return;
                }
            }
            if (internalMessage.getPayloadClass().startsWith("org.elasticsoftware.elasticactors.messaging.reactivestreams")) {
                doInActorContext(ReactiveStreamsProtocol::handleMessage, managedActorContainer, persistentActor, internalMessage);
                return;
            } else {
                doInActorContext(ApplicationProtocol::handleMessage, managedActorContainer, persistentActor, internalMessage);
                return;
            }
        }
        ElasticActor serviceInstance = this.internalActorSystem.getServiceInstance(actorRef);
        if (serviceInstance == null) {
            sendUndeliverable(internalMessage, actorRef);
            return;
        }
        boolean contains = ((ManagedActorNode) managedActorContainer).initializedActors.contains(actorRef);
        InternalActorContext.setContext(new ServiceActorContext(actorRef, this.internalActorSystem));
        try {
            if (contains) {
                ((ManagedActorNode) managedActorContainer).initializedActors.add(actorRef);
                serviceInstance.postActivate((String) null);
            }
            Object deserializeMessage = SerializationTools.deserializeMessage(this.internalActorSystem, internalMessage);
            if (internalMessage.isUndeliverable()) {
                serviceInstance.onUndeliverable(internalMessage.getSender(), deserializeMessage);
            } else {
                serviceInstance.onReceive(internalMessage.getSender(), deserializeMessage);
            }
        } catch (Exception e) {
            logger.error("Exception while handling message for service [{}]", actorRef, e);
        } finally {
            InternalActorContext.clearContext();
        }
    }

    private void handleContainerMessage(ManagedActorContainer managedActorContainer, InternalMessage internalMessage) {
        try {
            Object deserializeMessage = SerializationTools.deserializeMessage(this.internalActorSystem, internalMessage);
            if (deserializeMessage instanceof CreateActorMessage) {
                CreateActorMessage createActorMessage = (CreateActorMessage) deserializeMessage;
                ActorRef actorFor = this.internalActorSystem.actorFor(createActorMessage.getActorId());
                if (managedActorContainer.containsKey(actorFor)) {
                    managedActorContainer.getPersistentActor(actorFor);
                } else {
                    createActor(managedActorContainer, createActorMessage, actorFor, internalMessage);
                }
            } else if (deserializeMessage instanceof DestroyActorMessage) {
                destroyActor(managedActorContainer, (DestroyActorMessage) deserializeMessage, internalMessage);
            } else if (deserializeMessage instanceof CancelScheduledMessageMessage) {
                cancelScheduledMessage((ManagedActorShard) managedActorContainer, (CancelScheduledMessageMessage) deserializeMessage);
            } else if (deserializeMessage instanceof ActorNodeMessage) {
                if (internalMessage.isUndeliverable()) {
                    logger.error("undeliverable ActorNodeMessages are currently not supported");
                } else {
                    ActorNodeMessage actorNodeMessage = (ActorNodeMessage) deserializeMessage;
                    ActorNode node = this.internalActorSystem.getNode(actorNodeMessage.getNodeId());
                    if (node == null) {
                        logger.error("ActorNode with id [{}] is not reachable, discarding message of type [{}] from [{}] for [{}]", new Object[]{actorNodeMessage.getNodeId(), actorNodeMessage.getMessage().getClass().getName(), internalMessage.getSender(), actorNodeMessage.getReceiverRef()});
                    } else if (actorNodeMessage.isUndeliverable()) {
                        node.undeliverableMessage(createInternalMessage(actorNodeMessage.getReceiverRef(), ImmutableList.of(internalMessage.getSender()), actorNodeMessage.getMessage()), internalMessage.getSender());
                    } else {
                        node.sendMessage(internalMessage.getSender(), actorNodeMessage.getReceiverRef(), actorNodeMessage.getMessage());
                    }
                }
            } else if (deserializeMessage instanceof PersistActorMessage) {
                persistActor(managedActorContainer, ((PersistActorMessage) deserializeMessage).getActorRef());
            }
        } catch (Exception e) {
            logger.error("Exception while handling InternalMessage for Shard [{}]; senderRef [{}], messageType [{}]", new Object[]{managedActorContainer.getKey(), internalMessage.getSender(), internalMessage.getPayloadClass(), e});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doInActorContext(ActorLifecycleFunction actorLifecycleFunction, ManagedActorContainer managedActorContainer, PersistentActor persistentActor, InternalMessage internalMessage) {
        InternalActorContext.setContext(persistentActor);
        boolean z = false;
        try {
            try {
                z = actorLifecycleFunction.apply(this.internalActorSystem, persistentActor, this.internalActorSystem.getActorInstance(persistentActor.getSelf(), persistentActor.getActorClass()), persistentActor.getSelf(), internalMessage).booleanValue();
                SerializationContext.reset();
                InternalActorContext.clearContext();
            } catch (Exception e) {
                logger.error("Exception in doInActorContext", e);
                SerializationContext.reset();
                InternalActorContext.clearContext();
            }
            if (z) {
                managedActorContainer.persistActor(persistentActor);
            }
        } catch (Throwable th) {
            SerializationContext.reset();
            InternalActorContext.clearContext();
            throw th;
        }
    }

    private void sendUndeliverable(InternalMessage internalMessage, ActorRef actorRef) {
        ActorContainerRef sender = internalMessage.getSender();
        try {
            if ((sender instanceof ActorContainerRef) && !internalMessage.isUndeliverable()) {
                sender.getActorContainer().undeliverableMessage(internalMessage, actorRef);
            } else if (internalMessage.isUndeliverable()) {
                logger.error("Receiver for undeliverable message not found: message type '{}' , receiver '{}'", internalMessage.getPayloadClass(), actorRef);
            } else {
                logger.warn("Could not send message undeliverable: original message type '{}' , receiver '{}'", internalMessage.getPayloadClass(), actorRef);
            }
        } catch (Exception e) {
            logger.error("Exception while sending undeliverable message", e);
        }
    }

    private void createActor(ManagedActorContainer managedActorContainer, CreateActorMessage createActorMessage, ActorRef actorRef, InternalMessage internalMessage) throws ClassNotFoundException {
        Class forName = ClassLoadingHelper.getClassHelper().forName(createActorMessage.getActorClass());
        PersistentActor persistentActor = new PersistentActor(managedActorContainer.getKey(), this.internalActorSystem, ManifestTools.extractActorStateVersion(forName), actorRef, createActorMessage.getAffinityKey(), forName, createActorMessage.getInitialState());
        managedActorContainer.getActorCache().put(actorRef, persistentActor);
        managedActorContainer.persistActor(persistentActor);
        doInActorContext(ApplicationProtocol::createActor, managedActorContainer, persistentActor, internalMessage);
    }

    private void destroyActor(ManagedActorContainer managedActorContainer, DestroyActorMessage destroyActorMessage, InternalMessage internalMessage) {
        PersistentActor persistentActor = managedActorContainer.getPersistentActor(destroyActorMessage.getActorRef());
        if (persistentActor != null) {
            managedActorContainer.getActorCache().put(persistentActor.getSelf(), TOMBSTONE);
            managedActorContainer.deleteActor(persistentActor);
            doInActorContext(ApplicationProtocol::destroyActor, managedActorContainer, persistentActor, internalMessage);
        }
    }

    private void persistActor(ManagedActorContainer managedActorContainer, ActorRef actorRef) {
        PersistentActor<ShardKey> persistentActor = managedActorContainer.getPersistentActor(actorRef);
        if (persistentActor == null || persistentActor == TOMBSTONE) {
            return;
        }
        managedActorContainer.persistActor(persistentActor);
    }

    private void cancelScheduledMessage(ManagedActorShard managedActorShard, CancelScheduledMessageMessage cancelScheduledMessageMessage) {
        doSend(new ProducerRecord<>(this.scheduledMessagesTopic, Integer.valueOf(managedActorShard.getKey().getShardId()), cancelScheduledMessageMessage.getMessageId(), (Object) null), KafkaTransactionContext.getProducer());
    }

    private InternalMessage createInternalMessage(ActorRef actorRef, List<? extends ActorRef> list, Object obj) throws IOException {
        return InternalMessageFactory.createWithSerializedPayload(actorRef, list, this.internalActorSystem.getSerializer(obj.getClass()), obj);
    }
}
