/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty;

import io.reactivex.netty.MutableReference;
import io.reactivex.netty.RemoteObservable;
import io.reactivex.netty.RemoteObservableConfiguration;
import io.reactivex.netty.RemoteObservableException;
import io.reactivex.netty.RemoteRxEvent;
import io.reactivex.netty.ServerMetrics;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.codec.Encoder;
import io.reactivex.netty.ingress.IngressPolicy;
import io.reactivex.netty.slotting.SlotAssignment;
import io.reactivex.netty.slotting.SlotValuePair;
import io.reactivex.netty.slotting.SlottingStrategy;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;

public class RemoteObservableConnectionHandler
implements ConnectionHandler<RemoteRxEvent, RemoteRxEvent> {
    private static final Logger logger = LoggerFactory.getLogger(RemoteObservableConnectionHandler.class);
    private Map<String, RemoteObservableConfiguration> observables;
    private CountDownLatch blockUntilCompleted;
    private ServerMetrics serverMetrics;
    private IngressPolicy ingressPolicy;

    public RemoteObservableConnectionHandler(Map<String, RemoteObservableConfiguration> observables, IngressPolicy ingressPolicy, CountDownLatch blockUntilCompleted, ServerMetrics metrics) {
        this.observables = observables;
        this.ingressPolicy = ingressPolicy;
        this.blockUntilCompleted = blockUntilCompleted;
        this.serverMetrics = metrics;
    }

    public Observable<Void> handle(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
        logger.debug("Connection received: " + connection.toString());
        if (this.ingressPolicy.allowed(connection)) {
            return this.setupConnection(connection);
        }
        RemoteObservableException e = new RemoteObservableException("Connection rejected due to ingress policy");
        return Observable.error((Throwable)e);
    }

    private <T> void subscribe(MutableReference<Subscription> unsubscribeCallbackReference, final RemoteRxEvent event, final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection, RemoteObservableConfiguration<T> configuration, final SlotAssignment slotAssignment) {
        Observable<T> observable = configuration.getObservable();
        Func1<Map<String, String>, Func1<T, Boolean>> filterFunction = configuration.getFilterFunction();
        SlottingStrategy<T> slottingStrategy = configuration.getSlottingStrategy();
        Func1<SlotValuePair<T>, Boolean> slotAssignmentFunction = slottingStrategy.slottingFunction();
        final Encoder<T> encoder = configuration.getEncoder();
        Subscription subscription = observable.filter((Func1)filterFunction.call(event.getSubscribeParameters())).map(new Func1<T, SlotValuePair<T>>(){

            public SlotValuePair<T> call(T t1) {
                return new SlotValuePair(slotAssignment.getSlotAssignment(), t1);
            }
        }).filter(slotAssignmentFunction).map(new Func1<SlotValuePair<T>, T>(){

            public T call(SlotValuePair<T> pair) {
                return pair.getValue();
            }
        }).subscribe(new Observer<T>(){

            public void onCompleted() {
                connection.writeAndFlush((Object)RemoteRxEvent.completed(event.getName()));
                RemoteObservableConnectionHandler.this.blockUntilCompleted.countDown();
                RemoteObservableConnectionHandler.this.serverMetrics.incrementCompletedCount();
            }

            public void onError(Throwable e) {
                connection.writeAndFlush((Object)RemoteRxEvent.error(event.getName(), RemoteObservable.fromThrowableToBytes(e)));
                RemoteObservableConnectionHandler.this.serverMetrics.incrementErrorCount();
            }

            public void onNext(T t) {
                connection.writeAndFlush((Object)RemoteRxEvent.next(event.getName(), encoder.encode(t)));
                RemoteObservableConnectionHandler.this.serverMetrics.incrementNextCount();
            }
        });
        unsubscribeCallbackReference.setValue(subscription);
    }

    private Observable<Void> handleSubscribeRequest(RemoteRxEvent event, ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection, MutableReference<SlottingStrategy> slottingStrategyReference, MutableReference<Subscription> unsubscribeCallbackReference) {
        String observableName = event.getName();
        RemoteObservableConfiguration config = this.observables.get(observableName);
        if (config == null) {
            return Observable.error((Throwable)new RemoteObservableException("No remote observable configuration found for name: " + observableName));
        }
        if (event.getType() == RemoteRxEvent.Type.subscribed) {
            SlottingStrategy slottingStrategy = config.getSlottingStrategy();
            slottingStrategyReference.setValue(slottingStrategy);
            SlotAssignment slotAssignment = slottingStrategy.assignSlot(connection);
            if (slotAssignment.isAssigned()) {
                this.subscribe(unsubscribeCallbackReference, event, connection, config, slotAssignment);
                this.serverMetrics.incrementSubscribedCount();
                logger.debug("Connection: " + connection.toString() + " subscribed to observable: " + observableName);
            } else {
                return Observable.error((Throwable)new RemoteObservableException("Slot could not be assigned for connection."));
            }
        }
        return Observable.empty();
    }

    private Observable<Void> setupConnection(final ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
        final MutableReference unsubscribeCallbackReference = new MutableReference();
        final MutableReference slottingStrategyReference = new MutableReference();
        return connection.getInput().filter((Func1)new Func1<RemoteRxEvent, Boolean>(){

            public Boolean call(RemoteRxEvent event) {
                boolean supportedOperation = false;
                if (event.getType() == RemoteRxEvent.Type.subscribed || event.getType() == RemoteRxEvent.Type.unsubscribed) {
                    supportedOperation = true;
                }
                return supportedOperation;
            }
        }).flatMap((Func1)new Func1<RemoteRxEvent, Observable<Void>>(){

            public Observable<Void> call(RemoteRxEvent event) {
                if (event.getType() == RemoteRxEvent.Type.subscribed) {
                    return RemoteObservableConnectionHandler.this.handleSubscribeRequest(event, (ObservableConnection<RemoteRxEvent, RemoteRxEvent>)connection, slottingStrategyReference, unsubscribeCallbackReference);
                }
                if (event.getType() == RemoteRxEvent.Type.unsubscribed) {
                    Subscription subscription = (Subscription)unsubscribeCallbackReference.getValue();
                    if (subscription != null) {
                        subscription.unsubscribe();
                    }
                    RemoteObservableConnectionHandler.this.releaseSlot((SlottingStrategy)slottingStrategyReference.getValue(), (ObservableConnection<RemoteRxEvent, RemoteRxEvent>)connection);
                    RemoteObservableConnectionHandler.this.serverMetrics.incrementUnsubscribedCount();
                    logger.debug("Connection: " + connection.toString() + " unsubscribed");
                }
                return Observable.empty();
            }
        }).doOnCompleted(new Action0(){

            public void call() {
                RemoteObservableConnectionHandler.this.releaseSlot((SlottingStrategy)slottingStrategyReference.getValue(), (ObservableConnection<RemoteRxEvent, RemoteRxEvent>)connection);
                logger.debug("Connection: " + connection.toString() + " closed");
            }
        });
    }

    private void releaseSlot(SlottingStrategy slottingStrategy, ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
        if (slottingStrategy != null) {
            slottingStrategy.releaseSlot(connection);
        }
    }
}

