package org.elasticsoftware.elasticactors.messaging;

import javax.annotation.Nullable;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.messaging.internal.InternalHashKeyUtils;
import org.elasticsoftware.elasticactors.serialization.MessageDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elasticsoftware/elasticactors/messaging/MultiMessageQueueProxy.class */
/**
 * {@link MessageQueueProxy} that distributes the messages offered for a single {@link ActorRef}
 * over a fixed number of {@link MessageQueue}s.
 *
 * <p>The target queue ("bucket") for a message is chosen as follows:
 * <ol>
 *   <li>with a single queue, everything goes to queue 0;</li>
 *   <li>if an affinity key can be derived from the payload, the hash of that key selects the
 *       queue;</li>
 *   <li>otherwise, a message with a non-null receiver list of at most one entry is routed by
 *       {@link SplittableUtils#calculateBucketForEmptyOrSingleActor};</li>
 *   <li>otherwise, a {@link Splittable} message is split and each part is sent to its own
 *       queue;</li>
 *   <li>anything else is logged as an error and sent to queue 0.</li>
 * </ol>
 *
 * <p>{@link #init()} must have completed before messages are offered; until then the queue slots
 * are empty.
 */
public final class MultiMessageQueueProxy implements MessageQueueProxy {
    private static final Logger logger = LoggerFactory.getLogger(MultiMessageQueueProxy.class);
    private final Hasher hasher;
    private final MessageQueueFactory messageQueueFactory;
    private final MessageHandler messageHandler;
    private final ActorRef actorRef;
    private final MessageQueue[] messageQueues;

