/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.rx2;

import com.github.davidmoten.guavamini.Optional;
import com.github.davidmoten.rx2.observable.CachedObservable;
import com.github.davidmoten.rx2.observable.CloseableObservableWithReset;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public final class Observables {
    private Observables() {
    }

    public static <T> CachedObservable<T> cache(Observable<T> source) {
        return new CachedObservable<T>(source);
    }

    public static <T> Observable<T> cache(Observable<T> source, final long duration, final TimeUnit unit, final Scheduler.Worker worker) {
        final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
        CachedObservable<T> cache = new CachedObservable<T>(source);
        cacheRef.set(cache);
        return cache.doOnSubscribe((Consumer)new Consumer<Disposable>(){

            public void accept(Disposable d) {
                Runnable action = new Runnable(){

                    @Override
                    public void run() {
                        ((CachedObservable)((Object)cacheRef.get())).reset();
                    }
                };
                worker.schedule(action, duration, unit);
            }
        });
    }

    public static <T> CloseableObservableWithReset<T> cache(Observable<T> source, final long duration, final TimeUnit unit, final Scheduler scheduler) {
        final AtomicReference<CachedObservable<T>> cacheRef = new AtomicReference<CachedObservable<T>>();
        final AtomicReference<Optional> workerRef = new AtomicReference<Optional>(Optional.absent());
        CachedObservable<T> cache = new CachedObservable<T>(source);
        cacheRef.set(cache);
        Runnable closeAction = new Runnable(){

            @Override
            public void run() {
                Optional w;
                while ((w = (Optional)workerRef.get()) != null) {
                    if (!workerRef.compareAndSet(w, null)) continue;
                    if (w.isPresent()) {
                        ((Scheduler.Worker)w.get()).dispose();
                    }
                    workerRef.set(null);
                    break;
                }
            }
        };
        Runnable resetAction = new Runnable(){

            @Override
            public void run() {
                Observables.startScheduledResetAgain(duration, unit, scheduler, cacheRef, workerRef);
            }
        };
        return new CloseableObservableWithReset<T>(cache, closeAction, resetAction);
    }

    private static <T> void startScheduledResetAgain(long duration, TimeUnit unit, Scheduler scheduler, final AtomicReference<CachedObservable<T>> cacheRef, AtomicReference<Optional<Scheduler.Worker>> workerRef) {
        Optional w;
        Optional<Scheduler.Worker> wOld;
        Runnable action = new Runnable(){

            @Override
            public void run() {
                ((CachedObservable)((Object)cacheRef.get())).reset();
            }
        };
        do {
            if ((wOld = workerRef.get()) != null) continue;
            return;
        } while (!workerRef.compareAndSet(wOld, (Optional<Scheduler.Worker>)(w = Optional.of((Object)scheduler.createWorker()))));
        if (wOld.isPresent()) {
            ((Scheduler.Worker)wOld.get()).dispose();
        }
        ((Scheduler.Worker)w.get()).schedule(action, duration, unit);
    }
}

