/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.util.concurrent;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.neo4j.util.concurrent.AsyncApply;
import org.neo4j.util.concurrent.BinaryLatch;
import org.neo4j.util.concurrent.Work;
import org.neo4j.util.concurrent.WorkSync;

class WorkSyncTest {
    // Thread pool shared by all tests; assigned in startExecutor() and shut down in stopExecutor().
    private static ExecutorService executor;
    // Accumulates every delta handed to Adder.add(int).
    private UnsynchronisedAdder sum = new UnsynchronisedAdder();
    // Incremented once per Adder.add(int) call, i.e. counts how many times work reached the adder.
    private UnsynchronisedAdder count = new UnsynchronisedAdder();
    // The material that the WorkSync below is constructed around.
    private Adder adder = new Adder();
    // The WorkSync instance under test; mustRecoverFromExceptions replaces it with one wrapping a failing Adder.
    private WorkSync<Adder, AddWork> sync = new WorkSync((Object)this.adder);
    // Adder.add(int) acquires one permit per call. It starts out with Integer.MAX_VALUE permits,
    // so add only blocks after a test has drained the permits.
    private Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

    // Explicit no-argument constructor with an empty body.
    WorkSyncTest() {
    }

    /**
     * Creates the cached thread pool shared by every test in this class.
     */
    @BeforeAll
    static void startExecutor() {
        ExecutorService pool = Executors.newCachedThreadPool();
        WorkSyncTest.executor = pool;
    }

    /**
     * Shuts the shared thread pool down once all tests have run.
     * {@code shutdownNow()} is used, so still-running tasks are interrupted rather than awaited.
     */
    @AfterAll
    static void stopExecutor() {
        WorkSyncTest.executor.shutdownNow();
    }

    /**
     * Busy-waits on the calling thread until at least {@code micros} microseconds have elapsed.
     * The thread spins instead of sleeping, so very short pauses are possible.
     *
     * @param micros minimum time to spin, in microseconds; zero or negative values return
     *               immediately
     */
    private static void usleep(long micros) {
        long deadline = System.nanoTime() + TimeUnit.MICROSECONDS.toNanos(micros);
        // nanoTime() values may wrap around, so they must be compared by subtraction
        // (t1 - t0 < 0) rather than directly (t1 < t0).
        while (System.nanoTime() - deadline < 0) {
            // Spin until the deadline has passed.
        }
    }

    /**
     * Restores the semaphore to {@code Integer.MAX_VALUE} permits so that no thread remains
     * blocked in {@code Adder.add} once a test has finished. Also called directly by tests that
     * need to unblock stuck work.
     *
     * <p>Package-private rather than private: JUnit Jupiter lifecycle methods must not be private.
     */
    @AfterEach
    void refillSemaphore() {
        // Drain first: releasing MAX_VALUE permits on top of existing ones would exceed the
        // semaphore's maximum permit count.
        this.semaphore.drainPermits();
        this.semaphore.release(Integer.MAX_VALUE);
    }

    /**
     * Submits an {@code AddWork} to the executor after removing all semaphore permits, and
     * returns once a thread is queued on the semaphore.
     *
     * @param delta the delta of the work that is going to get stuck
     * @return the future of the submitted work; it cannot complete until permits are released
     */
    private Future<Void> makeWorkStuckAtSemaphore(int delta) {
        semaphore.drainPermits();
        Future<Void> stuck = executor.submit(new CallableWork(new AddWork(delta)));
        // The work must not be able to finish while there are no permits.
        Assertions.assertThrows(TimeoutException.class, () -> stuck.get(10L, TimeUnit.MILLISECONDS));
        // Poll until a thread is actually parked on the semaphore.
        while (true) {
            if (semaphore.hasQueuedThreads()) {
                return stuck;
            }
            usleep(1L);
        }
    }

