package ratpack.rx;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import ratpack.exec.ExecController;
import ratpack.exec.ExecStarter;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.UnmanagedThreadException;
import ratpack.func.Action;
import ratpack.registry.RegistrySpec;
import ratpack.rx.internal.DefaultSchedulers;
import ratpack.rx.internal.ExecControllerBackedScheduler;
import ratpack.stream.Streams;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.OnCompletedFailedException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

@Deprecated
/* loaded from: input_file:ratpack/rx/RxRatpack.class */
public abstract class RxRatpack {

    /* loaded from: input_file:ratpack/rx/RxRatpack$ErrorHandler.class */
    private static class ErrorHandler extends RxJavaErrorHandler {
        private ErrorHandler() {
        }

        public void handleError(Throwable th) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ratpack/rx/RxRatpack$ExecutionBackedSubscriber.class */
    public static class ExecutionBackedSubscriber<T> extends Subscriber<T> {
        private final Subscriber<? super T> subscriber;

        public ExecutionBackedSubscriber(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriber = subscriber;
        }

        public void onCompleted() {
            try {
                this.subscriber.onCompleted();
            } catch (OnErrorNotImplementedException e) {
                Promise.error(e.getCause()).then(Action.noop());
            }
        }

        public void onError(Throwable th) {
            try {
                this.subscriber.onError(th);
            } catch (OnErrorNotImplementedException e) {
                Promise.error(e.getCause()).then(Action.noop());
            }
        }

        public void onNext(T t) {
            try {
                this.subscriber.onNext(t);
            } catch (OnErrorNotImplementedException e) {
                Promise.error(e.getCause()).then(Action.noop());
            }
        }
    }

    /* loaded from: input_file:ratpack/rx/RxRatpack$ExecutionHook.class */
    private static class ExecutionHook extends RxJavaObservableExecutionHook {
        private ExecutionHook() {
        }

        public <T> Throwable onSubscribeError(Throwable th) {
            if (th instanceof OnCompletedFailedException) {
                Promise.error(th).then(Action.noop());
            }
            return th;
        }

        public <T> Observable.OnSubscribe<T> onSubscribeStart(Observable<? extends T> observable, Observable.OnSubscribe<T> onSubscribe) {
            return (Observable.OnSubscribe) ExecController.current().map(execController -> {
                return executionBackedOnSubscribe(onSubscribe);
            }).orElse(onSubscribe);
        }

        private <T> Observable.OnSubscribe<T> executionBackedOnSubscribe(Observable.OnSubscribe<T> onSubscribe) {
            return subscriber -> {
                onSubscribe.call(new ExecutionBackedSubscriber(subscriber));
            };
        }
    }

    private RxRatpack() {
    }

    public static void initialize() {
        RxJavaPlugins rxJavaPlugins = RxJavaPlugins.getInstance();
        try {
            rxJavaPlugins.registerObservableExecutionHook(new ExecutionHook());
        } catch (IllegalStateException e) {
            RxJavaObservableExecutionHook observableExecutionHook = rxJavaPlugins.getObservableExecutionHook();
            if (!(observableExecutionHook instanceof ExecutionHook)) {
                throw new IllegalStateException("Cannot install RxJava integration because another execution hook (" + observableExecutionHook.getClass() + ") is already installed");
            }
        }
        try {
            rxJavaPlugins.registerErrorHandler(new ErrorHandler());
        } catch (IllegalStateException e2) {
            RxJavaErrorHandler errorHandler = rxJavaPlugins.getErrorHandler();
            if (!(errorHandler instanceof ErrorHandler)) {
                throw new IllegalStateException("Cannot install RxJava integration because another error handler (" + errorHandler.getClass() + ") is already installed");
            }
        }
        try {
            rxJavaPlugins.registerSchedulersHook(new DefaultSchedulers());
        } catch (IllegalStateException e3) {
            RxJavaSchedulersHook schedulersHook = rxJavaPlugins.getSchedulersHook();
            if (!(schedulersHook instanceof DefaultSchedulers)) {
                throw new IllegalStateException("Cannot install RxJava integration because another set of default schedulers (" + schedulersHook.getClass() + ") is already installed");
            }
        }
    }

