package hu.akarnokd.reactive4java.reactive;

import hu.akarnokd.reactive4java.base.Action1;
import hu.akarnokd.reactive4java.base.Func1;
import hu.akarnokd.reactive4java.base.Observable;
import hu.akarnokd.reactive4java.base.Observer;
import hu.akarnokd.reactive4java.base.Scheduler;
import hu.akarnokd.reactive4java.util.Closeables;
import hu.akarnokd.reactive4java.util.CompositeCloseable;
import hu.akarnokd.reactive4java.util.DefaultObserverEx;
import hu.akarnokd.reactive4java.util.DefaultRunnable;
import hu.akarnokd.reactive4java.util.Observers;
import hu.akarnokd.reactive4java.util.Producer;
import hu.akarnokd.reactive4java.util.R4JConfigManager;
import hu.akarnokd.reactive4java.util.SequentialCloseable;
import hu.akarnokd.reactive4java.util.SingleCloseable;
import hu.akarnokd.reactive4java.util.Sink;
import java.io.Closeable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:hu/akarnokd/reactive4java/reactive/Delay.class */
public final class Delay {

    /**
     * Producer that relays each value of {@code source} only after a per-value
     * "delay" observable, obtained from {@code delaySelector}, emits a value or
     * completes. Optionally the registration to {@code source} itself is deferred
     * until {@code registerDelay} emits a value or completes.
     *
     * @param <T> the element type of the source and of the relayed values
     * @param <U> the element type of the registration-delay observable (its values are ignored)
     * @param <V> the element type of the per-value delay observables (their values are ignored)
     */
    public static class ByObservable<T, U, V> extends Producer<T> {
        /** The sequence whose values are delayed. */
        private final Observable<? extends T> source;
        /** Optional observable gating the registration to {@link #source}; may be {@code null}. */
        private final Observable<U> registerDelay;
        /** Produces, for each source value, the observable whose signal releases that value. */
        private final Func1<? super T, ? extends Observable<V>> delaySelector;

        /**
         * Sink observing the source and coordinating the per-value delay observers.
         * Downstream {@code next}/{@code error}/{@code finish} calls issued from
         * {@link #error}, {@link #finish} and the delay observers are serialized
         * through {@link #lock}.
         */
        public class ResultSink extends Sink<T> implements Observer<T> {
            /** Registrations of the delay observables that have not released their value yet. */
            protected final CompositeCloseable delays = new CompositeCloseable(new Closeable[0]);
            /** Serializes the events emitted towards the downstream observer. */
            protected final Lock lock = new ReentrantLock(R4JConfigManager.get().useFairLocks());

            /** Set once the source has finished; completion is then pending on the outstanding delays. */
            @GuardedBy("lock")
            protected boolean atEnd;
            /** Holds the registration-delay registration first, then the source registration. */
            protected final SequentialCloseable registration = new SequentialCloseable();

            /** Observer of a single delay observable; releases the associated value when it signals. */
            class DelayObserver implements Observer<V> {
                /** The source value held back by this observer. */
                private final T value;
                /** The handle under which this observer is tracked in {@code delays}. */
                private final Closeable self;

                /**
                 * @param t the source value to release later
                 * @param closeable the handle registered in {@code delays} for this observer
                 */
                public DelayObserver(T t, Closeable closeable) {
                    this.value = t;
                    this.self = closeable;
                }

                @Override // hu.akarnokd.reactive4java.base.BaseObserver
                public void error(@Nonnull Throwable th) {
                    // Go through the sink's locked error path: another delay observer may be
                    // inside deliver() on a different thread, and the downstream observer must
                    // not receive error() concurrently with next().
                    ResultSink.this.error(th);
                }

                @Override // hu.akarnokd.reactive4java.base.BaseObserver
                public void finish() {
                    deliver();
                }

                @Override // hu.akarnokd.reactive4java.base.Observer
                public void next(V v) {
                    // The emitted value is irrelevant; only the signal matters.
                    deliver();
                }

                /**
                 * Emits the held value downstream, removes this observer's handle from
                 * {@code delays} and checks whether the whole sequence is complete.
                 */
                protected void deliver() {
                    ResultSink.this.lock.lock();
                    try {
                        ResultSink.this.observer.get().next(this.value);
                        ResultSink.this.delays.removeSilently(this.self);
                        ResultSink.this.checkDone();
                    } finally {
                        ResultSink.this.lock.unlock();
                    }
                }
            }