    /**
     * Two works applied one after the other must both reach the adder: the sum is 10 after the
     * first and 30 after the second.
     */
    @Test
    void mustApplyWork() throws Exception {
        sync.apply(new AddWork(10));
        long afterFirst = sum.sum();
        MatcherAssert.assertThat(afterFirst, Matchers.is(10L));

        sync.apply(new AddWork(20));
        long afterSecond = sum.sum();
        MatcherAssert.assertThat(afterSecond, Matchers.is(30L));
    }

    /**
     * While one work is blocked inside {@code apply}, twenty more works are submitted from their
     * own threads. After the blocker is released, the number of {@code Adder.add} calls must be
     * lower than the total sum, i.e. at least some unit-delta works were merged into one call.
     */
    @Test
    void mustCombineWork() throws Exception {
        final BinaryLatch blockerStarted = new BinaryLatch();
        final BinaryLatch releaseBlocker = new BinaryLatch();
        AddWork blockingWork = new AddWork(1) {

            @Override
            public void apply(Adder adder) {
                super.apply(adder);
                // Signal that this work is being applied, then hold on until told to continue.
                blockerStarted.release();
                releaseBlocker.await();
            }
        };
        FutureTask<Void> blocker = new FutureTask<>(new CallableWork(blockingWork));
        new Thread(blocker).start();
        blockerStarted.await();

        ArrayList<FutureTask<Void>> tasks = new ArrayList<>();
        tasks.add(blocker);
        for (int submitted = 0; submitted != 20; submitted++) {
            FutureTask<Void> task = new FutureTask<>(new CallableWork(new AddWork(1)));
            tasks.add(task);
            Thread worker = new Thread(task);
            worker.start();
            // Spin until the worker is in a timed wait, presumably inside WorkSync while the
            // blocker is still applying; confirm against the WorkSync implementation.
            while (worker.getState() != Thread.State.TIMED_WAITING) {
            }
        }

        releaseBlocker.release();
        for (FutureTask<Void> task : tasks) {
            task.get();
        }
        MatcherAssert.assertThat(count.sum(), Matchers.lessThan(sum.sum()));
    }

    /**
     * Applying work on a thread whose interrupt flag is set must still apply the work and must
     * leave the interrupt flag set afterwards.
     */
    @Test
    void mustApplyWorkEvenWhenInterrupted() throws Exception {
        boolean stillInterrupted;
        Thread.currentThread().interrupt();
        try {
            this.sync.apply(new AddWork(10));
            MatcherAssert.assertThat(this.sum.sum(), Matchers.is(10L));
        } finally {
            // Always read and clear the flag, even when the block above fails, so that an
            // interrupted test-runner thread cannot leak into later tests.
            stillInterrupted = Thread.interrupted();
        }
        Assertions.assertTrue(stillInterrupted);
    }

    /**
     * A work whose application throws must surface the failure to the caller, and the sync must
     * remain usable afterwards: once the adder stops failing, a later work is applied normally
     * and is the only one counted.
     */
    @Test
    void mustRecoverFromExceptions() throws Exception {
        final AtomicBoolean broken = new AtomicBoolean(true);
        Adder flakyAdder = new Adder() {

            @Override
            public void add(int delta) {
                if (!broken.get()) {
                    super.add(delta);
                    return;
                }
                throw new IllegalStateException("boom!");
            }
        };
        sync = new WorkSync<>(flakyAdder);

        Future<Void> failing = executor.submit(new CallableWork(new AddWork(10)));
        try {
            failing.get();
            Assertions.fail("Should have thrown");
        }
        catch (ExecutionException fromFuture) {
            // Two layers are expected: the Future wraps the ExecutionException thrown by
            // sync.apply, which in turn wraps the adder's IllegalStateException.
            Throwable fromApply = fromFuture.getCause();
            MatcherAssert.assertThat(fromApply, Matchers.instanceOf(ExecutionException.class));
            MatcherAssert.assertThat(fromApply.getCause(), Matchers.instanceOf(IllegalStateException.class));
        }

        broken.set(false);
        sync.apply(new AddWork(20));
        MatcherAssert.assertThat(sum.sum(), Matchers.is(20L));
        MatcherAssert.assertThat(count.sum(), Matchers.is(1L));
    }

