package org.neo4j.util.concurrent;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/neo4j/util/concurrent/WorkSyncTest.class */
public class WorkSyncTest {
    private static ExecutorService executor;
    private final UnsynchronizedAdder sum = new UnsynchronizedAdder();
    private final UnsynchronizedAdder count = new UnsynchronizedAdder();
    private final Adder adder = new Adder();
    private WorkSync<Adder, AddWork> sync = new WorkSync<>(this.adder);
    private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/util/concurrent/WorkSyncTest$AddWork.class */
    public static class AddWork implements Work<Adder, AddWork> {
        private int delta;

        private AddWork(int i) {
            this.delta = i;
        }

        public AddWork combine(AddWork addWork) {
            this.delta += addWork.delta;
            return this;
        }

        @Override // 
        public void apply(Adder adder) {
            WorkSyncTest.usleep(50L);
            adder.add(this.delta);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/util/concurrent/WorkSyncTest$Adder.class */
    public class Adder {
        private Adder() {
        }

        public void add(int i) {
            WorkSyncTest.this.semaphore.acquireUninterruptibly();
            WorkSyncTest.this.sum.add(i);
            WorkSyncTest.this.count.increment();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/util/concurrent/WorkSyncTest$CallableWork.class */
    public class CallableWork implements Callable<Void> {
        private final AddWork addWork;

        CallableWork(AddWork addWork) {
            this.addWork = addWork;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws ExecutionException {
            WorkSyncTest.this.sync.apply(this.addWork);
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/util/concurrent/WorkSyncTest$UnsynchronizedAdder.class */
    public static class UnsynchronizedAdder {
        private volatile long value;

        private UnsynchronizedAdder() {
        }

        public void add(long j) {
            long j2 = this.value;
            Thread.yield();
            WorkSyncTest.usleep(ThreadLocalRandom.current().nextInt(50));
            this.value = j2 + j;
        }

        public void increment() {
            add(1L);
        }

        public long sum() {
            return this.value;
        }
    }

    WorkSyncTest() {
    }

    @BeforeAll
    static void startExecutor() {
        executor = Executors.newCachedThreadPool();
    }

    @AfterAll
    static void stopExecutor() {
        executor.shutdownNow();
    }

    private static void usleep(long j) {
        do {
        } while (System.nanoTime() < System.nanoTime() + TimeUnit.MICROSECONDS.toNanos(j));
    }

    @AfterEach
    void refillSemaphore() {
        this.semaphore.drainPermits();
        this.semaphore.release(Integer.MAX_VALUE);
    }

    private Future<Void> makeWorkStuckAtSemaphore(int i) {
        this.semaphore.drainPermits();
        Future<Void> submit = executor.submit(new CallableWork(new AddWork(i)));
        Assertions.assertThrows(TimeoutException.class, () -> {
            submit.get(10L, TimeUnit.MILLISECONDS);
        });
        while (!this.semaphore.hasQueuedThreads()) {
            usleep(1L);
        }
        return submit;
    }

    @Test
    void mustApplyWork() throws Exception {
        this.sync.apply(new AddWork(10));
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        this.sync.apply(new AddWork(20));
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(30L);
    }

    @Test
    void mustCombineWork() throws Exception {
        final BinaryLatch binaryLatch = new BinaryLatch();
        final BinaryLatch binaryLatch2 = new BinaryLatch();
        FutureTask futureTask = new FutureTask(new CallableWork(new AddWork(1) { // from class: org.neo4j.util.concurrent.WorkSyncTest.1
            @Override // org.neo4j.util.concurrent.WorkSyncTest.AddWork
            public void apply(Adder adder) {
                super.apply(adder);
                binaryLatch.release();
                binaryLatch2.await();
            }
        }));
        new Thread(futureTask).start();
        binaryLatch.await();
        ArrayList arrayList = new ArrayList();
        arrayList.add(futureTask);
        for (int i = 0; i < 20; i++) {
            FutureTask futureTask2 = new FutureTask(new CallableWork(new AddWork(1)));
            arrayList.add(futureTask2);
            Thread thread = new Thread(futureTask2);
            thread.start();
            do {
            } while (thread.getState() != Thread.State.TIMED_WAITING);
        }
        binaryLatch2.release();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((FutureTask) it.next()).get();
        }
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isLessThan(this.sum.sum());
    }

    @Test
    void mustApplyWorkEvenWhenInterrupted() throws Exception {
        Thread.currentThread().interrupt();
        this.sync.apply(new AddWork(10));
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        Assertions.assertTrue(Thread.interrupted());
    }

    @Test
    void mustRecoverFromExceptions() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        this.sync = new WorkSync<>(new Adder() { // from class: org.neo4j.util.concurrent.WorkSyncTest.2
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // org.neo4j.util.concurrent.WorkSyncTest.Adder
            public void add(int i) {
                if (atomicBoolean.get()) {
                    throw new IllegalStateException("boom!");
                }
                super.add(i);
            }
        });
        try {
            executor.submit(new CallableWork(new AddWork(10))).get();
            Assertions.fail("Should have thrown");
        } catch (ExecutionException e) {
            org.assertj.core.api.Assertions.assertThat(e.getCause()).isInstanceOf(ExecutionException.class);
            org.assertj.core.api.Assertions.assertThat(((ExecutionException) e.getCause()).getCause()).isInstanceOf(IllegalStateException.class);
        }
        atomicBoolean.set(false);
        this.sync.apply(new AddWork(20));
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(20L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(1L);
    }

    @Test
    void mustNotApplyWorkInParallelUnderStress() throws Exception {
        int availableProcessors = Runtime.getRuntime().availableProcessors() * 5;
        int i = 1000;
        int i2 = 42;
        CountDownLatch countDownLatch = new CountDownLatch(availableProcessors);
        CountDownLatch countDownLatch2 = new CountDownLatch(availableProcessors);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Callable callable = () -> {
            countDownLatch.countDown();
            do {
            } while (!atomicBoolean.get());
            for (int i3 = 0; i3 < i; i3++) {
                this.sync.apply(new AddWork(i2));
            }
            countDownLatch2.countDown();
            return null;
        };
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < availableProcessors; i3++) {
            arrayList.add(executor.submit(callable));
        }
        countDownLatch.await();
        atomicBoolean.set(true);
        countDownLatch2.await();
        Futures.getAll(arrayList);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isLessThan(availableProcessors * 1000);
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(42 * availableProcessors * 1000);
    }

    @Test
    void mustNotApplyAsyncWorkInParallelUnderStress() throws Exception {
        int availableProcessors = Runtime.getRuntime().availableProcessors() * 5;
        int i = 1000;
        int i2 = 42;
        CountDownLatch countDownLatch = new CountDownLatch(availableProcessors);
        CountDownLatch countDownLatch2 = new CountDownLatch(availableProcessors);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Callable callable = () -> {
            countDownLatch.countDown();
            do {
            } while (!atomicBoolean.get());
            ThreadLocalRandom current = ThreadLocalRandom.current();
            ArrayList arrayList = new ArrayList();
            for (int i3 = 0; i3 < i; i3++) {
                arrayList.add(this.sync.applyAsync(new AddWork(i2)));
                if (current.nextInt(10) == 0) {
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        ((AsyncApply) it.next()).await();
                    }
                    arrayList.clear();
                }
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((AsyncApply) it2.next()).await();
            }
            countDownLatch2.countDown();
            return null;
        };
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < availableProcessors; i3++) {
            arrayList.add(executor.submit(callable));
        }
        countDownLatch.await();
        atomicBoolean.set(true);
        countDownLatch2.await();
        Futures.getAll(arrayList);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isLessThan(availableProcessors * 1000);
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(42 * availableProcessors * 1000);
    }

    @Test
    void mustApplyWorkAsync() throws Exception {
        this.sync.applyAsync(new AddWork(10)).await();
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        AsyncApply applyAsync = this.sync.applyAsync(new AddWork(20));
        AsyncApply applyAsync2 = this.sync.applyAsync(new AddWork(30));
        applyAsync.await();
        applyAsync2.await();
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(60L);
    }

    @Test
    void mustCombineWorkAsync() throws Exception {
        makeWorkStuckAtSemaphore(1);
        AsyncApply applyAsync = this.sync.applyAsync(new AddWork(1));
        AsyncApply applyAsync2 = this.sync.applyAsync(new AddWork(1));
        AsyncApply applyAsync3 = this.sync.applyAsync(new AddWork(1));
        this.semaphore.release(2);
        applyAsync.await();
        applyAsync2.await();
        applyAsync3.await();
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(4L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(2L);
    }

    @Test
    void mustApplyWorkAsyncEvenWhenInterrupted() throws Exception {
        Thread.currentThread().interrupt();
        this.sync.applyAsync(new AddWork(10)).await();
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        Assertions.assertTrue(Thread.interrupted());
    }

    @Test
    void asyncWorkThatThrowsMustRememberException() {
        final RuntimeException runtimeException = new RuntimeException("boo");
        AsyncApply applyAsync = this.sync.applyAsync(new AddWork(10) { // from class: org.neo4j.util.concurrent.WorkSyncTest.3
            @Override // org.neo4j.util.concurrent.WorkSyncTest.AddWork
            public void apply(Adder adder) {
                super.apply(adder);
                throw runtimeException;
            }
        });
        try {
            applyAsync.await();
            Assertions.fail("Should have thrown");
        } catch (ExecutionException e) {
            org.assertj.core.api.Assertions.assertThat(e.getCause()).isSameAs(runtimeException);
        }
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(1L);
        try {
            applyAsync.await();
            Assertions.fail("Should have thrown");
        } catch (ExecutionException e2) {
            org.assertj.core.api.Assertions.assertThat(e2.getCause()).isSameAs(runtimeException);
        }
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(10L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(1L);
    }

    @Test
    void asyncWorkThatThrowsInAwaitMustRememberException() throws Exception {
        Future<Void> makeWorkStuckAtSemaphore = makeWorkStuckAtSemaphore(1);
        final RuntimeException runtimeException = new RuntimeException("boo");
        AsyncApply applyAsync = this.sync.applyAsync(new AddWork(10) { // from class: org.neo4j.util.concurrent.WorkSyncTest.4
            @Override // org.neo4j.util.concurrent.WorkSyncTest.AddWork
            public void apply(Adder adder) {
                super.apply(adder);
                throw runtimeException;
            }
        });
        refillSemaphore();
        makeWorkStuckAtSemaphore.get();
        try {
            applyAsync.await();
            Assertions.fail("Should have thrown");
        } catch (ExecutionException e) {
            org.assertj.core.api.Assertions.assertThat(e.getCause()).isSameAs(runtimeException);
        }
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(11L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(2L);
        try {
            applyAsync.await();
            Assertions.fail("Should have thrown");
        } catch (ExecutionException e2) {
            org.assertj.core.api.Assertions.assertThat(e2.getCause()).isSameAs(runtimeException);
        }
        org.assertj.core.api.Assertions.assertThat(this.sum.sum()).isEqualTo(11L);
        org.assertj.core.api.Assertions.assertThat(this.count.sum()).isEqualTo(2L);
    }
}
