/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsoftware.elasticactors.kafka.cluster;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.elasticsoftware.elasticactors.ActorRef;
import org.elasticsoftware.elasticactors.ActorSystem;
import org.elasticsoftware.elasticactors.ElasticActor;
import org.elasticsoftware.elasticactors.PersistentSubscription;
import org.elasticsoftware.elasticactors.PublisherNotFoundException;
import org.elasticsoftware.elasticactors.cluster.InternalActorSystem;
import org.elasticsoftware.elasticactors.cluster.tasks.ActorLifecycleTask;
import org.elasticsoftware.elasticactors.kafka.cluster.InternalSubscriberContext;
import org.elasticsoftware.elasticactors.kafka.cluster.SubscriberContextImpl;
import org.elasticsoftware.elasticactors.messaging.InternalMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.CancelMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.CompletedMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.NextMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.RequestMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.SubscribeMessage;
import org.elasticsoftware.elasticactors.messaging.reactivestreams.SubscriptionMessage;
import org.elasticsoftware.elasticactors.reactivestreams.InternalPersistentSubscription;
import org.elasticsoftware.elasticactors.serialization.MessageDeserializer;
import org.elasticsoftware.elasticactors.serialization.SerializationAccessor;
import org.elasticsoftware.elasticactors.serialization.SerializationContext;
import org.elasticsoftware.elasticactors.state.MessageSubscriber;
import org.elasticsoftware.elasticactors.state.PersistentActor;
import org.elasticsoftware.elasticactors.util.ClassLoadingHelper;
import org.elasticsoftware.elasticactors.util.SerializationTools;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static handlers for the reactive-streams control messages ({@link NextMessage},
 * {@link SubscribeMessage}, {@link SubscriptionMessage}, {@link RequestMessage},
 * {@link CancelMessage} and {@link CompletedMessage}) that are carried inside an
 * {@link InternalMessage}.
 *
 * <p>Both entry points deserialize the payload, dispatch on its runtime type and update the
 * subscriber/subscription bookkeeping held by the supplied {@link PersistentActor}. Every
 * exception raised while doing so is logged and converted into a {@code false} result.
 */
