/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.rx.java.test;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.rx.java.ContextScheduler;
import io.vertx.rx.java.RxHelper;
import io.vertx.test.core.VertxTestBase;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.plugins.RxJavaPlugins;
import rx.plugins.RxJavaSchedulersHook;

public class SchedulerTest
extends VertxTestBase {
    private WorkerExecutor workerExecutor;

    /**
     * Runs the base-class set-up and then creates a shared worker executor named after the
     * currently running test method, so that its threads can be recognised by name in
     * {@code assertWorkerExecutorThread}.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.workerExecutor = this.vertx.createSharedWorkerExecutor(this.name.getMethodName());
    }

    /**
     * Closes the per-test worker executor, runs the base-class tear-down and finally resets
     * the global {@link RxJavaPlugins} instance.
     *
     * <p>The steps are chained with {@code finally} so that a failure in an earlier step does
     * not skip the later ones; in particular a schedulers hook registered by a test must not
     * survive into the next test.
     *
     * @throws Exception if any clean-up step fails (a later failure masks an earlier one)
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            // The executor is only assigned at the end of setUp(); guard against a failed set-up.
            if (this.workerExecutor != null) {
                this.workerExecutor.close();
            }
        } finally {
            try {
                super.tearDown();
            } finally {
                // "reset" is looked up reflectively and made accessible before being invoked
                // on the plugins singleton.
                Method reset = RxJavaPlugins.class.getDeclaredMethod("reset");
                reset.setAccessible(true);
                reset.invoke(RxJavaPlugins.getInstance());
            }
        }
    }

    /**
     * Asserts that the given thread's name starts with {@code vert.x-eventloop-thread}.
     *
     * @param thread the thread on which a scheduled action was observed to run
     */
    private void assertEventLoopThread(Thread thread) {
        final String actualName = thread.getName();
        final String failureMessage = "Was expecting event loop thread instead of " + actualName;
        assertTrue(failureMessage, actualName.startsWith("vert.x-eventloop-thread"));
    }

    /**
     * Asserts that the given thread's name starts with {@code vert.x-worker-thread}.
     *
     * @param thread the thread on which a scheduled action was observed to run
     */
    private void assertWorkerThread(Thread thread) {
        final String actualName = thread.getName();
        final String failureMessage = "Was expecting worker thread instead of " + actualName;
        assertTrue(failureMessage, actualName.startsWith("vert.x-worker-thread"));
    }

    /**
     * Asserts that the given thread's name starts with the name of the current test method,
     * which is the name the per-test worker executor is created with in {@code setUp}.
     *
     * @param thread the thread on which a scheduled action was observed to run
     */
    private void assertWorkerExecutorThread(Thread thread) {
        final String actualName = thread.getName();
        final String expectedPrefix = name.getMethodName();
        assertTrue("Was expecting worker executor thread instead of " + actualName, actualName.startsWith(expectedPrefix));
    }

    /**
     * Zero-delay scheduling with a {@code ContextScheduler(vertx, false)}: the action is
     * expected to run on an event loop thread.
     */
    @Test
    public void testScheduleImmediatly() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testScheduleImmediatly(factory, this::assertEventLoopThread);
    }

    /**
     * Zero-delay scheduling with a {@code ContextScheduler(vertx, true)}: the action is
     * expected to run on a worker thread.
     */
    @Test
    public void testScheduleImmediatlyBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testScheduleImmediatly(factory, this::assertWorkerThread);
    }

    /**
     * Zero-delay scheduling with a scheduler backed by the per-test worker executor: the
     * action is expected to run on one of that executor's threads.
     */
    @Test
    public void testScheduleImmediatlyWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testScheduleImmediatly(factory, this::assertWorkerExecutorThread);
    }

    /**
     * Schedules one action with a zero delay, waits for it to run and then checks which
     * thread executed it.
     *
     * @param scheduler    factory for the scheduler under test
     * @param threadAssert assertion applied to the thread that ran the action
     * @throws Exception if waiting for the action fails
     */
    private void testScheduleImmediatly(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        CountDownLatch executed = new CountDownLatch(1);
        AtomicReference<Thread> runner = new AtomicReference<>();
        worker.schedule(() -> {
            // Record the executing thread before releasing the test thread.
            runner.set(Thread.currentThread());
            executed.countDown();
        }, 0L, TimeUnit.MILLISECONDS);
        awaitLatch(executed);
        threadAssert.accept(runner.get());
    }

    /**
     * Subscribes from a plain (non Vert.x) thread to an observable that uses
     * {@code observeOn} with a {@link ContextScheduler} bound to a specific context.
     *
     * <p>Checks that the emitted item is seen downstream while {@link Vertx#currentContext()}
     * equals that context, and, once the stream has completed, that the source itself was
     * not executed on a Vert.x thread.
     */
    @Test
    public void testScheduleObserveOnReturnsOnTheCorrectThread() {
        Context testContext = this.vertx.getOrCreateContext();
        AtomicBoolean isOnVertxThread = new AtomicBoolean();
        testContext.runOnContext(v -> {
            ContextScheduler scheduler = new ContextScheduler(testContext, false);
            // Keep the element type: a raw Observable would make every subscribe callback
            // (including the error callback handed to fail) receive plain Object.
            Observable<String> observable = Observable.<String>create(subscriber -> {
                isOnVertxThread.set(Context.isOnVertxThread());
                subscriber.onNext("expected");
                subscriber.onCompleted();
            }).observeOn(scheduler).doOnNext(o -> this.assertEquals(Vertx.currentContext(), testContext));
            // Subscribe from a fresh thread so the source runs outside of Vert.x.
            new Thread(() -> observable.subscribe(
                    item -> this.assertEquals("expected", item),
                    this::fail,
                    this::testComplete)).start();
        });
        this.await();
        this.assertFalse(isOnVertxThread.get());
    }

    /**
     * Same scenario as the {@code observeOn} variant, but the hand-over to the
     * {@link ContextScheduler} happens through {@code delay(10 ms, scheduler)}.
     *
     * <p>Checks that the delayed item is seen downstream while {@link Vertx#currentContext()}
     * equals the scheduler's context, and, once the stream has completed, that the source
     * itself was not executed on a Vert.x thread.
     */
    @Test
    public void testScheduleWithDelayObserveOnReturnsOnTheCorrectThread() {
        Context testContext = this.vertx.getOrCreateContext();
        AtomicBoolean isOnVertxThread = new AtomicBoolean();
        testContext.runOnContext(v -> {
            ContextScheduler scheduler = new ContextScheduler(testContext, false);
            // Keep the element type: a raw Observable would make every subscribe callback
            // (including the error callback handed to fail) receive plain Object.
            Observable<String> observable = Observable.<String>create(subscriber -> {
                isOnVertxThread.set(Context.isOnVertxThread());
                subscriber.onNext("expected");
                subscriber.onCompleted();
            }).delay(10L, TimeUnit.MILLISECONDS, scheduler).doOnNext(o -> this.assertEquals(Vertx.currentContext(), testContext));
            // Subscribe from a fresh thread so the source runs outside of Vert.x.
            new Thread(() -> observable.subscribe(
                    item -> this.assertEquals("expected", item),
                    this::fail,
                    this::testComplete)).start();
        });
        // Wait for completion first: checking the flag before the source has run would only
        // ever observe the AtomicBoolean's initial value.
        this.await();
        this.assertFalse(isOnVertxThread.get());
    }

    /**
     * Delayed scheduling with a {@code ContextScheduler(vertx, false)}: the action is
     * expected to run on an event loop thread.
     */
    @Test
    public void testScheduleDelayed() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testScheduleDelayed(factory, this::assertEventLoopThread);
    }

    /**
     * Delayed scheduling with a {@code ContextScheduler(vertx, true)}: the action is
     * expected to run on a worker thread.
     */
    @Test
    public void testScheduleDelayedBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testScheduleDelayed(factory, this::assertWorkerThread);
    }

    /**
     * Delayed scheduling with a scheduler backed by the per-test worker executor: the
     * action is expected to run on one of that executor's threads.
     */
    @Test
    public void testScheduleDelayedWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testScheduleDelayed(factory, this::assertWorkerExecutorThread);
    }

    /**
     * Schedules one action with a 40 ms delay, waits for it to run, then checks the
     * executing thread and that at least 40 ms (wall clock) elapsed before it ran.
     *
     * @param scheduler    factory for the scheduler under test
     * @param threadAssert assertion applied to the thread that ran the action
     * @throws Exception if waiting for the action fails
     */
    private void testScheduleDelayed(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) throws Exception {
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        final long startedAt = System.currentTimeMillis();
        CountDownLatch executed = new CountDownLatch(1);
        AtomicReference<Thread> runner = new AtomicReference<>();
        AtomicLong elapsedMillis = new AtomicLong();
        worker.schedule(() -> {
            runner.set(Thread.currentThread());
            // Measure inside the action so the latch hand-over does not inflate the value.
            elapsedMillis.set(System.currentTimeMillis() - startedAt);
            executed.countDown();
        }, 40L, TimeUnit.MILLISECONDS);
        awaitLatch(executed);
        threadAssert.accept(runner.get());
        boolean delayRespected = elapsedMillis.get() >= 40L;
        assertTrue(delayRespected);
    }

    /**
     * Periodic scheduling with a {@code ContextScheduler(vertx, false)}: every run is
     * expected to happen on an event loop thread.
     */
    @Test
    public void testSchedulePeriodic() {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testSchedulePeriodic(factory, this::assertEventLoopThread);
    }

    /**
     * Periodic scheduling with a {@code ContextScheduler(vertx, true)}: every run is
     * expected to happen on a worker thread.
     */
    @Test
    public void testSchedulePeriodicBlocking() {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testSchedulePeriodic(factory, this::assertWorkerThread);
    }

    /**
     * Periodic scheduling with a scheduler backed by the per-test worker executor: every
     * run is expected to happen on one of that executor's threads.
     */
    @Test
    public void testSchedulePeriodicWorkerExecutor() {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testSchedulePeriodic(factory, this::assertWorkerExecutorThread);
    }

    /**
     * Schedules a periodic action (initial delay 0, period 40 ms). The first two runs check
     * that at least 40 ms passed since the previous run; the third run unsubscribes and
     * completes the test. Every run also checks the executing thread.
     *
     * @param scheduler    factory for the scheduler under test
     * @param threadAssert assertion applied to the thread of every run
     */
    private void testSchedulePeriodic(Supplier<ContextScheduler> scheduler, Consumer<Thread> threadAssert) {
        disableThreadChecks();
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        // Start 40 ms "in the past" so the very first (zero-delay) run passes the delta check.
        AtomicLong previousRun = new AtomicLong(System.currentTimeMillis() - 40L);
        AtomicInteger runs = new AtomicInteger();
        AtomicReference<Subscription> handle = new AtomicReference<>();
        Subscription periodic = worker.schedulePeriodically(() -> {
            threadAssert.accept(Thread.currentThread());
            if (runs.incrementAndGet() <= 2) {
                long now = System.currentTimeMillis();
                long delta = now - previousRun.get();
                assertTrue("" + delta, delta >= 40L);
                previousRun.set(now);
                return;
            }
            // Third run: stop the periodic task and finish.
            handle.get().unsubscribe();
            testComplete();
        }, 0L, 40L, TimeUnit.MILLISECONDS);
        handle.set(periodic);
        await();
    }

    /** Unsubscribe-before-run scenario with a {@code ContextScheduler(vertx, false)}. */
    @Test
    public void testUnsubscribeBeforeExecute() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testUnsubscribeBeforeExecute(factory);
    }

    /** Unsubscribe-before-run scenario with a {@code ContextScheduler(vertx, true)}. */
    @Test
    public void testUnsubscribeBeforeExecuteBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testUnsubscribeBeforeExecute(factory);
    }

    /** Unsubscribe-before-run scenario with a scheduler backed by the per-test worker executor. */
    @Test
    public void testUnsubscribeBeforeExecuteWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testUnsubscribeBeforeExecute(factory);
    }

    /**
     * Schedules an action 20 ms in the future, unsubscribes it immediately and then checks
     * that it has not run within 40 ms.
     *
     * @param scheduler factory for the scheduler under test
     * @throws Exception if the wait is interrupted
     */
    private void testUnsubscribeBeforeExecute(Supplier<ContextScheduler> scheduler) throws Exception {
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        CountDownLatch executed = new CountDownLatch(1);
        Subscription pending = worker.schedule(executed::countDown, 20L, TimeUnit.MILLISECONDS);
        pending.unsubscribe();
        // await(...) returns false on timeout, i.e. when the action never counted down.
        boolean ran = executed.await(40L, TimeUnit.MILLISECONDS);
        assertFalse(ran);
    }

    /** Unsubscribe-from-within-the-action scenario with a {@code ContextScheduler(vertx, false)}. */
    @Test
    public void testUnsubscribeDuringExecute() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testUnsubscribeDuringExecute(factory);
    }

    /** Unsubscribe-from-within-the-action scenario with a {@code ContextScheduler(vertx, true)}. */
    @Test
    public void testUnsubscribeDuringExecuteBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testUnsubscribeDuringExecute(factory);
    }

    /**
     * Unsubscribe-from-within-the-action scenario with a scheduler backed by the per-test
     * worker executor.
     */
    @Test
    public void testUnsubscribeDuringExecuteWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testUnsubscribeDuringExecute(factory);
    }

    /**
     * Schedules a periodic action (initial delay 0, period 5 ms) whose first run unsubscribes
     * its own subscription, then checks after 60 ms that the action ran exactly once.
     *
     * @param scheduler factory for the scheduler under test
     * @throws Exception if the sleep is interrupted
     */
    private void testUnsubscribeDuringExecute(Supplier<ContextScheduler> scheduler) throws Exception {
        ContextScheduler scheduler2 = scheduler.get();
        ContextScheduler.ContextWorker worker = scheduler2.createWorker();
        AtomicInteger count = new AtomicInteger();
        AtomicReference<Subscription> sub = new AtomicReference<>();
        sub.set(worker.schedulePeriodically(() -> {
            if (count.getAndIncrement() == 0) {
                // With a zero initial delay the first run can start on the scheduler's thread
                // before the test thread has stored the subscription; wait for it instead of
                // dereferencing null (same guard as testUnsubscribePeriodicInTask).
                Subscription current;
                while ((current = sub.get()) == null) {
                    Thread.yield();
                }
                current.unsubscribe();
            }
        }, 0L, 5L, TimeUnit.MILLISECONDS));
        // Leave room for several periods so a missed unsubscribe would show up in the count.
        Thread.sleep(60L);
        this.assertEquals(1L, count.get());
    }

    /** Unsubscribe-between-runs scenario with a {@code ContextScheduler(vertx, false)}. */
    @Test
    public void testUnsubscribeBetweenActions() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testUnsubscribeBetweenActions(factory);
    }

    /** Unsubscribe-between-runs scenario with a {@code ContextScheduler(vertx, true)}. */
    @Test
    public void testUnsubscribeBetweenActionsBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testUnsubscribeBetweenActions(factory);
    }

    /** Unsubscribe-between-runs scenario with a scheduler backed by the per-test worker executor. */
    @Test
    public void testUnsubscribeBetweenActionsWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testUnsubscribeBetweenActions(factory);
    }

    /**
     * Lets a periodic action (initial delay 0, period 20 ms) run four times, unsubscribes
     * it from the test thread and checks after a further 60 ms that no extra run happened.
     *
     * @param scheduler factory for the scheduler under test
     * @throws Exception if waiting or sleeping is interrupted
     */
    private void testUnsubscribeBetweenActions(Supplier<ContextScheduler> scheduler) throws Exception {
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch fourthRun = new CountDownLatch(1);
        AtomicReference<Subscription> handle = new AtomicReference<>();
        Subscription periodic = worker.schedulePeriodically(() -> {
            int current = runs.incrementAndGet();
            if (current == 4) {
                fourthRun.countDown();
            }
        }, 0L, 20L, TimeUnit.MILLISECONDS);
        handle.set(periodic);
        awaitLatch(fourthRun);
        handle.get().unsubscribe();
        // Three periods' worth of time: a still-active task would bump the counter.
        Thread.sleep(60L);
        assertEquals(4L, runs.get());
    }

    /** Worker-level unsubscribe scenario with a {@code ContextScheduler(vertx, false)}. */
    @Test
    public void testWorkerUnsubscribe() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testWorkerUnsubscribe(factory);
    }

    /** Worker-level unsubscribe scenario with a {@code ContextScheduler(vertx, true)}. */
    @Test
    public void testWorkerUnsubscribeBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testWorkerUnsubscribe(factory);
    }

    /** Worker-level unsubscribe scenario with a scheduler backed by the per-test worker executor. */
    @Test
    public void testWorkerUnsubscribeWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testWorkerUnsubscribe(factory);
    }

    /**
     * Schedules two actions 40 ms in the future on the same worker, unsubscribes the worker
     * and checks that both subscriptions report unsubscribed and that neither action ran
     * within 40 ms.
     *
     * @param scheduler factory for the scheduler under test
     * @throws Exception if the wait is interrupted
     */
    private void testWorkerUnsubscribe(Supplier<ContextScheduler> scheduler) throws Exception {
        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        CountDownLatch executions = new CountDownLatch(2);
        Subscription first = worker.schedule(executions::countDown, 40L, TimeUnit.MILLISECONDS);
        Subscription second = worker.schedule(executions::countDown, 40L, TimeUnit.MILLISECONDS);
        worker.unsubscribe();
        assertTrue(first.isUnsubscribed());
        assertTrue(second.isUnsubscribed());
        // A timeout (false) means the latch never reached zero.
        boolean bothRan = executions.await(40L, TimeUnit.MILLISECONDS);
        assertFalse(bothRan);
        assertEquals(2L, executions.getCount());
    }

    /**
     * Runs a periodic action (initial delay 20 ms, period 20 ms) on a
     * {@code ContextScheduler(vertx, true)}. The action body lives in the helper
     * {@code lambda$testPeriodicRescheduleAfterActionBlocking$36}: its first run sleeps
     * 10 ms, a later run asserts that more than 50 ms have passed since the start and then
     * ends the test.
     */
    @Test
    public void testPeriodicRescheduleAfterActionBlocking() {
        ContextScheduler.ContextWorker worker = new ContextScheduler(vertx, true).createWorker();
        AtomicBoolean firstRunDone = new AtomicBoolean();
        long startNanos = System.nanoTime();
        Action0 periodicAction = () -> lambda$testPeriodicRescheduleAfterActionBlocking$36(firstRunDone, startNanos, worker);
        worker.schedulePeriodically(periodicAction, 20L, 20L, TimeUnit.MILLISECONDS);
        await();
    }

    /** Schedulers-hook scenario with a {@code ContextScheduler(vertx, false)}. */
    @Test
    public void testSchedulerHook() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, false);
        testSchedulerHook(factory);
    }

    /** Schedulers-hook scenario with a {@code ContextScheduler(vertx, true)}. */
    @Test
    public void testSchedulerHookBlocking() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(vertx, true);
        testSchedulerHook(factory);
    }

    /** Schedulers-hook scenario with a scheduler backed by the per-test worker executor. */
    @Test
    public void testSchedulerHookWorkerExecutor() throws Exception {
        Supplier<ContextScheduler> factory = () -> new ContextScheduler(workerExecutor);
        testSchedulerHook(factory);
    }

    /**
     * Registers an {@link RxJavaSchedulersHook} that counts how often an action is handed
     * to {@code onSchedule} and how often the wrapped action has finished, then schedules
     * one action and checks the counters.
     *
     * <p>Expected: nothing is counted when the worker is merely created; while the scheduled
     * action runs, the hook has been consulted once but the wrapper has not finished yet;
     * afterwards both counters are one.
     *
     * @param scheduler factory for the scheduler under test
     * @throws Exception if waiting for the action fails
     */
    private void testSchedulerHook(Supplier<ContextScheduler> scheduler) throws Exception {
        RxJavaPlugins plugins = RxJavaPlugins.getInstance();
        final AtomicInteger scheduled = new AtomicInteger();
        final AtomicInteger called = new AtomicInteger();
        final CountDownLatch latchCalled = new CountDownLatch(1);
        RxJavaSchedulersHook countingHook = new RxJavaSchedulersHook() {

            public Action0 onSchedule(Action0 action) {
                scheduled.incrementAndGet();
                // Wrap the action so that its completion is counted and signalled.
                return () -> {
                    action.call();
                    called.getAndIncrement();
                    latchCalled.countDown();
                };
            }
        };
        plugins.registerSchedulersHook(countingHook);

        ContextScheduler.ContextWorker worker = scheduler.get().createWorker();
        assertEquals(0L, scheduled.get());
        assertEquals(0L, called.get());

        CountDownLatch actionRan = new CountDownLatch(1);
        AtomicInteger scheduledSeenByAction = new AtomicInteger();
        AtomicInteger calledSeenByAction = new AtomicInteger();
        worker.schedule(() -> {
            // Snapshot the counters from inside the action.
            scheduledSeenByAction.set(scheduled.get());
            calledSeenByAction.set(called.get());
            actionRan.countDown();
        }, 0L, TimeUnit.SECONDS);

        awaitLatch(actionRan);
        awaitLatch(latchCalled);
        assertEquals(1L, scheduled.get());
        assertEquals(1L, called.get());
        assertEquals(1L, scheduledSeenByAction.get());
        assertEquals(0L, calledSeenByAction.get());
    }

    /**
     * Schedules an action without delay on a worker of {@code RxHelper.blockingScheduler(vertx)}
     * and, once it has run, waits until the worker reports zero actions.
     */
    @Test
    public void testRemovedFromContextAfterRun() throws Exception {
        ContextScheduler blocking = (ContextScheduler) RxHelper.blockingScheduler(vertx);
        ContextScheduler.ContextWorker worker = blocking.createWorker();
        CountDownLatch executed = new CountDownLatch(1);
        worker.schedule(executed::countDown);
        awaitLatch(executed);
        SchedulerTest.waitUntil(() -> worker.countActions() == 0);
    }

    /**
     * Schedules an action with a 10 ms delay on a worker of
     * {@code RxHelper.blockingScheduler(vertx)} and, once it has run, waits until the worker
     * reports zero actions.
     */
    @Test
    public void testRemovedFromContextAfterDelay() throws Exception {
        ContextScheduler blocking = (ContextScheduler) RxHelper.blockingScheduler(vertx);
        ContextScheduler.ContextWorker worker = blocking.createWorker();
        CountDownLatch executed = new CountDownLatch(1);
        worker.schedule(executed::countDown, 10L, TimeUnit.MILLISECONDS);
        awaitLatch(executed);
        SchedulerTest.waitUntil(() -> worker.countActions() == 0);
    }

    /**
     * Schedules a periodic action (10 ms delay, 10 ms period) on a worker of
     * {@code RxHelper.blockingScheduler(vertx)} that unsubscribes itself on its first run,
     * then waits until the worker reports zero actions.
     */
    @Test
    public void testUnsubscribePeriodicInTask() throws Exception {
        ContextScheduler blocking = (ContextScheduler) RxHelper.blockingScheduler(vertx);
        ContextScheduler.ContextWorker worker = blocking.createWorker();
        CountDownLatch unsubscribed = new CountDownLatch(1);
        AtomicReference<Subscription> handle = new AtomicReference<>();
        Subscription periodic = worker.schedulePeriodically(() -> {
            // The action may start before the test thread has published the subscription.
            Subscription own = handle.get();
            while (own == null) {
                Thread.yield();
                own = handle.get();
            }
            own.unsubscribe();
            unsubscribed.countDown();
        }, 10L, 10L, TimeUnit.MILLISECONDS);
        handle.set(periodic);
        awaitLatch(unsubscribed);
        SchedulerTest.waitUntil(() -> worker.countActions() == 0);
    }

    /**
     * Body of the periodic action used by {@code testPeriodicRescheduleAfterActionBlocking}
     * (a lambda body that the decompiler emitted as a separate method).
     *
     * <p>The first invocation flips {@code b} and sleeps 10 ms. Any later invocation asserts
     * that more than 50 ms have elapsed since {@code time}, unsubscribes the worker and
     * completes the test.
     *
     * @param b      flag recording whether the first invocation has already happened
     * @param time   {@link System#nanoTime()} value captured when the test started
     * @param worker the worker running the periodic action
     */
    private void lambda$testPeriodicRescheduleAfterActionBlocking$36(AtomicBoolean b, long time, Scheduler.Worker worker) {
        boolean firstInvocation = b.compareAndSet(false, true);
        if (!firstInvocation) {
            long elapsedNanos = System.nanoTime() - time;
            assertTrue(elapsedNanos > TimeUnit.MILLISECONDS.toNanos(50L));
            worker.unsubscribe();
            testComplete();
            return;
        }
        try {
            Thread.sleep(10L);
        } catch (InterruptedException e) {
            fail();
        }
    }
}