    public static <T> Observable<T> observe(Promise<T> promise) {
        return Observable.create(subscriber -> {
            Objects.requireNonNull(subscriber);
            promise.onError(subscriber::onError).then(obj -> {
                subscriber.onNext(obj);
                subscriber.onCompleted();
            });
        });
    }

    public static Observable<Void> observe(Operation operation) {
        return Observable.create(subscriber -> {
            Objects.requireNonNull(subscriber);
            Operation onError = operation.onError(subscriber::onError);
            Objects.requireNonNull(subscriber);
            onError.then(subscriber::onCompleted);
        });
    }

    public static <T, I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise) {
        return Observable.merge(observe(promise).map(Observable::from));
    }

    public static <T> Promise<List<T>> promise(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            Observable list = observable.toList();
            Objects.requireNonNull(downstream);
            Action1 action1 = (v1) -> {
                r1.success(v1);
            };
            Objects.requireNonNull(downstream);
            list.subscribe(action1, downstream::error);
        });
    }

    public static <T> Promise<List<T>> promise(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        return promise(Observable.create(onSubscribe));
    }

    public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException {
        return Promise.async(downstream -> {
            Observable single = observable.single();
            Objects.requireNonNull(downstream);
            Action1 action1 = downstream::success;
            Objects.requireNonNull(downstream);
            single.subscribe(action1, downstream::error);
        });
    }

    public static <T> Promise<T> promiseSingle(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException {
        return promiseSingle(Observable.create(onSubscribe));
    }

    public static <T> TransformablePublisher<T> publisher(Observable<T> observable) {
        return Streams.transformable(RxReactiveStreams.toPublisher(observable));
    }

    public static <T> TransformablePublisher<T> publisher(Observable.OnSubscribe<T> onSubscribe) {
        return publisher(Observable.create(onSubscribe));
    }

    public static <T> Observable<T> bindExec(Observable<T> observable) {
        return (Observable) Exceptions.uncheck(() -> {
            return (Observable) promise(observable).to(RxRatpack::observeEach);
        });
    }

    public static <T> Observable<T> fork(Observable<T> observable) {
        return observeEach(promise(observable).fork());
    }

    public static <T> Observable<T> fork(Observable<T> observable, Action<? super RegistrySpec> action) throws Exception {
        return observeEach(promise(observable).fork(execSpec -> {
            execSpec.register(action);
        }));
    }

    public static <T> Observable<T> forkEach(Observable<T> observable) {
        return forkEach(observable, Action.noop());
    }

    public static <T> Observable<T> forkEach(Observable<T> observable, Action<? super RegistrySpec> action) {
        return observable.lift(subscriber -> {
            return new Subscriber<T>(subscriber) { // from class: ratpack.rx.RxRatpack.1
                private final AtomicInteger wip = new AtomicInteger(1);
                private final AtomicBoolean closed = new AtomicBoolean();

                public void onCompleted() {
                    maybeDone();
                }

                public void onError(Throwable th) {
                    Subscriber subscriber = subscriber;
                    terminate(() -> {
                        subscriber.onError(th);
                    });
                }

                private void maybeDone() {
                    if (this.wip.decrementAndGet() == 0) {
                        Subscriber subscriber = subscriber;
                        Objects.requireNonNull(subscriber);
                        terminate(subscriber::onCompleted);
                    }
                }

                private void terminate(Runnable runnable) {
                    if (this.closed.compareAndSet(false, true)) {
                        runnable.run();
                    }
                }

                public void onNext(T t) {
                    if (isUnsubscribed() || this.closed.get()) {
                        return;
                    }
                    this.wip.incrementAndGet();
                    ExecStarter onError = Execution.fork().register(action).onComplete(execution -> {
                        maybeDone();
                    }).onError(this::onError);
                    Subscriber subscriber = subscriber;
                    onError.start(execution2 -> {
                        if (this.closed.get()) {
                            return;
                        }
                        subscriber.onNext(t);
                    });
                }
            };
        });
    }

    public static Scheduler scheduler(ExecController execController) {
        return new ExecControllerBackedScheduler(execController);
    }

    public static Scheduler scheduler() {
        return scheduler(ExecController.require());
    }
}