    /**
     * Hammers the sync from many threads at once. {@code UnsynchronisedAdder} loses updates when
     * used concurrently, so an exact final sum shows that works were never applied in parallel.
     * The add-call count being below the number of submitted works shows that at least some
     * works were combined.
     */
    @Test
    void mustNotApplyWorkInParallelUnderStress() throws Exception {
        int workers = Runtime.getRuntime().availableProcessors() * 5;
        int iterations = 1000;
        int incrementValue = 42;
        CountDownLatch startLatch = new CountDownLatch(workers);
        CountDownLatch endLatch = new CountDownLatch(workers);
        AtomicBoolean start = new AtomicBoolean();
        Callable<Void> work = () -> {
            try {
                startLatch.countDown();
                // Spin so that all workers begin applying at (nearly) the same moment.
                while (!start.get()) {
                }
                for (int i = 0; i < iterations; ++i) {
                    this.sync.apply(new AddWork(incrementValue));
                }
            } finally {
                // Count down even on failure; otherwise endLatch.await() below would hang
                // forever instead of letting future.get() report the error.
                endLatch.countDown();
            }
            return null;
        };
        ArrayList<Future<Void>> futureList = new ArrayList<>();
        for (int i = 0; i < workers; ++i) {
            futureList.add(executor.submit(work));
        }
        startLatch.await();
        start.set(true);
        endLatch.await();
        for (Future<Void> future : futureList) {
            future.get();
        }
        // Compute in long arithmetic and compare Long with Long: an Integer expectation can
        // never equal the boxed Long returned by sum().
        long totalWorks = (long) workers * iterations;
        MatcherAssert.assertThat(this.count.sum(), Matchers.lessThan(totalWorks));
        MatcherAssert.assertThat(this.sum.sum(), Matchers.is(incrementValue * totalWorks));
    }

    /**
     * Asynchronous counterpart of the stress test: every worker submits works with
     * {@code applyAsync} and awaits the outstanding ones at random intervals (on average every
     * tenth submission) and once more at the end. {@code UnsynchronisedAdder} loses updates when
     * used concurrently, so an exact final sum shows that works were never applied in parallel.
     * The add-call count being below the number of submitted works shows that at least some
     * works were combined.
     */
    @Test
    void mustNotApplyAsyncWorkInParallelUnderStress() throws Exception {
        int workers = Runtime.getRuntime().availableProcessors() * 5;
        int iterations = 1000;
        int incrementValue = 42;
        CountDownLatch startLatch = new CountDownLatch(workers);
        CountDownLatch endLatch = new CountDownLatch(workers);
        AtomicBoolean start = new AtomicBoolean();
        Callable<Void> work = () -> {
            try {
                startLatch.countDown();
                // Spin so that all workers begin submitting at (nearly) the same moment.
                while (!start.get()) {
                }
                ThreadLocalRandom rng = ThreadLocalRandom.current();
                ArrayList<AsyncApply> asyncs = new ArrayList<>();
                for (int i = 0; i < iterations; ++i) {
                    asyncs.add(this.sync.applyAsync(new AddWork(incrementValue)));
                    if (rng.nextInt(10) == 0) {
                        for (AsyncApply async : asyncs) {
                            async.await();
                        }
                        asyncs.clear();
                    }
                }
                // Await whatever is still outstanding after the last iteration.
                for (AsyncApply async : asyncs) {
                    async.await();
                }
            } finally {
                // Count down even on failure; otherwise endLatch.await() below would hang
                // forever instead of letting future.get() report the error.
                endLatch.countDown();
            }
            return null;
        };
        ArrayList<Future<Void>> futureList = new ArrayList<>();
        for (int i = 0; i < workers; ++i) {
            futureList.add(executor.submit(work));
        }
        startLatch.await();
        start.set(true);
        endLatch.await();
        for (Future<Void> future : futureList) {
            future.get();
        }
        // Compute in long arithmetic and compare Long with Long: an Integer expectation can
        // never equal the boxed Long returned by sum().
        long totalWorks = (long) workers * iterations;
        MatcherAssert.assertThat(this.count.sum(), Matchers.lessThan(totalWorks));
        MatcherAssert.assertThat(this.sum.sum(), Matchers.is(incrementValue * totalWorks));
    }

