/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kafka.scheduler;

import java.nio.ByteBuffer;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.elasticsoftware.elasticactors.ActorContainer;
import org.elasticsoftware.elasticactors.ActorContainerRef;
import org.elasticsoftware.elasticactors.ActorContextHolder;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorShard;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessage;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessageKey;
import org.elasticsoftware.elasticactors.cluster.scheduler.ScheduledMessageShardRef;
import org.elasticsoftware.elasticactors.kafka.KafkaActorShard;
import org.elasticsoftware.elasticactors.messaging.ScheduledMessageImpl;
import org.elasticsoftware.elasticactors.scheduler.ScheduledMessageRef;
import org.elasticsoftware.elasticactors.scheduler.Scheduler;
import org.elasticsoftware.elasticactors.serialization.MessageSerializer;

/**
 * {@link Scheduler} implementation that hands scheduled messages to the
 * {@link KafkaActorShard} that contains the calling actor.
 *
 * <p>A request is only accepted when the reference returned by
 * {@link ActorContextHolder#getSelf()} is an {@link ActorContainerRef} whose container is a
 * {@link KafkaActorShard} with a non-null, local owning node.
 */
public final class KafkaTopicScheduler implements Scheduler {
    private final InternalActorSystem actorSystem;

    /**
     * Creates a scheduler bound to the given actor system.
     *
     * @param internalActorSystem actor system used to look up message serializers and the
     *                            cluster name
     */
    public KafkaTopicScheduler(InternalActorSystem internalActorSystem) {
        this.actorSystem = internalActorSystem;
    }

    /**
     * Schedules {@code message} for delivery to {@code receiver} after the given delay.
     *
     * <p>The fire time is computed as the current wall-clock time in milliseconds plus the
     * delay converted to milliseconds. The message is serialized with the serializer
     * registered for its runtime class and then passed to the shard of the calling actor.
     *
     * @param message  the message to deliver
     * @param receiver the actor that should receive the message
     * @param delay    how long to wait before delivery, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code delay}
     * @return a reference built from the cluster name, the shard and the scheduled message's
     *         id and fire time
     * @throws RejectedExecutionException if anything fails while serializing or scheduling;
     *                                    the original exception is kept as the cause
     * @throws IllegalStateException      if the calling actor does not live in a
     *                                    {@link KafkaActorShard} owned by a local node
     */
    public ScheduledMessageRef scheduleOnce(Object message, ActorRef receiver, long delay, TimeUnit timeUnit) {
        ActorRef self = ActorContextHolder.getSelf();
        KafkaActorShard localShard = KafkaTopicScheduler.findLocalShard(self);
        if (localShard == null) {
            throw new IllegalStateException("Cannot determine an appropriate ActorRef(self). Only use this method while inside an ElasticActor Lifecycle or on(Message) method on a Persistent Actor!");
        }
        try {
            long now = System.currentTimeMillis();
            long fireTime = now + timeUnit.toMillis(delay);
            Class<?> messageType = message.getClass();
            MessageSerializer messageSerializer = this.actorSystem.getSerializer(messageType);
            ByteBuffer payload = messageSerializer.serialize(message);
            ScheduledMessageImpl scheduled = new ScheduledMessageImpl(fireTime, self, receiver, messageType, payload, message);
            localShard.schedule((ScheduledMessage)scheduled);
            return new ScheduledMessageShardRef(
                    this.actorSystem.getParent().getClusterName(),
                    (ActorShard)localShard,
                    new ScheduledMessageKey(scheduled.getId(), fireTime));
        }
        catch (Exception cause) {
            // Every failure in the scheduling path is reported as a rejection, cause attached.
            throw new RejectedExecutionException(cause);
        }
    }

    /**
     * Resolves the shard that may schedule on behalf of {@code self}.
     *
     * @param self the reference of the calling actor, possibly {@code null}
     * @return the {@link KafkaActorShard} containing {@code self} when its owning node is
     *         non-null and local; {@code null} otherwise
     */
    private static KafkaActorShard findLocalShard(ActorRef self) {
        if (!(self instanceof ActorContainerRef)) {
            return null;
        }
        ActorContainer container = ((ActorContainerRef)self).getActorContainer();
        if (!(container instanceof KafkaActorShard)) {
            return null;
        }
        KafkaActorShard shard = (KafkaActorShard)container;
        if (shard.getOwningNode() == null || !shard.getOwningNode().isLocal()) {
            return null;
        }
        return shard;
    }
}