    /**
     * Creates a proxy with {@code i} (still uninitialized) queue slots.
     *
     * @param hasher hasher used to map keys onto queues
     * @param messageQueueFactory factory used by {@link #init()} to create the queues
     * @param messageHandler handler passed to every queue that is created
     * @param actorRef reference whose actor path is used to name the queues
     * @param i number of queues to manage; must be greater than 0
     * @throws IllegalArgumentException if {@code i} is not greater than 0
     */
    public MultiMessageQueueProxy(Hasher hasher, MessageQueueFactory messageQueueFactory, MessageHandler messageHandler, ActorRef actorRef, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Number of queues must be greater than 0");
        }
        this.hasher = hasher;
        this.messageQueueFactory = messageQueueFactory;
        this.messageHandler = messageHandler;
        this.actorRef = actorRef;
        this.messageQueues = new MessageQueue[i];
    }

    /**
     * Creates all queues. Queue 0 is named after the actor path itself; every other queue is
     * named {@code <actorPath>-queue-<index>}.
     *
     * @throws Exception if the factory fails to create one of the queues
     */
    @Override
    public synchronized void init() throws Exception {
        logger.info(
            "Initializing queue proxy for [{}/{}] in Multi-Queue mode with {} queues",
            actorRef.getActorCluster(),
            actorRef.getActorPath(),
            messageQueues.length);
        messageQueues[0] = messageQueueFactory.create(actorRef.getActorPath(), messageHandler);
        for (int index = 1; index < messageQueues.length; index++) {
            messageQueues[index] = messageQueueFactory.create(
                actorRef.getActorPath() + "-queue-" + index,
                messageHandler);
        }
    }

    /**
     * Destroys every queue that has been created. Slots that were never filled (because
     * {@link #init()} was not called or failed part-way) are skipped.
     */
    @Override
    public void destroy() {
        logger.info("Destroying queue proxy for [{}/{}]", actorRef.getActorCluster(), actorRef.getActorPath());
        for (MessageQueue messageQueue : messageQueues) {
            // A slot is still null when init() never ran or aborted before reaching it.
            if (messageQueue != null) {
                messageQueue.destroy();
            }
        }
    }

    /**
     * Offers the message to the queue (or queues) selected by the routing rules described on
     * this class.
     *
     * @param internalMessage the message to enqueue
     */
    @Override
    public void offerInternalMessage(InternalMessage internalMessage) {
        if (messageQueues.length == 1) {
            sendToBucket(0, internalMessage);
            return;
        }

        String affinityKey = determineMessageQueueKey(internalMessage);
        if (affinityKey != null) {
            sendToBucket(getBucket(affinityKey), internalMessage);
            return;
        }

        if (internalMessage.getReceivers() != null && internalMessage.getReceivers().size() <= 1) {
            int bucket = SplittableUtils.calculateBucketForEmptyOrSingleActor(
                internalMessage.getReceivers(),
                hasher,
                messageQueues.length);
            sendToBucket(bucket, internalMessage);
        } else if (internalMessage instanceof Splittable) {
            ((Splittable) internalMessage)
                .splitInBuckets(hasher, messageQueues.length)
                .forEach(this::sendToBucket);
        } else {
            logger.error(
                "Could not detect to which queue to send message of type [{}] wrapped in [{}] to [{}]. Sending it to the default queue.",
                internalMessage.getPayloadClass(),
                internalMessage.getClass().getName(),
                actorRef.getActorPath());
            sendToBucket(0, internalMessage);
        }
    }

    /**
     * Maps a key onto a queue index in {@code [0, messageQueues.length)}.
     *
     * @param key the key to hash
     * @return the index of the queue the key belongs to
     */
    private int getBucket(String key) {
        // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
        // negative, so abs-then-modulo could produce a negative index. For every other hash
        // value the result is identical to Math.abs(hash) % length.
        return Math.abs(hasher.hashStringToInt(key) % messageQueues.length);
    }

    /**
     * Offers the message to the queue at the given index.
     *
     * @param bucket index into the queue array
     * @param internalMessage the message to enqueue
     */
    private void sendToBucket(int bucket, InternalMessage internalMessage) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                "Offering message of type [{}] wrapped in a [{}] to [{}] on queue {}",
                internalMessage.getPayloadClass(),
                internalMessage.getClass().getName(),
                internalMessage.getReceivers(),
                bucket);
        }
        messageQueues[bucket].offer(internalMessage);
    }

    /**
     * Returns the affinity key to route the message by, or {@code null} when the message has no
     * key or has more than one receiver. A keyed message with multiple receivers is logged as an
     * error and treated as having no key.
     *
     * @param internalMessage the message to inspect
     * @return the affinity key, or {@code null} if key-based routing does not apply
     */
    @Nullable
    private String determineMessageQueueKey(InternalMessage internalMessage) {
        String affinityKey = getMessageQueueKey(internalMessage);
        if (internalMessage.getReceivers() == null || internalMessage.getReceivers().size() <= 1) {
            return affinityKey;
        }
        if (affinityKey != null) {
            logger.error(
                "Received a message of type [{}] that should be hashed to a specific queue, wrapped in a [{}] but has multiple receivers",
                internalMessage.getPayloadClass(),
                internalMessage.getClass().getName());
        }
        return null;
    }

    /**
     * Extracts the affinity key from the message payload.
     *
     * @param internalMessage the message to inspect
     * @return the key, or {@code null} when the message carries no payload object or the key
     *     could not be determined (the failure is logged)
     */
    @Nullable
    private String getMessageQueueKey(InternalMessage internalMessage) {
        if (!internalMessage.hasPayloadObject()) {
            return null;
        }
        try {
            // Only reached when hasPayloadObject() is true; no deserializer is supplied.
            return InternalHashKeyUtils.getMessageQueueAffinityKey(
                internalMessage.getPayload((MessageDeserializer) null));
        } catch (Exception e) {
            // The exception is passed as the last argument so the logger records the stack trace.
            logger.error(
                "Could not determine hashing key for message of type [{}] wrapped in [{}]",
                internalMessage.getPayloadClass(),
                internalMessage.getClass().getName(),
                e);
            return null;
        }
    }
}
