/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.async;

import io.datakernel.async.AsyncFunction;
import io.datakernel.async.RetryPolicy;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.async.StageConsumer;
import io.datakernel.eventloop.Eventloop;
import java.util.ArrayDeque;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public interface AsyncCallable<T> {
    public Stage<T> call();

    default public AsyncCallable<T> with(UnaryOperator<AsyncCallable<T>> modifier) {
        return (AsyncCallable)modifier.apply(this);
    }

    public static <T> AsyncCallable<T> of(Supplier<Stage<T>> supplier) {
        return supplier::get;
    }

    public static <A, T> AsyncCallable<T> of(AsyncFunction<? super A, T> function, A a) {
        return () -> function.apply((Object)a);
    }

    public static <A, B, T> AsyncCallable<T> of(BiFunction<? super A, ? super B, Stage<T>> biFunction, A a, B b) {
        return () -> (Stage)biFunction.apply((Object)a, (Object)b);
    }

    default public AsyncCallable<T> sharedCall() {
        return AsyncCallable.sharedCall(this);
    }

    public static <T> AsyncCallable<T> sharedCall(final AsyncCallable<T> actualCallable) {
        return new AsyncCallable<T>(){
            SettableStage<T> runningStage;

            @Override
            public Stage<T> call() {
                if (this.runningStage != null) {
                    return this.runningStage;
                }
                this.runningStage = SettableStage.create();
                this.runningStage.whenComplete((result, throwable) -> {
                    this.runningStage = null;
                });
                actualCallable.call().whenComplete(this.runningStage::set);
                return this.runningStage;
            }
        };
    }

    default public AsyncCallable<T> singleCall() {
        return this.maxCalls(1);
    }

    default public AsyncCallable<T> singleCall(int maxQueueSize) {
        return this.maxCalls(1, maxQueueSize);
    }

    default public AsyncCallable<T> maxCalls(int maxParallelCalls) {
        return this.maxCalls(maxParallelCalls, Integer.MAX_VALUE);
    }

    default public AsyncCallable<T> maxCalls(int maxParallelCalls, int maxQueueSize) {
        return AsyncCallable.maxCalls(this, maxParallelCalls, maxQueueSize);
    }

    public static <T> AsyncCallable<T> maxCalls(final AsyncCallable<T> actualCallable, final int maxParallelCalls, final int maxQueueSize) {
        return new AsyncCallable<T>(){
            private int pendingCalls;
            private final ArrayDeque<SettableStage<T>> deque = new ArrayDeque();

            void processQueue() {
                while (this.pendingCalls < maxParallelCalls && !this.deque.isEmpty()) {
                    SettableStage settableStage = this.deque.pollFirst();
                    ++this.pendingCalls;
                    actualCallable.call().whenComplete((value, throwable) -> {
                        --this.pendingCalls;
                        this.processQueue();
                        settableStage.set(value, throwable);
                    });
                }
            }

            @Override
            public Stage<T> call() {
                if (this.pendingCalls <= maxParallelCalls) {
                    ++this.pendingCalls;
                    return actualCallable.call().whenComplete((value, throwable) -> {
                        --this.pendingCalls;
                        this.processQueue();
                    });
                }
                if (this.deque.size() > maxQueueSize) {
                    return Stage.ofException(new IllegalStateException());
                }
                SettableStage result = SettableStage.create();
                this.deque.addLast(result);
                return result;
            }
        };
    }

    default public AsyncCallable<T> retry(RetryPolicy retryPolicy) {
        return AsyncCallable.retry(this, retryPolicy);
    }

    public static <T> AsyncCallable<T> retry(final AsyncCallable<T> actualCallable, final RetryPolicy retryPolicy) {
        return new AsyncCallable<T>(){

            void doCall(int retryCount, long _retryTimestamp, SettableStage<T> cb) {
                actualCallable.call().whenComplete((value, throwable) -> {
                    if (throwable == null) {
                        cb.set(value);
                    } else {
                        long retryTimestamp;
                        Eventloop eventloop = Eventloop.getCurrentEventloop();
                        long now = eventloop.currentTimeMillis();
                        long nextRetryTimestamp = retryPolicy.nextRetryTimestamp(now, throwable, retryCount, retryTimestamp = _retryTimestamp != 0L ? _retryTimestamp : now);
                        if (nextRetryTimestamp == 0L) {
                            cb.setException(throwable);
                        } else {
                            eventloop.schedule(nextRetryTimestamp, () -> this.doCall(retryCount + 1, retryTimestamp, cb));
                        }
                    }
                });
            }

            @Override
            public Stage<T> call() {
                SettableStage result = SettableStage.create();
                this.doCall(0, 0L, result);
                return result;
            }
        };
    }

    default public AsyncCallable<T> prefetch(int maxSize) {
        return AsyncCallable.prefetch(this, this, maxSize);
    }

    public static <T> AsyncCallable<T> prefetch(final AsyncCallable<T> actualCallable, final AsyncCallable<T> prefetchCallable, final int maxSize) {
        return new AsyncCallable<T>(){
            private int pendingCalls;
            private final ArrayDeque<T> deque = new ArrayDeque();

            private void tryPrefetch() {
                for (int i = 0; i < maxSize - (this.deque.size() + this.pendingCalls); ++i) {
                    ++this.pendingCalls;
                    prefetchCallable.call().whenComplete((value, throwable) -> {
                        --this.pendingCalls;
                        if (throwable == null) {
                            this.deque.addLast(value);
                        }
                    });
                }
            }

            @Override
            public Stage<T> call() {
                Stage result = this.deque.isEmpty() ? actualCallable.call() : Stage.of(this.deque.pollFirst());
                this.tryPrefetch();
                return result;
            }
        };
    }

    default public <V> AsyncCallable<V> thenApply(Function<? super T, ? extends V> function) {
        return () -> this.call().thenApply(function);
    }

    default public AsyncCallable<T> whenComplete(StageConsumer<? super T> action) {
        return () -> this.call().whenComplete(action);
    }
}