    /**
     * Works submitted with {@code applyAsync} must have been applied once their
     * {@code AsyncApply} handles have been awaited: the sum is 10 after the first work and 60
     * after two more (20 and 30).
     */
    @Test
    void mustApplyWorkAsync() throws Exception {
        AsyncApply first = sync.applyAsync(new AddWork(10));
        first.await();
        MatcherAssert.assertThat(sum.sum(), Matchers.is(10L));

        // Submit both before awaiting either of them.
        AsyncApply second = sync.applyAsync(new AddWork(20));
        AsyncApply third = sync.applyAsync(new AddWork(30));
        second.await();
        third.await();
        MatcherAssert.assertThat(sum.sum(), Matchers.is(60L));
    }

    /**
     * With one work stuck at the semaphore, three asynchronous unit works are submitted and two
     * permits are released. The test asserts a sum of 4 from only 2 {@code Adder.add} calls,
     * i.e. the three asynchronous works reached the adder as a single combined call.
     */
    @Test
    void mustCombineWorkAsync() throws Exception {
        makeWorkStuckAtSemaphore(1);

        AsyncApply first = sync.applyAsync(new AddWork(1));
        AsyncApply second = sync.applyAsync(new AddWork(1));
        AsyncApply third = sync.applyAsync(new AddWork(1));

        // Exactly two permits: enough for two add calls in total.
        semaphore.release(2);
        first.await();
        second.await();
        third.await();

        long total = sum.sum();
        MatcherAssert.assertThat(total, Matchers.is(4L));
        long addCalls = count.sum();
        MatcherAssert.assertThat(addCalls, Matchers.is(2L));
    }

    /**
     * Submitting and awaiting asynchronous work on a thread whose interrupt flag is set must
     * still apply the work and must leave the interrupt flag set afterwards.
     */
    @Test
    void mustApplyWorkAsyncEvenWhenInterrupted() throws Exception {
        boolean stillInterrupted;
        Thread.currentThread().interrupt();
        try {
            this.sync.applyAsync(new AddWork(10)).await();
            MatcherAssert.assertThat(this.sum.sum(), Matchers.is(10L));
        } finally {
            // Always read and clear the flag, even when the block above fails, so that an
            // interrupted test-runner thread cannot leak into later tests.
            stillInterrupted = Thread.interrupted();
        }
        Assertions.assertTrue(stillInterrupted);
    }

    /**
     * An asynchronous work that throws after adding must report that same exception instance
     * (wrapped in an {@code ExecutionException}) from every call to {@code await()}, and
     * awaiting again must not apply the work a second time.
     */
    @Test
    void asyncWorkThatThrowsMustRememberException() {
        final RuntimeException boo = new RuntimeException("boo");
        AddWork throwingWork = new AddWork(10) {

            @Override
            public void apply(Adder adder) {
                super.apply(adder);
                throw boo;
            }
        };
        AsyncApply asyncApply = sync.applyAsync(throwingWork);

        // Await twice; both rounds must observe the same failure and the same counters.
        for (int round = 0; round < 2; round++) {
            try {
                asyncApply.await();
                Assertions.fail("Should have thrown");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat(e.getCause(), Matchers.sameInstance(boo));
            }
            MatcherAssert.assertThat(sum.sum(), Matchers.is(10L));
            MatcherAssert.assertThat(count.sum(), Matchers.is(1L));
        }
    }