public final class ReactiveStreamsProtocol {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveStreamsProtocol.class);

    /**
     * Cleans up local bookkeeping after a protocol message could not be delivered.
     *
     * <ul>
     *   <li>{@code NextMessage} / {@code SubscriptionMessage}: the sender is removed as a subscriber.</li>
     *   <li>{@code CancelMessage}: the subscription keyed by the sender is removed.</li>
     *   <li>{@code RequestMessage}: the subscription keyed by the sender is cancelled, then removed.</li>
     *   <li>{@code SubscribeMessage}: an existing subscription is removed and its subscriber is
     *       notified through {@code onError} with a {@link PublisherNotFoundException}.</li>
     *   <li>{@code CompletedMessage} and any other payload: nothing is changed.</li>
     * </ul>
     *
     * @param actorSystem     actor system used to deserialize the payload and to build the subscriber context
     * @param persistentActor holder of the subscribers and subscriptions that are modified
     * @param receiver        not used by this method
     * @param receiverRef     reference of the local actor; only used in log output
     * @param internalMessage the undeliverable message; its sender identifies the remote party
     * @return the result of the remove operation performed, {@code true} when a subscription was
     *         failed via {@code onError}, otherwise {@code false} (also when an exception was logged).
     *         NOTE(review): presumably tells the caller whether state changed and must be saved — confirm.
     */
    public static Boolean handleUndeliverableMessage(InternalActorSystem actorSystem, PersistentActor persistentActor, ElasticActor receiver, ActorRef receiverRef, InternalMessage internalMessage) {
        try {
            Object message = SerializationTools.deserializeMessage((SerializationAccessor) actorSystem, internalMessage);
            if (message instanceof NextMessage) {
                return persistentActor.removeSubscriber(((NextMessage) message).getMessageName(), new MessageSubscriber(internalMessage.getSender()));
            } else if (message instanceof CompletedMessage) {
                // nothing to clean up for an undeliverable completion signal
                return false;
            } else if (message instanceof CancelMessage) {
                return persistentActor.removeSubscription(((CancelMessage) message).getMessageName(), internalMessage.getSender());
            } else if (message instanceof RequestMessage) {
                RequestMessage request = (RequestMessage) message;
                persistentActor.cancelSubscription(request.getMessageName(), internalMessage.getSender());
                return persistentActor.removeSubscription(request.getMessageName(), internalMessage.getSender());
            } else if (message instanceof SubscribeMessage) {
                return failUndeliverableSubscribe((SubscribeMessage) message, actorSystem, persistentActor, receiverRef, internalMessage);
            } else if (message instanceof SubscriptionMessage) {
                return persistentActor.removeSubscriber(((SubscriptionMessage) message).getMessageName(), new MessageSubscriber(internalMessage.getSender()));
            }
        } catch (Exception e) {
            logger.error("Exception while Deserializing Message class {} in ActorSystem [{}]", internalMessage.getPayloadClass(), actorSystem.getName(), e);
        }
        return false;
    }

    /**
     * Handles a protocol message that was delivered to the local actor.
     *
     * @param actorSystem     actor system used for deserialization and for the subscriber context
     * @param persistentActor holder of the subscribers and subscriptions that are read or modified
     * @param receiver        the actor instance; only consulted for {@code NextMessage}
     * @param receiverRef     reference of the receiving actor
     * @param internalMessage the delivered message
     * @return for a {@code NextMessage} the result of the {@code NextMessage} handler; {@code true}
     *         for every other payload; {@code false} when any exception was caught and logged
     */
    public static Boolean handleMessage(InternalActorSystem actorSystem, PersistentActor persistentActor, ElasticActor receiver, ActorRef receiverRef, InternalMessage internalMessage) {
        try {
            Object message = SerializationTools.deserializeMessage((SerializationAccessor) actorSystem, internalMessage);
            if (message instanceof NextMessage) {
                // the only message whose handler decides the return value
                return handle((NextMessage) message, persistentActor, receiver, receiverRef, internalMessage.getSender(), actorSystem);
            }
            if (message instanceof SubscribeMessage) {
                SubscribeMessage subscribe = (SubscribeMessage) message;
                handle(subscribe, receiverRef, subscribe.getSubscriberRef(), persistentActor);
            } else if (message instanceof CancelMessage) {
                CancelMessage cancel = (CancelMessage) message;
                handle(cancel, cancel.getSubscriberRef(), persistentActor);
            } else if (message instanceof RequestMessage) {
                handle((RequestMessage) message, internalMessage.getSender(), persistentActor);
            } else if (message instanceof SubscriptionMessage) {
                handle((SubscriptionMessage) message, receiverRef, internalMessage.getSender(), persistentActor, actorSystem);
            } else if (message instanceof CompletedMessage) {
                handle((CompletedMessage) message, receiverRef, internalMessage.getSender(), persistentActor, actorSystem);
            }
            return true;
        } catch (Exception e) {
            logger.error("Exception while Deserializing Message class {} in ActorSystem [{}]", internalMessage.getPayloadClass(), actorSystem.getName(), e);
            return false;
        }
    }

    /**
     * Removes the subscription keyed by the sender of an undeliverable {@code SubscribeMessage}
     * and reports a {@link PublisherNotFoundException} to its subscriber.
     *
     * @return {@code true} when a subscription existed (even if {@code onError} itself threw),
     *         {@code false} when there was none
     */
    private static boolean failUndeliverableSubscribe(SubscribeMessage subscribeMessage, InternalActorSystem actorSystem, PersistentActor persistentActor, ActorRef receiverRef, InternalMessage internalMessage) {
        // the sender of the bounced message is the key under which the subscription is stored
        ActorRef publisherRef = internalMessage.getSender();
        Optional<?> found = persistentActor.getSubscription(subscribeMessage.getMessageName(), publisherRef);
        if (!found.isPresent()) {
            return false;
        }
        persistentActor.removeSubscription(subscribeMessage.getMessageName(), publisherRef);
        InternalPersistentSubscription subscription = (InternalPersistentSubscription) found.get();
        bindContext(persistentActor, publisherRef, actorSystem, subscription);
        try {
            subscription.getSubscriber().onError(new PublisherNotFoundException(String.format("Actor[%s] does not exist", publisherRef.toString()), publisherRef));
        } catch (Exception e) {
            logger.error("Unexpected Exception while calling onError on Subscriber with type {} of Actor {}", subscriberTypeName(subscription), receiverRef, e);
        } finally {
            InternalSubscriberContext.clearContext();
        }
        return true;
    }

    /**
     * Delivers the payload embedded in a {@code NextMessage} to the matching subscriber.
     *
     * <p>When no subscription exists for the publisher, an error is logged and a
     * {@code CancelMessage} is sent back to the publisher.
     *
     * @return the value of {@code ActorLifecycleTask.shouldUpdateState} when {@code onNext}
     *         completed; {@code false} when the subscription is missing or any step failed
     */
    private static Boolean handle(NextMessage nextMessage, PersistentActor persistentActor, ElasticActor receiver, ActorRef receiverRef, ActorRef publisherRef, InternalActorSystem actorSystem) {
        Optional<?> found = persistentActor.getSubscription(nextMessage.getMessageName(), publisherRef);
        if (!found.isPresent()) {
            logger.error("Subscriber {} is missing PersistentSubscription for Publisher {} while handling NextMessage", receiverRef, publisherRef);
            publisherRef.tell(new CancelMessage(receiverRef, nextMessage.getMessageName()));
            return false;
        }
        InternalPersistentSubscription subscription = (InternalPersistentSubscription) found.get();
        bindContext(persistentActor, publisherRef, actorSystem, subscription);
        try {
            // raw types kept on purpose: the generic signatures of forName/getDeserializer are not visible here
            Class messageClass = ClassLoadingHelper.getClassHelper().forName(nextMessage.getMessageName());
            MessageDeserializer deserializer = actorSystem.getDeserializer(messageClass);
            Object message = SerializationContext.deserialize(deserializer, ByteBuffer.wrap(nextMessage.getMessageBytes()));
            subscription.getSubscriber().onNext(message);
            return ActorLifecycleTask.shouldUpdateState(receiver, message);
        } catch (ClassNotFoundException e) {
            logger.error("Actor[{}]: Could not find message type: <{}>, unable to deserialize subscribed message", receiverRef, nextMessage.getMessageName());
        } catch (IOException e) {
            logger.error("Actor[{}]: Problem trying to deserialize message embedded in NextMessage", receiverRef, e);
        } catch (Exception e) {
            logger.error("Unexpected Exception while calling onNext on Subscriber with type {} of Actor {}", subscriberTypeName(subscription), receiverRef, e);
        } finally {
            InternalSubscriberContext.clearContext();
        }
        return false;
    }

    /**
     * Registers {@code subscriberRef} as a subscriber for the requested message name and
     * answers with a {@code SubscriptionMessage} sent on behalf of {@code receiverRef}.
     */
    private static void handle(SubscribeMessage subscribeMessage, ActorRef receiverRef, ActorRef subscriberRef, PersistentActor persistentActor) {
        persistentActor.addSubscriber(subscribeMessage.getMessageName(), new MessageSubscriber(subscriberRef));
        SubscriptionMessage reply = new SubscriptionMessage(subscribeMessage.getMessageName());
        subscriberRef.tell(reply, receiverRef);
    }

    /**
     * Removes {@code subscriberRef} as a subscriber; when it was actually removed, a
     * {@code CompletedMessage} is sent to it.
     */
    private static void handle(CancelMessage cancelMessage, ActorRef subscriberRef, PersistentActor persistentActor) {
        boolean removed = persistentActor.removeSubscriber(cancelMessage.getMessageName(), new MessageSubscriber(subscriberRef));
        if (removed) {
            subscriberRef.tell(new CompletedMessage(cancelMessage.getMessageName()));
        }
    }

    /**
     * Adds the requested amount {@code n} to the first registered subscriber entry whose
     * reference equals {@code subscriberRef}. Does nothing when the actor has no subscriber
     * collection at all.
     */
    private static void handle(RequestMessage requestMessage, ActorRef subscriberRef, PersistentActor persistentActor) {
        if (persistentActor.getMessageSubscribers() == null) {
            return;
        }
        persistentActor.getMessageSubscribers()
                .get(requestMessage.getMessageName())
                .stream()
                .filter(candidate -> candidate.getSubscriberRef().equals(subscriberRef))
                .findFirst()
                .ifPresent(match -> match.incrementAndGet(requestMessage.getN()));
    }

    /**
     * Hands the stored subscription to its subscriber via {@code onSubscribe}.
     *
     * <p>When no subscription exists for the publisher, an error is logged and a
     * {@code CancelMessage} is sent back to the publisher.
     */
    private static void handle(SubscriptionMessage subscriptionMessage, ActorRef subscriberRef, ActorRef publisherRef, PersistentActor persistentActor, InternalActorSystem actorSystem) {
        Optional<?> found = persistentActor.getSubscription(subscriptionMessage.getMessageName(), publisherRef);
        if (!found.isPresent()) {
            logger.error("Subscriber {} is missing PersistentSubscription for Publisher {} while handling SubscriptionMessage", subscriberRef, publisherRef);
            publisherRef.tell(new CancelMessage(subscriberRef, subscriptionMessage.getMessageName()));
            return;
        }
        InternalPersistentSubscription subscription = (InternalPersistentSubscription) found.get();
        bindContext(persistentActor, publisherRef, actorSystem, subscription);
        try {
            subscription.getSubscriber().onSubscribe((Subscription) subscription);
        } catch (Exception e) {
            logger.error("Unexpected Exception while calling onSubscribe on Subscriber with type {} of Actor {}", subscriberTypeName(subscription), subscriberRef, e);
        } finally {
            InternalSubscriberContext.clearContext();
        }
    }

    /**
     * Signals {@code onComplete} to the subscriber of the stored subscription and then removes
     * that subscription, whether or not {@code onComplete} threw. A missing subscription is
     * only logged.
     */
    private static void handle(CompletedMessage completedMessage, ActorRef subscriberRef, ActorRef publisherRef, PersistentActor persistentActor, InternalActorSystem actorSystem) {
        Optional<?> found = persistentActor.getSubscription(completedMessage.getMessageName(), publisherRef);
        if (!found.isPresent()) {
            logger.error("Subscriber {} is missing PersistentSubscription for Publisher {} while handling CompletedMessage", subscriberRef, publisherRef);
            return;
        }
        InternalPersistentSubscription subscription = (InternalPersistentSubscription) found.get();
        bindContext(persistentActor, publisherRef, actorSystem, subscription);
        try {
            subscription.getSubscriber().onComplete();
        } catch (Exception e) {
            logger.error("Unexpected Exception while calling onComplete on Subscriber with type {} of Actor {}", subscriberTypeName(subscription), subscriberRef, e);
        } finally {
            InternalSubscriberContext.clearContext();
            // the subscription is finished in every case, so drop it even if onComplete failed
            persistentActor.removeSubscription(completedMessage.getMessageName(), publisherRef);
        }
    }

    /**
     * Installs the subscriber context for the given subscription. Callers are responsible for
     * calling {@code InternalSubscriberContext.clearContext()} in a {@code finally} block.
     */
    private static void bindContext(PersistentActor persistentActor, ActorRef publisherRef, InternalActorSystem actorSystem, InternalPersistentSubscription subscription) {
        InternalSubscriberContext.setContext(new SubscriberContextImpl(persistentActor, publisherRef, (ActorSystem) actorSystem, (PersistentSubscription) subscription));
    }

    /**
     * Returns the simple class name of the subscription's subscriber for log output, or
     * {@code null} when the subscription has no subscriber.
     */
    private static String subscriberTypeName(InternalPersistentSubscription subscription) {
        Object subscriber = subscription.getSubscriber();
        if (subscriber == null) {
            return null;
        }
        return subscriber.getClass().getSimpleName();
    }
}