            /** Observer of the registration delay; registers to the source when it signals. */
            public class StartObserver implements Observer<U> {
                StartObserver() {
                }

                @Override // hu.akarnokd.reactive4java.base.BaseObserver
                public void error(@Nonnull Throwable th) {
                    ResultSink.this.observer.get().error(th);
                    ResultSink.this.closeSilently();
                }

                @Override // hu.akarnokd.reactive4java.base.BaseObserver
                public void finish() {
                    ResultSink.this.start();
                }

                @Override // hu.akarnokd.reactive4java.base.Observer
                public void next(U u) {
                    // The emitted value is irrelevant; only the signal matters.
                    ResultSink.this.start();
                }
            }

            /**
             * @param observer the downstream observer receiving the delayed values
             * @param closeable the handle passed on to the {@code Sink} base class
             */
            public ResultSink(Observer<? super T> observer, Closeable closeable) {
                super(observer, closeable);
            }

            /**
             * Starts the operator: registers to the source immediately when no
             * registration delay was given, otherwise registers to the registration delay.
             *
             * @return a closeable combining the current registration and all outstanding delays
             */
            public Closeable run() {
                if (ByObservable.this.registerDelay == null) {
                    start();
                } else {
                    this.registration.set(Observers.registerSafe(ByObservable.this.registerDelay, new StartObserver()));
                }
                return new CompositeCloseable(this.registration, this.delays);
            }

            /** Registers this sink to the source and stores the handle in {@link #registration}. */
            protected void start() {
                this.registration.set(Observers.registerSafe(ByObservable.this.source, this));
            }

            @Override // hu.akarnokd.reactive4java.base.Observer
            public void next(T t) {
                try {
                    Observable<V> delay = ByObservable.this.delaySelector.invoke(t);
                    SingleCloseable handle = new SingleCloseable();
                    // Track the handle before registering so a delay that signals right away
                    // finds its handle already present in 'delays' when it removes it.
                    this.delays.add(handle);
                    handle.set(Observers.registerSafe(delay, new DelayObserver(t, handle)));
                } catch (Throwable th) {
                    // A failing selector or registration terminates the whole sequence.
                    error(th);
                }
            }

            @Override // hu.akarnokd.reactive4java.base.BaseObserver
            public void error(@Nonnull Throwable th) {
                this.lock.lock();
                try {
                    this.observer.get().error(th);
                    closeSilently();
                } finally {
                    this.lock.unlock();
                }
            }

            @Override // hu.akarnokd.reactive4java.base.BaseObserver
            public void finish() {
                this.lock.lock();
                try {
                    this.atEnd = true;
                    Closeables.closeSilently((Closeable) this.registration);
                    checkDone();
                } finally {
                    this.lock.unlock();
                }
            }

            /**
             * Completes the downstream observer once the source has finished and no
             * delay is outstanding anymore. Must be called with {@link #lock} held.
             */
            @GuardedBy("lock")
            protected void checkDone() {
                if (this.atEnd && this.delays.isEmpty()) {
                    this.observer.get().finish();
                    closeSilently();
                }
            }
        }

        /**
         * @param observable the source sequence to delay
         * @param observable2 optional observable delaying the registration to the source, or {@code null}
         * @param func1 selector returning the delay observable for each source value
         */
        public ByObservable(@Nonnull Observable<? extends T> observable, @Nullable Observable<U> observable2, @Nonnull Func1<? super T, ? extends Observable<V>> func1) {
            this.source = observable;
            this.registerDelay = observable2;
            this.delaySelector = func1;
        }

        @Override // hu.akarnokd.reactive4java.util.Producer
        protected Closeable run(Observer<? super T> observer, Closeable closeable, Action1<Closeable> action1) {
            ResultSink resultSink = new ResultSink(observer, closeable);
            // Hand the sink to the caller-supplied action before starting it.
            action1.invoke(resultSink);
            return resultSink.run();
        }
    }

    /**
     * Observable that relays the events of {@code source} by scheduling each
     * notification on {@code pool} with a fixed delay of {@code time} {@code unit}.
     *
     * @param <T> the element type
     */
    public static class ByTime<T> implements Observable<T> {

        /** The sequence whose events are delayed. */
        @Nonnull
        private final Observable<? extends T> source;
        /** The delay amount, expressed in {@link #unit}. */
        private final long time;

