/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.reactivex.netty.ConnectConfiguration;
import io.reactivex.netty.ConnectionMetrics;
import io.reactivex.netty.RemoteObservableConfiguration;
import io.reactivex.netty.RemoteRxConnection;
import io.reactivex.netty.RemoteRxEvent;
import io.reactivex.netty.RemoteRxServer;
import io.reactivex.netty.RemoteUnsubscribe;
import io.reactivex.netty.RxEventPipelineConfigurator;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.SubscribeInfo;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.codec.Decoder;
import io.reactivex.netty.codec.Encoder;
import io.reactivex.netty.ingress.IngressPolicies;
import io.reactivex.netty.ingress.IngressPolicy;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfiguratorComposite;
import io.reactivex.netty.slotting.SlottingStrategies;
import io.reactivex.netty.slotting.SlottingStrategy;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/**
 * Static entry points for consuming an {@link Observable} served by a remote process
 * ({@code connect}) and for publishing a local {@link Observable} over TCP ({@code serve}).
 *
 * <p>Frames on the wire are length-prefixed; error notifications carry a Java-serialized
 * {@link Throwable} (see {@link #fromThrowableToBytes(Throwable)} and
 * {@link #fromBytesToThrowable(byte[])}).
 */
public final class RemoteObservable {
    /** Size, in bytes, of the length header that precedes every frame. */
    private static final int LENGTH_FIELD_BYTES = 4;

    /** Largest frame the client-side frame decoder will accept (512 KiB). */
    private static final int MAX_FRAME_BYTES = 524288;

    /** Utility class; not instantiable. */
    private RemoteObservable() {
    }

    /**
     * Builds a connection handle whose observable opens a new TCP connection to the
     * configured server each time it is subscribed to.
     *
     * <p>Every subscriber gets its own {@link RemoteUnsubscribe}, registered on the
     * subscriber so that unsubscribing locally is relayed through that object. All
     * subscriptions share the single {@link ConnectionMetrics} instance created here.
     *
     * @param params connection settings (host, port, name, decoder, error handling)
     * @param <T>    element type produced by the configured decoder
     * @return the connection wrapper exposing the observable and its metrics
     */
    public static <T> RemoteRxConnection<T> connect(final ConnectConfiguration<T> params) {
        final ConnectionMetrics metrics = new ConnectionMetrics();
        // NOTE(review): assumes the RemoteRxConnection<T> constructor accepts
        // (Observable<T>, ConnectionMetrics) - its declaration is not visible here.
        return new RemoteRxConnection<T>(Observable.create(new Observable.OnSubscribe<T>(){

            public void call(Subscriber<? super T> subscriber) {
                RemoteUnsubscribe remoteUnsubscribe = new RemoteUnsubscribe(params.getName());
                subscriber.add(remoteUnsubscribe);
                RemoteObservable.createTcpConnectionToServer(params, remoteUnsubscribe, metrics).subscribe(subscriber);
            }
        }), metrics);
    }

    /**
     * Convenience overload of {@link #connect(ConnectConfiguration)} that only sets
     * host, port and decoder on the configuration builder and returns the observable
     * directly.
     *
     * @param host    server host
     * @param port    server port
     * @param decoder converts the payload of each "next" event into a {@code T}
     * @param <T>     element type
     * @return an observable of the decoded remote elements
     */
    public static <T> Observable<T> connect(String host, int port, Decoder<T> decoder) {
        return RemoteObservable.connect(new ConnectConfiguration.Builder().host(host).port(port).decoder(decoder).build()).getObservable();
    }

