/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.common.util;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.functions.runtime.shaded.javax.annotation.concurrent.ThreadSafe;

/**
 * Static helpers for composing, cancelling, timing out and inspecting
 * {@link CompletableFuture} instances.
 */
public class FutureUtil {
    /**
     * Returns a future that completes once every future in {@code futures} has completed.
     *
     * @param futures the futures to wait for
     * @return the result of {@link CompletableFuture#allOf(CompletableFuture[])} over the given futures
     */
    public static CompletableFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    /**
     * Concatenates the list results of a stream of futures into a single list.
     *
     * <p>The futures are chained in stream order; the elements of each result list are appended to one
     * freshly created accumulator list, so the input lists themselves are never modified.
     *
     * @param futures stream of futures, each producing a list
     * @param <T> element type of the lists
     * @return a future holding the concatenation of all result lists
     */
    public static <T> CompletableFuture<List<T>> waitForAll(Stream<CompletableFuture<List<T>>> futures) {
        CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<>());
        return futures.reduce(identity, (pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV -> {
            preV.addAll(currV);
            return preV;
        })));
    }

    /**
     * Returns a future that completes as soon as any of the given futures completes.
     *
     * @param futures the futures to race
     * @return the result of {@link CompletableFuture#anyOf(CompletableFuture[])} over the given futures
     */
    public static CompletableFuture<Object> waitForAny(Collection<? extends CompletableFuture<?>> futures) {
        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
    }

    /**
     * Waits until one of the given futures completes normally with a value accepted by {@code tester}.
     *
     * <p>Once a matching value is found, every future that is still pending is cancelled. If all futures
     * finish without producing a matching value, or if {@code futures} is empty, the returned future
     * completes with {@link Optional#empty()}. If the first future to finish fails, that failure is
     * propagated to the returned future.
     *
     * <p>The caller's collection is not modified; the method works on a private copy. A {@code null}
     * value accepted by {@code tester} cannot be represented in an {@link Optional} and makes the
     * returned future fail with a {@link NullPointerException}.
     *
     * @param futures the futures to race
     * @param tester predicate deciding whether a completed value is acceptable
     * @return a future holding the first accepted value, or an empty optional if there is none
     */
    public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures,
                                                                 Predicate<Object> tester) {
        // Work on a private, mutable snapshot so immutable inputs are supported and the
        // caller's collection is left untouched.
        List<CompletableFuture<?>> pending = new ArrayList<>(futures);
        return waitForAnyMatching(pending, tester);
    }

    /** Recursive step of {@code waitForAny(futures, tester)}; {@code pending} is owned by the call chain. */
    private static CompletableFuture<Optional<Object>> waitForAnyMatching(List<CompletableFuture<?>> pending,
                                                                          Predicate<Object> tester) {
        if (pending.isEmpty()) {
            // anyOf() with no futures never completes, so answer right away.
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return waitForAny(pending).thenCompose(v -> {
            if (tester.test(v)) {
                cancelUnfinished(pending);
                return CompletableFuture.completedFuture(Optional.of(v));
            }
            // Other futures may have finished in the meantime; examine all of them, not only the winner.
            List<CompletableFuture<?>> doneFutures = pending.stream()
                    .filter(CompletableFuture::isDone)
                    .collect(Collectors.toList());
            pending.removeAll(doneFutures);
            Optional<Object> value = doneFutures.stream()
                    .filter(f -> !f.isCompletedExceptionally())
                    .map(f -> (Object) f.join())
                    .filter(tester)
                    .findFirst();
            if (value.isPresent()) {
                cancelUnfinished(pending);
                return CompletableFuture.completedFuture(value);
            }
            return waitForAnyMatching(pending, tester);
        });
    }

    /** Cancels (with interruption requested) every future in {@code futures} that is not done yet. */
    private static void cancelUnfinished(Collection<? extends CompletableFuture<?>> futures) {
        for (CompletableFuture<?> f : futures) {
            if (!f.isDone()) {
                f.cancel(true);
            }
        }
    }

    /**
     * Like {@link #waitForAll(Collection)}, but additionally registers a cancel action on the combined
     * future that cancels every input future which is not yet done.
     *
     * @param futures the futures to wait for
     * @return the combined future
     */
    public static CompletableFuture<Void> waitForAllAndSupportCancel(
            Collection<? extends CompletableFuture<?>> futures) {
        CompletableFuture<?>[] futuresArray = futures.toArray(new CompletableFuture<?>[0]);
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futuresArray);
        FutureUtil.whenCancelledOrTimedOut(combinedFuture, () -> {
            for (CompletableFuture<?> f : futuresArray) {
                if (!f.isDone()) {
                    f.cancel(false);
                }
            }
        });
        return combinedFuture;
    }

    /**
     * Attaches {@code cancelAction} to {@code future} through a new
     * {@link CompletableFutureCancellationHandler}.
     *
     * <p>NOTE(review): judging by the method name the action runs when the future is cancelled or times
     * out; the exact trigger is defined by {@code CompletableFutureCancellationHandler} - confirm there.
     *
     * @param future the future to observe
     * @param cancelAction the action handed to the cancellation handler
     */
    public static void whenCancelledOrTimedOut(CompletableFuture<?> future, Runnable cancelAction) {
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        cancellationHandler.setCancelAction(cancelAction);
        cancellationHandler.attachToFuture(future);
    }

    /**
     * Creates a future that is already completed exceptionally with {@code t}.
     *
     * @param t the failure cause
     * @param <T> nominal result type of the future
     * @return the failed future
     */
    public static <T> CompletableFuture<T> failedFuture(Throwable t) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    /**
     * Strips {@link CompletionException} and {@link ExecutionException} wrappers from {@code ex}.
     *
     * <p>Nested wrappers are removed repeatedly. A wrapper that has no cause is returned as-is rather
     * than yielding {@code null}.
     *
     * @param ex the throwable to unwrap; may be {@code null}
     * @return the innermost non-wrapper throwable, or {@code ex} itself if it is not a wrapper
     */
    public static Throwable unwrapCompletionException(Throwable ex) {
        Throwable current = ex;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Creates a new future that is failed with the supplied exception if it is not completed within
     * {@code timeout}.
     *
     * @param timeout maximum time to wait before failing the future
     * @param executor scheduler used to run the timeout task
     * @param exceptionSupplier supplies the exception used when the timeout fires
     * @param <T> result type of the future
     * @return the new future with timeout handling attached
     */
    public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout,
                                                                   ScheduledExecutorService executor,
                                                                   Supplier<Throwable> exceptionSupplier) {
        return FutureUtil.addTimeoutHandling(new CompletableFuture<T>(), timeout, executor, exceptionSupplier);
    }

    /**
     * Schedules a task that fails {@code future} with the supplied exception after {@code timeout},
     * unless the future is done by then. The scheduled task is cancelled when the future completes.
     *
     * @param future the future to guard
     * @param timeout maximum time to wait before failing the future (millisecond resolution)
     * @param executor scheduler used to run the timeout task
     * @param exceptionSupplier supplies the exception used when the timeout fires
     * @param <T> result type of the future
     * @return the same {@code future} instance
     */
    public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout,
                                                              ScheduledExecutorService executor,
                                                              Supplier<Throwable> exceptionSupplier) {
        ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally(exceptionSupplier.get());
            }
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Release the scheduled task as soon as the future finishes for any reason.
        future.whenComplete((res, exception) -> scheduledFuture.cancel(false));
        return future;
    }

    /**
     * Creates a {@link TimeoutException} whose stack trace consists of a single synthetic frame naming
     * the given class and method.
     *
     * @param message the exception message
     * @param sourceClass class reported in the synthetic stack frame
     * @param sourceMethod method name reported in the synthetic stack frame
     * @return the timeout exception
     */
    public static TimeoutException createTimeoutException(String message, Class<?> sourceClass,
                                                          String sourceMethod) {
        return new LowOverheadTimeoutException(message, sourceClass, sourceMethod);
    }

    /**
     * Extracts the failure of an exceptionally completed future.
     *
     * <p>A cancelled future yields its {@link CancellationException}; a failed future yields the cause
     * reported by {@link CompletableFuture#get()}.
     *
     * @param future the future to inspect; may be {@code null}
     * @param <T> result type of the future
     * @return the failure, or an empty optional if the future is {@code null}, pending, or completed
     *         normally
     */
    public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
        if (future != null && future.isCompletedExceptionally()) {
            try {
                future.get();
            } catch (CancellationException e) {
                // isCompletedExceptionally() is also true for cancelled futures, for which get()
                // throws this unchecked exception instead of ExecutionException.
                return Optional.of(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.of(e);
            } catch (ExecutionException e) {
                return Optional.ofNullable(e.getCause());
            }
        }
        return Optional.empty();
    }

    /**
     * Returns {@code throwable} as a {@link CompletionException}, wrapping it only if necessary.
     *
     * @param throwable the throwable to convert
     * @return {@code throwable} itself if it already is a {@code CompletionException}, otherwise a new
     *         {@code CompletionException} with {@code throwable} as its cause
     */
    public static CompletionException wrapToCompletionException(Throwable throwable) {
        if (throwable instanceof CompletionException) {
            return (CompletionException) throwable;
        }
        return new CompletionException(throwable);
    }

    /**
     * {@link TimeoutException} that skips stack-trace capture and instead carries one synthetic frame
     * identifying where the timeout was declared.
     */
    private static class LowOverheadTimeoutException
    extends TimeoutException {
        private static final long serialVersionUID = 1L;

        LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
            super(message);
            this.setStackTrace(new StackTraceElement[]{
                    new StackTraceElement(sourceClass.getName(), sourceMethod, null, -1)});
        }

        /** Overridden to do nothing, so no stack walk happens on construction. */
        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    /**
     * Runs asynchronous tasks one after another: each submitted task is started only after the
     * previously submitted one has completed.
     *
     * <p>When {@code allowExceptionBreakChain} is {@code true}, a failure of the previous task is
     * propagated and the new task is not started; when {@code false}, failures of the previous task are
     * ignored and the new task is started regardless.
     *
     * @param <T> result type of the sequenced tasks
     */
    @ThreadSafe
    public static class Sequencer<T> {
        /** Tail of the chain; all access happens inside the synchronized {@code sequential} method. */
        private CompletableFuture<T> sequencerFuture = CompletableFuture.completedFuture(null);
        private final boolean allowExceptionBreakChain;

        public Sequencer(boolean allowExceptionBreakChain) {
            this.allowExceptionBreakChain = allowExceptionBreakChain;
        }

        /** Creates a sequencer with the given failure-propagation mode. */
        public static <T> Sequencer<T> create(boolean allowExceptionBreakChain) {
            return new Sequencer<>(allowExceptionBreakChain);
        }

        /** Creates a sequencer that ignores failures of earlier tasks. */
        public static <T> Sequencer<T> create() {
            return new Sequencer<>(false);
        }

        /**
         * Appends {@code newTask} to the chain.
         *
         * @param newTask supplier that starts the task; invoked immediately if the chain is idle,
         *                otherwise once the previous task has completed
         * @return the future of the new task, or the previous failed future when the chain is broken
         * @throws NullPointerException if {@code newTask} is {@code null}
         */
        public synchronized CompletableFuture<T> sequential(Supplier<CompletableFuture<T>> newTask) {
            Objects.requireNonNull(newTask);
            if (this.sequencerFuture.isDone()) {
                if (this.sequencerFuture.isCompletedExceptionally() && this.allowExceptionBreakChain) {
                    return this.sequencerFuture;
                }
                this.sequencerFuture = newTask.get();
                return this.sequencerFuture;
            }
            // When failures must not break the chain, turn a failed predecessor into a normal completion.
            CompletableFuture<T> previous = this.allowExceptionBreakChain
                    ? this.sequencerFuture
                    : this.sequencerFuture.exceptionally(ex -> null);
            this.sequencerFuture = previous.thenCompose(__ -> newTask.get());
            return this.sequencerFuture;
        }
    }
}