    /**
     * Same as the plain "must remember exception" test, except that the throwing work is
     * submitted while another work is stuck at the semaphore. After the semaphore is refilled
     * and the stuck work has finished, every {@code await()} must report the original exception
     * and the counters must stay at sum 11 / count 2.
     */
    @Test
    void asyncWorkThatThrowsInAwaitMustRememberException() throws Exception {
        Future<Void> stuckAtSemaphore = makeWorkStuckAtSemaphore(1);
        final RuntimeException boo = new RuntimeException("boo");
        AddWork throwingWork = new AddWork(10) {

            @Override
            public void apply(Adder adder) {
                super.apply(adder);
                throw boo;
            }
        };
        AsyncApply asyncApply = sync.applyAsync(throwingWork);

        // Unblock the stuck work and wait for it to finish.
        refillSemaphore();
        stuckAtSemaphore.get();

        // Await twice; both rounds must observe the same failure and the same counters.
        for (int round = 0; round < 2; round++) {
            try {
                asyncApply.await();
                Assertions.fail("Should have thrown");
            }
            catch (ExecutionException e) {
                MatcherAssert.assertThat(e.getCause(), Matchers.sameInstance(boo));
            }
            MatcherAssert.assertThat(sum.sum(), Matchers.is(11L));
            MatcherAssert.assertThat(count.sum(), Matchers.is(2L));
        }
    }

    /**
     * A counter whose {@code add} is a non-atomic read-modify-write with a pause between the
     * read and the write. The field is volatile, so writes are visible, but concurrent
     * {@code add} calls can overwrite each other's updates.
     */
    private static class UnsynchronisedAdder {
        private volatile long value;

        private UnsynchronisedAdder() {
        }

        /**
         * Adds {@code delta} to the current value, non-atomically.
         */
        public void add(long delta) {
            final long snapshot = value;
            // Widen the window between read and write: yield, then spin for 0-49 microseconds.
            Thread.yield();
            int pauseMicros = ThreadLocalRandom.current().nextInt(50);
            WorkSyncTest.usleep(pauseMicros);
            value = snapshot + delta;
        }

        /**
         * Adds one to the current value.
         */
        public void increment() {
            add(1L);
        }

        /**
         * Returns the current value.
         */
        public long sum() {
            return value;
        }
    }

    /**
     * Adapts an {@code AddWork} to {@code Callable}, so that applying it through the enclosing
     * test's current {@code sync} can be submitted to an executor or wrapped in a
     * {@code FutureTask}.
     */
    private class CallableWork
    implements Callable<Void> {
        private final AddWork addWork;

        CallableWork(AddWork addWork) {
            this.addWork = addWork;
        }

        /**
         * Applies the wrapped work through the enclosing test's {@code sync}.
         *
         * @return always {@code null}
         * @throws ExecutionException propagated from {@code sync.apply}
         */
        @Override
        public Void call() throws ExecutionException {
            AddWork pending = addWork;
            // The sync field is read at call time, not captured at construction time.
            sync.apply(pending);
            return null;
        }
    }

    /**
     * The material that works are applied to. Every {@code add} call takes one permit from the
     * enclosing test's semaphore, then records the delta in {@code sum} and bumps {@code count}.
     */
    private class Adder {
        private Adder() {
        }

        /**
         * Blocks (uninterruptibly) until a semaphore permit is available, then adds
         * {@code delta} to the running sum and increments the call counter.
         */
        public void add(int delta) {
            semaphore.acquireUninterruptibly();
            sum.add(delta);
            count.increment();
        }
    }

    /**
     * A unit of work that adds a delta to an {@code Adder}. Combining two works folds the other
     * work's delta into this one.
     */
    private static class AddWork
    implements Work<Adder, AddWork> {
        // Not final: combine() accumulates other works' deltas into it.
        private int delta;

        private AddWork(int delta) {
            this.delta = delta;
        }

        /**
         * Merges {@code work} into this instance by summing the deltas.
         *
         * @return this instance, now carrying the combined delta
         */
        public AddWork combine(AddWork work) {
            delta = delta + work.delta;
            return this;
        }

        /**
         * Spins for 50 microseconds, then hands the (possibly combined) delta to the adder.
         */
        public void apply(Adder adder) {
            WorkSyncTest.usleep(50L);
            adder.add(delta);
        }
    }
}

