/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.messaging;

import com.google.common.collect.ImmutableMap;
import javax.annotation.Nullable;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.messaging.Hasher;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.MessageHandler;
import org.elasticsoftware.elasticactors.messaging.MessageQueue;
import org.elasticsoftware.elasticactors.messaging.MessageQueueFactory;
import org.elasticsoftware.elasticactors.messaging.MessageQueueProxy;
import org.elasticsoftware.elasticactors.messaging.Splittable;
import org.elasticsoftware.elasticactors.messaging.SplittableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MultiMessageQueueProxy
implements MessageQueueProxy {
    private static final Logger logger = LoggerFactory.getLogger(MultiMessageQueueProxy.class);
    private final Hasher hasher;
    private final MessageQueueFactory messageQueueFactory;
    private final MessageHandler messageHandler;
    private final ActorRef actorRef;
    private final MessageQueue[] messageQueues;

    public MultiMessageQueueProxy(Hasher hasher, MessageQueueFactory messageQueueFactory, MessageHandler messageHandler, ActorRef actorRef, int queueCount) {
        this.hasher = hasher;
        this.messageQueueFactory = messageQueueFactory;
        this.messageHandler = messageHandler;
        this.actorRef = actorRef;
        if (queueCount <= 0) {
            throw new IllegalArgumentException("Number of queues must be greater than 0");
        }
        this.messageQueues = new MessageQueue[queueCount];
    }

    @Override
    public synchronized void init() throws Exception {
        logger.info("Initializing queue proxy for [{}/{}] in Multi-Queue mode with {} queues", new Object[]{this.actorRef.getActorCluster(), this.actorRef.getActorPath(), this.messageQueues.length});
        this.messageQueues[0] = this.messageQueueFactory.create(this.actorRef.getActorPath(), this.messageHandler);
        for (int i = 1; i < this.messageQueues.length; ++i) {
            this.messageQueues[i] = this.messageQueueFactory.create(this.actorRef.getActorPath() + "-queue-" + i, this.messageHandler);
        }
    }

    @Override
    public void destroy() {
        logger.info("Destroying queue proxy for [{}/{}]", (Object)this.actorRef.getActorCluster(), (Object)this.actorRef.getActorPath());
        for (MessageQueue messageQueue : this.messageQueues) {
            messageQueue.destroy();
        }
    }

    @Override
    public void offerInternalMessage(InternalMessage message) {
        if (this.messageQueues.length == 1) {
            this.sendToBucket(0, message);
        } else {
            String messageQueueKey = this.determineMessageQueueKey(message);
            if (messageQueueKey != null) {
                this.sendToBucket(this.getBucket(messageQueueKey), message);
            } else if (message.getReceivers() != null && message.getReceivers().size() <= 1) {
                int bucket = SplittableUtils.calculateBucketForEmptyOrSingleActor(message.getReceivers(), this.hasher, this.messageQueues.length);
                this.sendToBucket(bucket, message);
            } else if (message instanceof Splittable) {
                ImmutableMap messagesPerBucket = ((Splittable)message).splitInBuckets(this.hasher, this.messageQueues.length);
                messagesPerBucket.forEach(this::sendToBucket);
            } else {
                logger.error("Could not detect to which queue to send message of type [{}] wrapped in [{}] to [{}]. Sending it to the default queue.", new Object[]{message.getPayloadClass(), message.getClass().getName(), this.actorRef.getActorPath()});
                this.sendToBucket(0, message);
            }
        }
    }

    private int getBucket(String messageQueueKey) {
        return Math.abs(this.hasher.hashStringToInt(messageQueueKey)) % this.messageQueues.length;
    }

    private void sendToBucket(int bucket, InternalMessage message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Offering message of type [{}] wrapped in a [{}] to [{}] on queue {}", new Object[]{message.getPayloadClass(), message.getClass().getName(), message.getReceivers(), bucket});
        }
        this.messageQueues[bucket].offer(message);
    }

    private String determineMessageQueueKey(InternalMessage message) {
        if (message.getReceivers() != null && message.getReceivers().size() > 1) {
            String key = this.getMessageQueueKey(message);
            if (key != null) {
                logger.error("Received a message of type [{}] that should be hashed to a specific queue, wrapped in a [{}] but has multiple receivers", (Object)message.getPayloadClass(), (Object)message.getClass().getName());
            }
            return null;
        }
        return this.getMessageQueueKey(message);
    }

    @Nullable
    private String getMessageQueueKey(InternalMessage message) {
        try {
            return message.getMessageQueueAffinityKey();
        }
        catch (Exception e) {
            logger.error("Could not determine hashing key for message of type [{}] wrapped in [{}]", new Object[]{message.getPayloadClass(), message.getClass().getName(), e});
            return null;
        }
    }
}

