/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kafka;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.PhysicalNode;
import org.elasticsoftware.elasticactors.ShardKey;
import org.elasticsoftware.elasticactors.cluster.ActorShardRef;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage;
import org.elasticsoftware.elasticactors.kafka.KafkaActorThread;
import org.elasticsoftware.elasticactors.messaging.DefaultInternalMessage;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.serialization.Message;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;
import org.elasticsoftware.elasticactors.serialization.SerializationContext;

public final class KafkaActorShard
implements ActorShard {
    private final ShardKey key;
    private final ActorRef myRef;
    private final AtomicReference<PhysicalNode> owningNode = new AtomicReference<Object>(null);
    private final KafkaActorThread actorThread;
    private final InternalActorSystem actorSystem;

    public KafkaActorShard(ShardKey key, KafkaActorThread actorThread, InternalActorSystem actorSystem) {
        this.key = key;
        this.myRef = new ActorShardRef(actorSystem, actorSystem.getParent().getClusterName(), (ActorShard)this);
        this.actorThread = actorThread;
        this.actorSystem = actorSystem;
        this.actorThread.assign(this);
    }

    public ShardKey getKey() {
        return this.key;
    }

    public PhysicalNode getOwningNode() {
        return this.owningNode.get();
    }

    public void setOwningNode(PhysicalNode owningNode) {
        this.owningNode.set(owningNode);
    }

    public ActorRef getActorRef() {
        return this.myRef;
    }

    public void sendMessage(ActorRef sender, ActorRef receiver, Object message) throws Exception {
        this.sendMessage(sender, (List<? extends ActorRef>)ImmutableList.of((Object)receiver), message);
    }

    public void sendMessage(ActorRef sender, List<? extends ActorRef> receivers, Object message) throws Exception {
        this.offerInternalMessage(this.createInternalMessage(sender, receivers, message));
    }

    public void undeliverableMessage(InternalMessage message, ActorRef receiverRef) throws Exception {
        DefaultInternalMessage undeliverableMessage = new DefaultInternalMessage(receiverRef, message.getSender(), message.getPayload(), message.getPayloadClass(), message.isDurable(), true, message.getTimeout());
        this.offerInternalMessage((InternalMessage)undeliverableMessage);
    }

    public void offerInternalMessage(InternalMessage internalMessage) {
        this.actorThread.send(this.key, internalMessage);
    }

    public void schedule(ScheduledMessage scheduledMessage) {
        this.actorThread.schedule(this.key, scheduledMessage);
    }

    KafkaActorThread getActorThread() {
        return this.actorThread;
    }

    public void init() throws Exception {
    }

    public void destroy() {
    }

    private InternalMessage createInternalMessage(ActorRef from, List<? extends ActorRef> to, Object message) throws IOException {
        MessageSerializer messageSerializer = this.actorSystem.getSerializer(message.getClass());
        if (messageSerializer == null) {
            throw new IllegalArgumentException("MessageSerializer not found for message of type " + message.getClass().getName());
        }
        Message messageAnnotation = message.getClass().getAnnotation(Message.class);
        boolean durable = messageAnnotation != null && messageAnnotation.durable();
        int timeout = messageAnnotation != null ? messageAnnotation.timeout() : -1;
        return new DefaultInternalMessage(from, ImmutableList.copyOf(to), SerializationContext.serialize((MessageSerializer)messageSerializer, (Object)message), message.getClass().getName(), durable, timeout);
    }
}