    /**
     * Opens the TCP connection, sends the subscribe event, and pumps decoded
     * notifications into a {@link PublishSubject} that is returned to the caller.
     *
     * <p>The raw generic casts in the call chain are kept on purpose: they keep the
     * whole chain raw, which is how this code compiles without explicit type witnesses.
     *
     * @param params            connection settings
     * @param remoteUnsubscribe receives the live connection once it is established
     * @param metrics           counters incremented per next / error / completed event
     * @param <T>               element type produced by {@code params.getDecoder()}
     * @return the subject that mirrors the remote stream
     */
    private static <T> Observable<T> createTcpConnectionToServer(final ConnectConfiguration<T> params, final RemoteUnsubscribe remoteUnsubscribe, final ConnectionMetrics metrics) {
        final PublishSubject<T> proxy = PublishSubject.create();
        final Decoder<T> decoder = params.getDecoder();
        RxNetty.createTcpClient(params.getHost(), params.getPort(), (PipelineConfigurator)new PipelineConfiguratorComposite(new PipelineConfigurator[]{new PipelineConfigurator<RemoteRxEvent, RemoteRxEvent>(){

            public void configureNewPipeline(ChannelPipeline pipeline) {
                // Outbound: prefix each message with its length.
                pipeline.addLast("frameEncoder", new LengthFieldPrepender(LENGTH_FIELD_BYTES));
                // Inbound: length field at offset 0, no adjustment, header stripped from the frame.
                pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAX_FRAME_BYTES, 0, LENGTH_FIELD_BYTES, 0, LENGTH_FIELD_BYTES));
            }
        }, new RxEventPipelineConfigurator()})).connect().flatMap((Func1)new Func1<ObservableConnection<RemoteRxEvent, RemoteRxEvent>, Observable<RemoteRxEvent>>(){

            public Observable<RemoteRxEvent> call(ObservableConnection<RemoteRxEvent, RemoteRxEvent> connection) {
                // Announce the subscription first, then expose the connection for remote unsubscribe.
                connection.writeAndFlush(RemoteRxEvent.subscribed(params.getName(), params.getSubscribeParameters()));
                remoteUnsubscribe.setConnection(connection);
                return connection.getInput();
            }
        }).retry(params.getSubscribeRetryAttempts()).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable t1) {
                // Reached once the retry budget above is exhausted.
                params.getSubscribeErrorHandler().call(new SubscribeInfo(params.getHost(), params.getPort(), params.getName(), params.getSubscribeParameters()), t1);
                if (!params.isSuppressSubscribeErrors()) {
                    proxy.onError(t1);
                }
            }
        }).map(new Func1<RemoteRxEvent, Notification<T>>(){

            public Notification<T> call(RemoteRxEvent rxEvent) {
                final RemoteRxEvent.Type type = rxEvent.getType();
                if (type == RemoteRxEvent.Type.next) {
                    metrics.incrementNextCount();
                    return Notification.createOnNext(decoder.decode(rxEvent.getData()));
                }
                if (type == RemoteRxEvent.Type.error) {
                    metrics.incrementErrorCount();
                    return Notification.createOnError(RemoteObservable.fromBytesToThrowable(rxEvent.getData()));
                }
                if (type == RemoteRxEvent.Type.completed) {
                    metrics.incrementCompletedCount();
                    return Notification.createOnCompleted();
                }
                throw new RuntimeException("RemoteRxEvent of type:" + type + ", not supported.");
            }
        }).doOnError((Action1)new Action1<Throwable>(){

            public void call(Throwable t1) {
                // No SubscribeInfo is passed to the decoding handler, only the failure.
                params.getDeocdingErrorHandler().call(null, t1);
                if (!params.isSuppressDecodingErrors()) {
                    proxy.onError(t1);
                }
            }
        }).dematerialize().subscribe(new Observer<T>(){

            public void onCompleted() {
                proxy.onCompleted();
            }

            public void onError(Throwable e) {
                // NOTE(review): this forwards every error regardless of the suppress flags
                // checked in the doOnError hooks above, so suppression looks ineffective
                // (and unsuppressed errors are offered to proxy twice) - confirm intent.
                proxy.onError(e);
            }

            public void onNext(T t) {
                proxy.onNext(t);
            }
        });
        return proxy;
    }

    /**
     * Serves {@code observable} on {@code port} without a name, with no slotting and
     * with an ingress policy that allows all connections.
     *
     * @param port       port to listen on
     * @param observable source of the elements to publish
     * @param encoder    converts each element to bytes
     * @param <T>        element type
     * @return a server built from the resulting configuration
     */
    public static <T> RemoteRxServer serve(int port, Observable<T> observable, Encoder<T> encoder) {
        return new RemoteRxServer(RemoteObservable.configureServerFromParams(null, port, observable, encoder, SlottingStrategies.noSlotting(), IngressPolicies.allowAll()));
    }

    /**
     * Same as {@link #serve(int, Observable, Encoder)} but registers the observable
     * under {@code name}.
     *
     * @param port       port to listen on
     * @param name       name the observable is registered under
     * @param observable source of the elements to publish
     * @param encoder    converts each element to bytes
     * @param <T>        element type
     * @return a server built from the resulting configuration
     */
    public static <T> RemoteRxServer serve(int port, String name, Observable<T> observable, Encoder<T> encoder) {
        return new RemoteRxServer(RemoteObservable.configureServerFromParams(name, port, observable, encoder, SlottingStrategies.noSlotting(), IngressPolicies.allowAll()));
    }

    /**
     * Assembles a server builder that listens on {@code port}, applies
     * {@code ingressPolicy}, and exposes exactly one observable configured with the
     * given name, encoder and slotting strategy.
     */
    private static <T> RemoteRxServer.Builder configureServerFromParams(String name, int port, Observable<T> observable, Encoder<T> encoder, SlottingStrategy<T> slottingStrategy, IngressPolicy ingressPolicy) {
        return new RemoteRxServer.Builder().port(port).ingressPolicy(ingressPolicy).addObservable(new RemoteObservableConfiguration.Builder().name(name).encoder(encoder).slottingStrategy(slottingStrategy).observable(observable).build());
    }

    /**
     * Serializes a throwable with Java object serialization.
     *
     * @param t the throwable to serialize; {@code null} is written as a serialized null
     * @return the serialized bytes
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     *                          (for example when the throwable holds a non-serializable field)
     */
    static byte[] fromThrowableToBytes(Throwable t) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // The object stream is closed (and therefore flushed) before the bytes are read.
        try (ObjectOutputStream out = new ObjectOutputStream(baos)) {
            out.writeObject(t);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return baos.toByteArray();
    }

    /**
     * Reconstructs a throwable from bytes produced by {@link #fromThrowableToBytes(Throwable)}.
     *
     * <p>SECURITY NOTE(review): this uses Java-native deserialization on bytes that the
     * client pipeline takes straight from the network. That is only safe when the peer
     * is trusted; consider a deserialization filter or a non-native wire format.
     *
     * @param bytes serialized form of a throwable; must not be {@code null}
     * @return the deserialized throwable (may be {@code null} if a null was serialized)
     * @throws RuntimeException   wrapping the {@link IOException} or
     *                            {@link ClassNotFoundException} if the bytes cannot be read
     * @throws ClassCastException if the bytes decode to something other than a throwable
     */
    static Throwable fromBytesToThrowable(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Throwable)in.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

