/*
 * Decompiled with CFR 0.152.
 */
package io.airlift.concurrent;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.AsyncSemaphore;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import jakarta.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.Test;

public class TestAsyncSemaphore {
    // Cached thread pool shared by the tests in this class, wrapped so that submit() returns
    // ListenableFutures. Threads come from Threads.daemonThreadsNamed("async-semaphore-%s").
    // NOTE(review): nothing in the visible code shuts this executor down.
    private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
            Executors.newCachedThreadPool(Threads.daemonThreadsNamed("async-semaphore-%s")));

    /**
     * Submits 1000 counter increments through a semaphore constructed with a limit of 1 and
     * verifies that every increment ran.
     *
     * @throws Exception if waiting for the submitted tasks fails or times out
     */
    @Test
    public void testInlineExecution() throws Exception {
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(1, executor, this::submitTask);
        AtomicInteger count = new AtomicInteger();
        List<ListenableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit(count::incrementAndGet));
        }
        // Bounded wait: a stuck semaphore fails the test with a timeout instead of hanging it.
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assertions.assertThat(count.get()).isEqualTo(1000);
    }

    /**
     * Submits 1000 tasks through a semaphore constructed with a limit of 1. Each task asserts
     * that it is the only one running, and the test verifies that all 1000 ran.
     *
     * @throws Exception if any task fails its assertion or the wait times out
     */
    @Test
    public void testSingleThreadBoundedConcurrency() throws Exception {
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(1, executor, this::submitTask);
        AtomicInteger count = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        List<ListenableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit(() -> {
                count.incrementAndGet();
                int currentConcurrency = concurrency.incrementAndGet();
                io.airlift.testing.Assertions.assertLessThanOrEqual(currentConcurrency, 1);
                // Hold the slot briefly so overlapping executions would be observable.
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
                concurrency.decrementAndGet();
            }));
        }
        // A failed in-task assertion surfaces here as an ExecutionException from the combined future.
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assertions.assertThat(count.get()).isEqualTo(1000);
    }

    /**
     * Submits 1000 tasks through a semaphore constructed with a limit of 2. Each task asserts
     * that at most two tasks are running, and the test verifies that all 1000 ran.
     *
     * @throws Exception if any task fails its assertion or the wait times out
     */
    @Test
    public void testMultiThreadBoundedConcurrency() throws Exception {
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(2, executor, this::submitTask);
        AtomicInteger count = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        List<ListenableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit(() -> {
                count.incrementAndGet();
                int currentConcurrency = concurrency.incrementAndGet();
                io.airlift.testing.Assertions.assertLessThanOrEqual(currentConcurrency, 2);
                // Hold the slot briefly so excess overlapping executions would be observable.
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
                concurrency.decrementAndGet();
            }));
        }
        // A failed in-task assertion surfaces here as an ExecutionException from the combined future.
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
        Assertions.assertThat(count.get()).isEqualTo(1000);
    }

    /**
     * Has 100 executor threads submit one task each to a semaphore constructed with a limit
     * of 2, all released together by a start latch. Each task asserts the concurrency bound
     * and counts down a completion latch as its last step.
     */
    @Test
    public void testMultiSubmitters() {
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(2, executor, this::submitTask);
        AtomicInteger count = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(100);
        for (int i = 0; i < 100; ++i) {
            executor.execute(() -> {
                Uninterruptibles.awaitUninterruptibly(startLatch, 1L, TimeUnit.MINUTES);
                asyncSemaphore.submit(() -> {
                    count.incrementAndGet();
                    int currentConcurrency = concurrency.incrementAndGet();
                    io.airlift.testing.Assertions.assertLessThanOrEqual(currentConcurrency, 2);
                    Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
                    concurrency.decrementAndGet();
                    completionLatch.countDown();
                });
            });
        }
        startLatch.countDown();
        // The futures returned by submit() are discarded, so a failed in-task assertion is only
        // visible as a latch that never reaches zero. count is incremented before that assertion,
        // so the count check alone would still pass; the await result must be asserted as well.
        boolean allCompleted = Uninterruptibles.awaitUninterruptibly(completionLatch, 1L, TimeUnit.MINUTES);
        Assertions.assertThat(allCompleted)
                .as("all submitted tasks ran to completion within the timeout")
                .isTrue();
        Assertions.assertThat(count.get()).isEqualTo(100);
    }

    /**
     * Submits 1000 tasks that always throw (via {@code assertFailedConcurrency}) through a
     * semaphore constructed with a limit of 2. Verifies that every returned future fails and
     * that the completion callback saw 1000 failures and no successes.
     *
     * @throws Exception if the wait for the completion latch is interrupted
     */
    @Test
    public void testFailedTasks() throws Exception {
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(2, executor, this::submitTask);
        AtomicInteger successCount = new AtomicInteger();
        AtomicInteger failureCount = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        CountDownLatch completionLatch = new CountDownLatch(1000);
        List<ListenableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            ListenableFuture<Void> future = asyncSemaphore.submit(() -> assertFailedConcurrency(concurrency));
            Futures.addCallback(future, completionCallback(successCount, failureCount, completionLatch), MoreExecutors.directExecutor());
            futures.add(future);
        }
        // Fail fast on timeout; otherwise the untimed future.get() calls below could block forever.
        Assertions.assertThat(completionLatch.await(1L, TimeUnit.MINUTES))
                .as("all futures completed within the timeout")
                .isTrue();
        for (ListenableFuture<Void> future : futures) {
            try {
                future.get();
                // fail() raises an Error, so the catch below does not swallow it.
                Fail.fail();
            }
            catch (Exception expected) {
                // Every task throws, so get() is expected to fail.
                // NOTE(review): the failure cause is not inspected, so a violated concurrency
                // bound inside the task would also count as an expected failure.
            }
        }
        Assertions.assertThat(successCount.get()).isEqualTo(0);
        Assertions.assertThat(failureCount.get()).isEqualTo(1000);
    }

    /**
     * Uses a submitter that always throws (via {@code assertFailedConcurrency}), so the
     * submitted task itself must never run. Verifies that all 1000 returned futures fail and
     * that the completion callback saw 1000 failures and no successes.
     *
     * @throws Exception if the wait for the completion latch is interrupted
     */
    @Test
    public void testFailedTaskSubmission() throws Exception {
        AtomicInteger successCount = new AtomicInteger();
        AtomicInteger failureCount = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        CountDownLatch completionLatch = new CountDownLatch(1000);
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(2, executor, task -> {
            throw assertFailedConcurrency(concurrency);
        });
        List<ListenableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            // The task body would fail the test if the throwing submitter ever ran it.
            ListenableFuture<Void> future = asyncSemaphore.submit(Fail::fail);
            Futures.addCallback(future, completionCallback(successCount, failureCount, completionLatch), MoreExecutors.directExecutor());
            futures.add(future);
        }
        // Fail fast on timeout; otherwise the untimed future.get() calls below could block forever.
        Assertions.assertThat(completionLatch.await(1L, TimeUnit.MINUTES))
                .as("all futures completed within the timeout")
                .isTrue();
        for (ListenableFuture<Void> future : futures) {
            try {
                future.get();
                // fail() raises an Error, so the catch below does not swallow it.
                Fail.fail();
            }
            catch (Exception expected) {
                // The submitter always throws, so get() is expected to fail.
            }
        }
        Assertions.assertThat(successCount.get()).isEqualTo(0);
        Assertions.assertThat(failureCount.get()).isEqualTo(1000);
    }

    /**
     * Has 100 executor threads, released together by a start latch, each submit one task to a
     * semaphore whose submitter always throws. Verifies that every returned future fails and
     * that the completion callback saw 100 failures and no successes.
     */
    @Test
    public void testFailedTaskWithMultipleSubmitters() {
        AtomicInteger successCount = new AtomicInteger();
        AtomicInteger failureCount = new AtomicInteger();
        AtomicInteger concurrency = new AtomicInteger();
        CountDownLatch startLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(100);
        AsyncSemaphore<Runnable, Void> asyncSemaphore = new AsyncSemaphore<>(2, executor, task -> {
            throw assertFailedConcurrency(concurrency);
        });
        ConcurrentLinkedQueue<ListenableFuture<Void>> futures = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 100; ++i) {
            executor.execute(() -> {
                Uninterruptibles.awaitUninterruptibly(startLatch, 1L, TimeUnit.MINUTES);
                ListenableFuture<Void> future = asyncSemaphore.submit(Fail::fail);
                // Record the future before registering the callback, so that every future which
                // counted the latch down is already in the queue when the main thread reads it.
                futures.add(future);
                Futures.addCallback(future, completionCallback(successCount, failureCount, completionLatch), MoreExecutors.directExecutor());
            });
        }
        startLatch.countDown();
        // Fail fast on timeout; otherwise the untimed future.get() calls below could block forever.
        boolean allCompleted = Uninterruptibles.awaitUninterruptibly(completionLatch, 1L, TimeUnit.MINUTES);
        Assertions.assertThat(allCompleted)
                .as("all futures completed within the timeout")
                .isTrue();
        for (ListenableFuture<Void> future : futures) {
            try {
                future.get();
                // fail() raises an Error, so the catch below does not swallow it.
                Fail.fail();
            }
            catch (Exception expected) {
                // The submitter always throws, so get() is expected to fail.
            }
        }
        Assertions.assertThat(successCount.get()).isEqualTo(0);
        Assertions.assertThat(failureCount.get()).isEqualTo(100);
    }

    /**
     * Submits 1000 tasks to a semaphore constructed with a limit of 1 whose submitter returns
     * an already-completed future, then waits for all of them to finish.
     *
     * @throws Exception if any future fails or the wait times out
     */
    @Test
    public void testNoStackOverflow() throws Exception {
        AsyncSemaphore<Object, Object> asyncSemaphore = new AsyncSemaphore<>(1, executor, object -> Futures.immediateFuture(null));
        List<ListenableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < 1000; ++i) {
            futures.add(asyncSemaphore.submit(new Object()));
        }
        Futures.allAsList(futures).get(1L, TimeUnit.MINUTES);
    }

    /**
     * Returns the concurrency limits that the {@code processAll} tests iterate over.
     *
     * @return a freshly allocated array containing 1, 2, 3 and 5
     */
    public static int[] testedConcurrency() {
        int[] limits = {1, 2, 3, 5};
        return limits;
    }

    /**
     * Verifies that {@code processAll} on an empty task list yields an already-completed
     * future holding an empty list, for every tested concurrency limit.
     *
     * @throws Exception if reading the result fails
     */
    @Test
    public void testProcessAllEmptyList() throws Exception {
        for (int concurrency : testedConcurrency()) {
            // The submitter would yield a cancelled future, but the assertions below expect a
            // successful empty result, so it must not influence the outcome.
            ListenableFuture<List<String>> result = AsyncSemaphore.processAll(
                    ImmutableList.of(),
                    i -> Futures.immediateCancelledFuture(),
                    concurrency,
                    MoreExecutors.directExecutor());
            Assertions.assertThat(result).isDone();
            Assertions.assertThat(result.get()).isEqualTo(ImmutableList.of());
        }
    }

    /**
     * Verifies that {@code processAll} with a single task stays pending until that task's
     * future completes, then completes with a one-element list holding the task's value.
     *
     * @throws Exception if reading the result fails
     */
    @Test
    public void testProcessAllSingleCallable() throws Exception {
        for (int concurrency : testedConcurrency()) {
            SettableFuture<String> future = SettableFuture.create();
            ListenableFuture<List<String>> result = AsyncSemaphore.processAll(
                    ImmutableList.of(1),
                    i -> future,
                    concurrency,
                    MoreExecutors.directExecutor());
            Assertions.assertThat(result).isNotDone();
            future.set("value");
            Assertions.assertThat(result).isDone();
            Assertions.assertThat(result.get()).isEqualTo(ImmutableList.of("value"));
        }
    }

    /**
     * Verifies that {@code processAll} never has more than {@code concurrency} tasks in flight.
     * With {@code concurrency + 2} tasks, one further task must be submitted each time an
     * in-flight future completes, until all tasks have been submitted.
     *
     * @throws Exception if reading the result fails
     */
    @Test
    public void testProcessAllConcurrencyLimit() throws Exception {
        for (int concurrency : testedConcurrency()) {
            TestingTasks tasks = new TestingTasks(concurrency + 2);
            ListenableFuture<List<String>> result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());

            // Initially only `concurrency` tasks have been submitted.
            Assertions.assertThat(result).isNotDone();
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency);

            // Completing one task lets exactly one more task start.
            tasks.getFutures().get(0).set("value0");
            Assertions.assertThat(result).isNotDone();
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency + 1);

            tasks.getFutures().get(1).set("value1");
            Assertions.assertThat(result).isNotDone();
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency + 2);

            // Every task has been submitted by now, so further completions start nothing new.
            tasks.getFutures().get(2).set("value2");
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency + 2);

            for (int index = 3; index < tasks.getFutures().size(); ++index) {
                tasks.getFutures().get(index).set("value" + index);
            }
            Assertions.assertThat(result).isDone();
            List<String> expected = IntStream.range(0, tasks.getFutures().size())
                    .mapToObj(index -> "value" + index)
                    .collect(ImmutableList.toImmutableList());
            Assertions.assertThat(result.get()).isEqualTo(expected);
        }
    }

    /**
     * Runs the shared {@code processAll} failure scenarios, for every tested concurrency
     * limit, with a failure supplier that throws instead of returning a future.
     */
    @Test
    public void testProcessAllCallableFailure() {
        Supplier<ListenableFuture<String>> throwingSubmission = () -> {
            throw new RuntimeException("callable failed");
        };
        IntStream.of(testedConcurrency())
                .forEach(limit -> testProcessAllFailure(limit, throwingSubmission, "callable failed"));
    }

    /**
     * Runs the shared {@code processAll} failure scenarios, for every tested concurrency
     * limit, with a failure supplier that returns an already-failed future.
     */
    @Test
    public void testProcessAllFutureFailure() {
        // Each supplier call creates a new failed future carrying a new exception.
        Supplier<ListenableFuture<String>> failedSubmission =
                () -> Futures.immediateFailedFuture(new RuntimeException("future failed"));
        IntStream.of(testedConcurrency())
                .forEach(limit -> testProcessAllFailure(limit, failedSubmission, "future failed"));
    }

    /**
     * Runs the shared {@code processAll} failure scenarios, for every tested concurrency
     * limit, with a failure supplier that returns an already-cancelled future.
     */
    @Test
    public void testProcessAllFutureCancellation() {
        Supplier<ListenableFuture<String>> cancelledSubmission = Futures::immediateCancelledFuture;
        IntStream.of(testedConcurrency())
                .forEach(limit -> testProcessAllFailure(limit, cancelledSubmission, "Task was cancelled"));
    }

    /**
     * Verifies that cancelling the future returned by {@code processAll} cancels the task
     * futures that are still in flight and does not submit any further tasks.
     */
    @Test
    public void testProcessAllCancellation() {
        for (int concurrency : testedConcurrency()) {
            // Scenario 1: cancel before any task has completed.
            TestingTasks tasks = new TestingTasks(concurrency + 1);
            ListenableFuture<List<String>> result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());
            Assertions.assertThat(result).isNotDone();
            result.cancel(true);
            // The extra task must not have been started by the cancellation.
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency);
            for (SettableFuture<String> inFlight : tasks.getFutures()) {
                Assertions.assertThat(inFlight).isCancelled();
            }

            // Scenario 2: cancel after the first task has completed and one more has started.
            tasks = new TestingTasks(concurrency + 2);
            result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());
            Assertions.assertThat(result).isNotDone();
            tasks.getFutures().get(0).set("value");
            Assertions.assertThat(result).isNotDone();
            result.cancel(true);
            Assertions.assertThat(tasks.getFutures()).hasSize(concurrency + 1);
            // Index 0 completed normally; every later future must have been cancelled.
            for (int i = 1; i < concurrency + 1; ++i) {
                Assertions.assertThat(tasks.getFutures().get(i)).isCancelled();
            }
        }
    }

    /**
     * Runs three {@code processAll} scenarios in which one injected task fails: on the initial
     * batch, after one task has completed, and after two tasks have completed. In each
     * scenario the result must fail with a message containing {@code message}, and the checked
     * in-flight futures must be cancelled.
     *
     * @param concurrency the concurrency limit passed to {@code processAll}
     * @param failure supplies the failing outcome of the injected task (may itself throw)
     * @param message text expected in the failure message of the result future
     */
    private static void testProcessAllFailure(int concurrency, Supplier<ListenableFuture<String>> failure, String message) {
        // Scenario 1: the failing task is the last one of the initial batch.
        TestingTasks tasks = new TestingTasks(concurrency);
        tasks.injectFailure(concurrency - 1, failure);
        ListenableFuture<List<String>> result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());
        Assertions.assertThat(result).isDone();
        assertThatFutureFailsWithMessageContaining(result, message);
        for (SettableFuture<String> inFlight : tasks.getFutures()) {
            Assertions.assertThat(inFlight).isCancelled();
        }

        // Scenario 2: the failing task is submitted once the first task completes.
        tasks = new TestingTasks(concurrency + 1);
        tasks.injectFailure(concurrency, failure);
        result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());
        Assertions.assertThat(result).isNotDone();
        tasks.getFutures().get(0).set("value");
        Assertions.assertThat(result).isDone();
        assertThatFutureFailsWithMessageContaining(result, message);
        for (int i = 1; i < concurrency; ++i) {
            Assertions.assertThat(tasks.getFutures().get(i)).isCancelled();
        }

        // Scenario 3: the failing task is submitted once the second task completes.
        tasks = new TestingTasks(concurrency + 2);
        tasks.injectFailure(concurrency + 1, failure);
        result = AsyncSemaphore.processAll(tasks.getTasks(), tasks::submit, concurrency, MoreExecutors.directExecutor());
        Assertions.assertThat(result).isNotDone();
        tasks.getFutures().get(0).set("value");
        Assertions.assertThat(result).isNotDone();
        tasks.getFutures().get(1).set("value");
        Assertions.assertThat(result).isDone();
        assertThatFutureFailsWithMessageContaining(result, message);
        // NOTE(review): the future at index `concurrency` (started after the first completion)
        // is presumably also in flight here but is not covered by this bound - confirm whether
        // the loop should run to tasks.getFutures().size().
        for (int i = 2; i < concurrency; ++i) {
            Assertions.assertThat(tasks.getFutures().get(i)).isCancelled();
        }
    }

    /**
     * Asserts that {@code future} has already failed (zero-second wait) with an
     * {@link Exception} whose message contains {@code message}.
     *
     * @param future the future expected to have failed
     * @param message text that the failure message must contain
     */
    private static void assertThatFutureFailsWithMessageContaining(Future<?> future, String message) {
        Assertions.assertThat(future)
                .failsWithin(0L, TimeUnit.SECONDS)
                .withThrowableOfType(Exception.class)
                .withMessageContaining(message);
    }

    /**
     * Records one running execution in {@code concurrency}, asserts that at most two are
     * running, sleeps for a millisecond, releases the slot and then always throws.
     *
     * <p>The declared return type only exists so that callers can write
     * {@code throw assertFailedConcurrency(...)}; the method never returns normally.
     *
     * @param concurrency shared counter of executions currently in progress
     * @return never returns
     * @throws IllegalStateException always, once the concurrency check has passed
     */
    private static RuntimeException assertFailedConcurrency(AtomicInteger concurrency) {
        io.airlift.testing.Assertions.assertLessThanOrEqual(concurrency.incrementAndGet(), 2);
        // Stay "in flight" briefly so that overlapping executions can be observed.
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS);
        concurrency.decrementAndGet();
        throw new IllegalStateException();
    }

    /**
     * Creates a callback that tallies the outcome of a future and counts the latch down once
     * for each completion, whether it succeeded or failed.
     *
     * @param successCount incremented when the future succeeds
     * @param failureCount incremented when the future fails
     * @param completionLatch counted down after either outcome
     * @return the outcome-recording callback
     */
    private static FutureCallback<Object> completionCallback(AtomicInteger successCount, AtomicInteger failureCount, CountDownLatch completionLatch) {
        return new FutureCallback<Object>() {
            @Override
            public void onSuccess(@Nullable Object result) {
                successCount.incrementAndGet();
                completionLatch.countDown();
            }

            @Override
            public void onFailure(Throwable t) {
                failureCount.incrementAndGet();
                completionLatch.countDown();
            }
        };
    }

    /**
     * Submitter used by the semaphore tests: runs {@code task} on the shared executor and
     * adapts the resulting future to {@code ListenableFuture<Void>} via {@code MoreFutures.asVoid}.
     *
     * @param task the work to run on the executor
     * @return the adapted future for the submitted task
     */
    private ListenableFuture<Void> submitTask(Runnable task) {
        ListenableFuture<?> pending = executor.submit(task);
        return MoreFutures.asVoid(pending);
    }

    /**
     * Test fixture for the {@code processAll} tests. It holds the integer tasks {@code 0..count-1}
     * and acts as the submitter, handing out one {@link SettableFuture} per submitted task so
     * the test decides when each task completes. Individual tasks can be replaced by an
     * injected failure.
     */
    private static class TestingTasks {
        private final List<Integer> tasks;
        // Futures handed out so far, in submission order; tasks with an injected failure add none.
        private final List<SettableFuture<String>> futures = new CopyOnWriteArrayList<>();
        private final Map<Integer, Supplier<ListenableFuture<String>>> failures = new ConcurrentHashMap<>();

        /**
         * @param count number of tasks to create, numbered from 0
         */
        private TestingTasks(int count) {
            this.tasks = IntStream.range(0, count).boxed().collect(ImmutableList.toImmutableList());
        }

        /**
         * Makes the submission of {@code task} produce whatever {@code failure} yields instead
         * of a controllable future.
         *
         * @param task the task value to replace
         * @param failure supplies the outcome for that task; it may also throw
         */
        public void injectFailure(int task, Supplier<ListenableFuture<String>> failure) {
            failures.put(task, failure);
        }

        /**
         * Submitter callback: returns the injected outcome for {@code value} if there is one,
         * otherwise a new pending future that is also recorded for later inspection.
         *
         * @param value the task being submitted
         * @return the injected outcome, or a new pending future
         */
        public ListenableFuture<String> submit(Integer value) {
            Supplier<ListenableFuture<String>> failure = failures.get(value);
            if (failure != null) {
                return failure.get();
            }
            SettableFuture<String> future = SettableFuture.create();
            futures.add(future);
            return future;
        }

        /**
         * @return the immutable list of task values
         */
        public List<Integer> getTasks() {
            return tasks;
        }

        /**
         * @return an immutable snapshot of the futures handed out so far, in submission order
         */
        public List<SettableFuture<String>> getFutures() {
            return ImmutableList.copyOf(futures);
        }
    }
}

