/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.Endpoint;
import io.reactivex.mantis.remote.observable.ConnectToGroupedObservable;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.EndpointChange;
import io.reactivex.mantis.remote.observable.EndpointInjector;
import io.reactivex.mantis.remote.observable.MergedObservable;
import io.reactivex.mantis.remote.observable.RemoteObservable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subscriptions.BooleanSubscription;

public class FixedConnectionSet<T> {
    private static final Logger logger = LoggerFactory.getLogger(FixedConnectionSet.class);
    private EndpointInjector endpointInjector;
    private Func1<Endpoint, Observable<T>> toObservableFunc;
    private MergedObservable<T> mergedObservable;

    public FixedConnectionSet(int expectedTerminalCount, EndpointInjector endpointInjector, Func1<Endpoint, Observable<T>> toObservableFunc) {
        this.endpointInjector = endpointInjector;
        this.toObservableFunc = toObservableFunc;
        this.mergedObservable = MergedObservable.create(expectedTerminalCount);
    }

    public static <K, V> FixedConnectionSet<GroupedObservable<K, V>> create(int expectedTerminalCount, final ConnectToGroupedObservable.Builder<K, V> config, EndpointInjector endpointService) {
        Func1 toObservableFunc = new Func1<Endpoint, Observable<GroupedObservable<K, V>>>(){

            public Observable<GroupedObservable<K, V>> call(Endpoint endpoint) {
                config.host(endpoint.getHost()).port(endpoint.getPort());
                return RemoteObservable.connect(config.build()).getObservable();
            }
        };
        return new FixedConnectionSet<GroupedObservable<K, V>>(expectedTerminalCount, endpointService, toObservableFunc);
    }

    public static <T> FixedConnectionSet<T> create(int expectedTerminalCount, final ConnectToObservable.Builder<T> config, EndpointInjector endpointService) {
        Func1 toObservableFunc = new Func1<Endpoint, Observable<T>>(){

            public Observable<T> call(Endpoint endpoint) {
                config.host(endpoint.getHost()).port(endpoint.getPort());
                return RemoteObservable.connect(config.build()).getObservable();
            }
        };
        return new FixedConnectionSet<T>(expectedTerminalCount, endpointService, toObservableFunc);
    }

    public Observable<Observable<T>> getObservables() {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<Observable<T>>(){

            public void call(final Subscriber<? super Observable<T>> subscriber) {
                final BooleanSubscription subscription = new BooleanSubscription();
                subscriber.add(new Subscription(){

                    public void unsubscribe() {
                        FixedConnectionSet.this.mergedObservable.clear();
                        subscription.unsubscribe();
                    }

                    public boolean isUnsubscribed() {
                        return subscription.isUnsubscribed();
                    }
                });
                subscriber.add(FixedConnectionSet.this.mergedObservable.get().subscribe(new Observer<Observable<T>>(){

                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    public void onError(Throwable e) {
                        subscriber.onError(e);
                    }

                    public void onNext(Observable<T> t) {
                        subscriber.onNext(t);
                    }
                }));
                subscriber.add(FixedConnectionSet.this.endpointInjector.deltas().subscribe((Action1)new Action1<EndpointChange>(){

                    public void call(EndpointChange ec) {
                        String id = Endpoint.uniqueHost((String)ec.getEndpoint().getHost(), (int)ec.getEndpoint().getPort(), (String)ec.getEndpoint().getSlotId());
                        if (EndpointChange.Type.add == ec.getType()) {
                            logger.info("Adding new connection to host: " + ec.getEndpoint().getHost() + " at port: " + ec.getEndpoint().getPort() + " with id: " + id);
                            FixedConnectionSet.this.mergedObservable.mergeIn(id, (Observable)FixedConnectionSet.this.toObservableFunc.call((Object)ec.getEndpoint()), (Action1<Throwable>)ec.getEndpoint().getErrorCallback(), ec.getEndpoint().getCompletedCallback());
                        } else if (EndpointChange.Type.complete == ec.getType()) {
                            logger.info("Forcing connection to complete host: " + ec.getEndpoint().getHost() + " at port: " + ec.getEndpoint().getPort() + " with id: " + id);
                            FixedConnectionSet.this.mergedObservable.forceComplete(id);
                        }
                    }
                }));
            }
        });
    }
}