        /** The unit of {@link #time}. */
        @Nonnull
        private final TimeUnit unit;

        /** The scheduler on which the delayed notifications run. */
        @Nonnull
        private final Scheduler pool;
        /**
         * Selects how errors are relayed: when {@code true} the error is scheduled
         * with the same delay as values, when {@code false} it is delivered right away.
         * NOTE(review): the name suggests the opposite meaning; behavior is kept as-is
         * because callers may depend on it - confirm against the call sites.
         */
        private final boolean instantError;

        /**
         * @param observable the source sequence
         * @param j the delay amount
         * @param timeUnit the unit of the delay amount
         * @param scheduler the scheduler running the delayed notifications
         * @param z error handling flag, see the note on the {@code instantError} field
         */
        public ByTime(@Nonnull Observable<? extends T> observable, long j, @Nonnull TimeUnit timeUnit, @Nonnull Scheduler scheduler, boolean z) {
            this.pool = scheduler;
            this.unit = timeUnit;
            this.source = observable;
            this.time = j;
            this.instantError = z;
        }

        @Override // hu.akarnokd.reactive4java.base.Observable
        @Nonnull
        public Closeable register(@Nonnull final Observer<? super T> observer) {
            return new DefaultObserverEx<T>(false) { // from class: hu.akarnokd.reactive4java.reactive.Delay.ByTime.1

                /** Handles of the scheduled notifications that have not run yet. */
                @Nonnull
                final BlockingQueue<Closeable> outstanding = new LinkedBlockingQueue<Closeable>();

                @Override // hu.akarnokd.reactive4java.util.DefaultObserverEx, hu.akarnokd.reactive4java.util.DefaultObserver
                public void onClose() {
                    // Drain first so the handles are closed outside of the queue.
                    LinkedList<Closeable> pending = new LinkedList<Closeable>();
                    this.outstanding.drainTo(pending);
                    for (Closeable handle : pending) {
                        Closeables.closeSilently(handle);
                    }
                }

                @Override // hu.akarnokd.reactive4java.util.DefaultObserver
                public void onError(@Nonnull Throwable th) {
                    if (ByTime.this.instantError) {
                        this.outstanding.add(ByTime.this.pool.schedule(new OnError<T>(this.lock, this.outstanding, observer, th, this), ByTime.this.time, ByTime.this.unit));
                    } else {
                        observer.error(th);
                        close();
                    }
                }

                @Override // hu.akarnokd.reactive4java.util.DefaultObserver
                public void onFinish() {
                    this.outstanding.add(ByTime.this.pool.schedule(new OnFinish<T>(this.lock, this.outstanding, observer, this), ByTime.this.time, ByTime.this.unit));
                }

                @Override // hu.akarnokd.reactive4java.util.DefaultObserver
                public void onNext(T t) {
                    this.outstanding.add(ByTime.this.pool.schedule(new OnNext<T>(this.lock, this.outstanding, observer, t), ByTime.this.time, ByTime.this.unit));
                }
            }.registerWith(this.source);
        }
    }

    /**
     * Base class of the scheduled notification tasks: runs the concrete
     * notification and afterwards removes one entry from the shared queue of
     * outstanding task handles.
     *
     * @param <T> the element type of the target observer
     */
    public static abstract class DelayedObservation<T> extends DefaultRunnable {
        /** The queue of outstanding task handles shared with the scheduling observer. */
        protected final BlockingQueue<Closeable> queue;
        /** The observer that receives the delayed notification. */
        protected final Observer<? super T> observer;

        /**
         * @param lock the lock handed to the {@code DefaultRunnable} base class
         * @param blockingQueue the queue of outstanding task handles
         * @param observer the observer to notify
         */
        public DelayedObservation(@Nonnull Lock lock, @Nonnull BlockingQueue<Closeable> blockingQueue, @Nonnull Observer<? super T> observer) {
            super(lock);
            this.observer = observer;
            this.queue = blockingQueue;
        }

        @Override // hu.akarnokd.reactive4java.util.DefaultRunnable
        public final void onRun() {
            try {
                onRun2();
            } finally {
                // Drops the head of the queue, not necessarily this task's own handle.
                // NOTE(review): presumably relies on tasks running in scheduling order - confirm.
                queue.poll();
            }
        }

        /** Performs the actual notification of the observer. */
        public abstract void onRun2();
    }

    /* loaded from: input_file:hu/akarnokd/reactive4java/reactive/Delay$OnError.class */
    public static class OnError<T> extends DelayedObservation<T> {

        @Nonnull
        private final Throwable error;

        @Nonnull
        private final Closeable c;

        public OnError(@Nonnull Lock lock, @Nonnull BlockingQueue<Closeable> blockingQueue, @Nonnull Observer<? super T> observer, @Nonnull Throwable th, @Nonnull Closeable closeable) {
            super(lock, blockingQueue, observer);
            this.error = th;
            this.c = closeable;
        }

        @Override // hu.akarnokd.reactive4java.reactive.Delay.DelayedObservation
        public void onRun2() {
            this.observer.error(this.error);
            Closeables.closeSilently(this.c);
        }
    }

    /**
     * Scheduled task that signals completion to the observer and then closes the
     * supplied handle.
     *
     * @param <T> the element type of the target observer
     */
    public static class OnFinish<T> extends DelayedObservation<T> {

        /** The handle closed after completion has been signalled. */
        @Nonnull
        private final Closeable c;

        /**
         * @param lock the lock handed to the base class
         * @param blockingQueue the queue of outstanding task handles
         * @param observer the observer to notify
         * @param closeable the handle to close after the notification
         */
        public OnFinish(@Nonnull Lock lock, @Nonnull BlockingQueue<Closeable> blockingQueue, @Nonnull Observer<? super T> observer, @Nonnull Closeable closeable) {
            super(lock, blockingQueue, observer);
            c = closeable;
        }

        @Override // hu.akarnokd.reactive4java.reactive.Delay.DelayedObservation
        public void onRun2() {
            observer.finish();
            Closeables.closeSilently(c);
        }
    }

    /**
     * Scheduled task that relays a single value to the observer.
     *
     * @param <T> the type of the relayed value
     */
    public static class OnNext<T> extends DelayedObservation<T> {
        /** The value to relay. */
        private final T value;

        /**
         * @param lock the lock handed to the base class
         * @param blockingQueue the queue of outstanding task handles
         * @param observer the observer to notify
         * @param t the value to relay
         */
        public OnNext(@Nonnull Lock lock, @Nonnull BlockingQueue<Closeable> blockingQueue, @Nonnull Observer<? super T> observer, T t) {
            super(lock, blockingQueue, observer);
            value = t;
        }

        @Override // hu.akarnokd.reactive4java.reactive.Delay.DelayedObservation
        public void onRun2() {
            observer.next(value);
        }
    }

    /**
     * Observable that postpones the registration to {@code source}: the actual
     * registration is performed by a task scheduled on {@code pool} with the
     * given delay.
     *
     * @param <T> the element type
     */
    public static class Registration<T> implements Observable<T> {
        /** The sequence to register with once the scheduled task runs. */
        private final Observable<? extends T> source;
        /** The delay amount, expressed in {@link #unit}. */
        private final long time;
        /** The unit of {@link #time}. */
        private final TimeUnit unit;
        /** The scheduler running the deferred registration. */
        private final Scheduler pool;

        /**
         * @param observable the source sequence
         * @param j the delay amount
         * @param timeUnit the unit of the delay amount
         * @param scheduler the scheduler running the deferred registration
         */
        public Registration(Observable<? extends T> observable, long j, TimeUnit timeUnit, Scheduler scheduler) {
            this.source = observable;
            this.time = j;
            this.unit = timeUnit;
            this.pool = scheduler;
        }

        @Override // hu.akarnokd.reactive4java.base.Observable
        @Nonnull
        public Closeable register(@Nonnull final Observer<? super T> observer) {
            SingleCloseable timerHandle = new SingleCloseable();
            final SingleCloseable sourceHandle = new SingleCloseable();
            // The task that performs the real registration once the delay has elapsed.
            DefaultRunnable deferredRegistration = new DefaultRunnable() { // from class: hu.akarnokd.reactive4java.reactive.Delay.Registration.1
                @Override // hu.akarnokd.reactive4java.util.DefaultRunnable
                protected void onRun() {
                    sourceHandle.set(Observers.registerSafe(Registration.this.source, observer));
                }
            };
            timerHandle.set(pool.schedule(deferredRegistration, time, unit));
            // Closing the result closes both the scheduled task handle and the source registration handle.
            return new CompositeCloseable(timerHandle, sourceHandle);
        }
    }

    /** Private constructor: this final class only hosts the nested delay operator classes. */
    private Delay() {
    }
}
