/*
 * Decompiled with CFR 0.152.
 */
package org.rx.core;

import io.netty.util.internal.ThreadLocalRandom;
import java.sql.Time;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import org.rx.annotation.Subscribe;
import org.rx.bean.DateTime;
import org.rx.bean.FlagsEnum;
import org.rx.core.Constants;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.ObjectChangedEvent;
import org.rx.core.Reflects;
import org.rx.core.RunFlag;
import org.rx.core.RxConfig;
import org.rx.core.ThreadPool;
import org.rx.core.TimeoutFlag;
import org.rx.core.TimeoutFuture;
import org.rx.core.WheelTimer;
import org.rx.exception.TraceHandler;
import org.rx.util.function.Action;
import org.rx.util.function.Func;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static entry point for running and scheduling work on a set of {@link ThreadPool} replicas.
 *
 * <p>The class keeps a list of pools ({@link #nodes}), of which the first {@link #poolCount}
 * entries are the ones tasks are dispatched to. A facade {@link ExecutorService} and a
 * {@link WheelTimer} built on top of that facade are created in the static initializer.
 */
public final class Tasks {
    private static final Logger log = LoggerFactory.getLogger(Tasks.class);
    /** All known pools; indices {@code [0, poolCount)} are the current replicas, the rest are surplus. */
    static final List<ThreadPool> nodes = new CopyOnWriteArrayList<ThreadPool>();
    /** Facade executor that forwards every submission to {@link #nextPool()}. */
    static final ExecutorService executor;
    /** Timer backing the {@code setTimeout}/{@code schedule*} methods. */
    static final WheelTimer timer;
    /** Actions drained and invoked by the JVM shutdown hook registered in the static initializer. */
    static final Queue<Action> shutdownActions;
    /** Number of leading entries in {@link #nodes} that {@link #nextPool()} picks from. */
    static int poolCount;

    /**
     * Re-reads {@code RxConfig.INSTANCE.threadPool.replicas} and, if it differs from
     * {@link #poolCount}, prepends that many fresh pools (named {@code N0}, {@code N1}, ...) to
     * {@link #nodes}. Pools pushed past the new count are swept by a 60000 ms timer task that
     * drops each surplus pool once its active count is zero.
     *
     * @param event the change notification; not read by this method (the static initializer passes {@code null})
     */
    @Subscribe(topicClass=RxConfig.class)
    static synchronized void onChanged(ObjectChangedEvent event) {
        int newCount = RxConfig.INSTANCE.threadPool.replicas;
        if (newCount == poolCount) {
            return;
        }
        log.info("RxMeta {} changed {} -> {}", new Object[]{"app.threadPool.replicas", poolCount, newCount});
        for (int i = 0; i < newCount; ++i) {
            // New replicas go to the front so that indices [0, newCount) are always the fresh pools.
            nodes.add(0, new ThreadPool(String.format("N%s", i)));
        }
        poolCount = newCount;
        if (nodes.size() > poolCount) {
            // The task id is the nodes list itself; presumably REPLACE supersedes a sweeper
            // scheduled by an earlier change - confirm against WheelTimer.
            timer.setTimeout(() -> {
                if (nodes.size() == poolCount) {
                    // No surplus left. Presumably this tells the periodic timer to stop - confirm.
                    Extends.circuitContinue(false);
                    return;
                }
                // Walk backward: removing by index while walking forward would skip the element
                // that slides into the freed slot.
                // NOTE(review): removed pools are only dropped from the list here, not shut down - confirm intended.
                for (int i = nodes.size() - 1; i >= poolCount; --i) {
                    if (nodes.get(i).getActiveCount() == 0) {
                        nodes.remove(i);
                    }
                }
            }, 60000L, nodes, (FlagsEnum<TimeoutFlag>)TimeoutFlag.PERIOD.flags(new TimeoutFlag[]{TimeoutFlag.REPLACE}));
        }
    }

    /**
     * Picks one of the current replicas at random.
     *
     * @return a pool chosen from the first {@link #poolCount} entries of {@link #nodes}
     */
    public static ThreadPool nextPool() {
        return nodes.get(ThreadLocalRandom.current().nextInt(0, poolCount));
    }

    /**
     * @return the shared facade executor that dispatches each task to {@link #nextPool()}
     */
    public static ExecutorService executor() {
        return executor;
    }

    /**
     * @return the shared timer instance
     */
    public static WheelTimer timer() {
        return timer;
    }

    /**
     * Queues an action to be invoked by the JVM shutdown hook.
     *
     * @param fn the action to run at shutdown
     */
    public static void addShutdownHook(Action fn) {
        shutdownActions.offer(fn);
    }

    /**
     * Invokes the given functions in order and returns the first result that is produced
     * without throwing.
     *
     * @param funcs candidates, tried from first to last
     * @param <T>   result type
     * @return the first successful result, or {@code null} when {@code funcs} is empty
     * @throws Throwable (undeclared) the failure of the last candidate when every candidate throws
     */
    @SafeVarargs
    public static <T> T sequentialRetry(Func<T> ... funcs) {
        Throwable last = null;
        for (Func<T> func : funcs) {
            try {
                return func.invoke();
            }
            catch (Throwable e) {
                last = e;
            }
        }
        if (last != null) {
            throw Tasks.<RuntimeException>sneakyThrow(last);
        }
        return null;
    }

    /**
     * Invokes the given functions starting at a randomly chosen index and wrapping around,
     * returning the first result that is produced without throwing.
     *
     * @param funcs candidates; each is tried at most once
     * @param <T>   result type
     * @return the first successful result, or {@code null} when {@code funcs} is empty
     * @throws Throwable (undeclared) the failure of the last attempted candidate when every candidate throws
     */
    @SafeVarargs
    public static <T> T randomRetry(Func<T> ... funcs) {
        int count = funcs.length;
        if (count == 0) {
            // Same result as sequentialRetry for no candidates; also avoids nextInt(0, 0).
            return null;
        }
        int start = ThreadLocalRandom.current().nextInt(0, count);
        Throwable last = null;
        for (int offset = 0; offset < count; ++offset) {
            try {
                return funcs[(start + offset) % count].invoke();
            }
            catch (Throwable e) {
                last = e;
            }
        }
        // count > 0 and nothing returned, so at least one failure was recorded.
        throw Tasks.<RuntimeException>sneakyThrow(last);
    }

    /**
     * Blocks until the future completes and returns its result.
     *
     * <p>A {@link CompletableFuture} is awaited via {@code join()}; any other future via
     * {@code get()}, whose checked exceptions are rethrown as-is without being declared.
     *
     * @param future the future to wait for
     * @param <T>    result type
     * @return the future's result
     */
    public static <T> T await(Future<T> future) {
        if (future instanceof CompletableFuture) {
            return ((CompletableFuture<T>)future).join();
        }
        try {
            return future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw Tasks.<RuntimeException>sneakyThrow(e);
        }
    }

    /**
     * Runs {@code func} on a pool and waits up to {@code millis} for its result, logging
     * instead of throwing on failure.
     *
     * @param func   the computation to run
     * @param millis maximum wait in milliseconds
     * @param <T>    result type
     * @return the result, or {@code null} on timeout or failure
     */
    public static <T> T awaitQuietly(Func<T> func, long millis) {
        return Tasks.awaitQuietly(Tasks.run(func), millis);
    }

    /**
     * Waits up to {@code millis} for the future, logging instead of throwing on failure.
     *
     * @param future the future to wait for
     * @param millis maximum wait in milliseconds
     * @param <T>    result type
     * @return the result, or {@code null} on timeout, interruption or failure
     */
    public static <T> T awaitQuietly(Future<T> future, long millis) {
        try {
            return future.get(millis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            TraceHandler.INSTANCE.log("awaitNow {} timeout", Reflects.stackClass(2).getName());
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is swallowed here, so keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            TraceHandler.INSTANCE.log(e);
        }
        return null;
    }

    /**
     * Runs {@code task} on a randomly chosen pool.
     *
     * @param task the action to run
     * @return the future returned by the pool
     */
    public static Future<Void> run(Action task) {
        return Tasks.nextPool().run(task);
    }

    /**
     * Runs {@code task} on a randomly chosen pool, forwarding {@code taskId} and {@code flags}
     * to {@link ThreadPool}.
     *
     * @param task   the action to run
     * @param taskId identifier passed through to the pool
     * @param flags  run flags passed through to the pool
     * @return the future returned by the pool
     */
    public static Future<Void> run(Action task, Object taskId, FlagsEnum<RunFlag> flags) {
        return Tasks.nextPool().run(task, taskId, flags);
    }

    /**
     * Runs {@code task} on a randomly chosen pool.
     *
     * @param task the computation to run
     * @param <T>  result type
     * @return the future returned by the pool
     */
    public static <T> Future<T> run(Func<T> task) {
        return Tasks.nextPool().run(task);
    }

    /**
     * Runs {@code task} on a randomly chosen pool, forwarding {@code taskId} and {@code flags}
     * to {@link ThreadPool}.
     *
     * @param task   the computation to run
     * @param taskId identifier passed through to the pool
     * @param flags  run flags passed through to the pool
     * @param <T>    result type
     * @return the future returned by the pool
     */
    public static <T> Future<T> run(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        return Tasks.nextPool().run(task, taskId, flags);
    }

    /**
     * Runs {@code task} on a randomly chosen pool and exposes it as a {@link CompletableFuture}.
     *
     * @param task the action to run
     * @return the completable future returned by the pool
     */
    public static CompletableFuture<Void> runAsync(Action task) {
        return Tasks.nextPool().runAsync(task);
    }

    /**
     * Runs {@code task} on a randomly chosen pool and exposes it as a {@link CompletableFuture},
     * forwarding {@code taskId} and {@code flags} to {@link ThreadPool}.
     *
     * @param task   the action to run
     * @param taskId identifier passed through to the pool
     * @param flags  run flags passed through to the pool
     * @return the completable future returned by the pool
     */
    public static CompletableFuture<Void> runAsync(Action task, Object taskId, FlagsEnum<RunFlag> flags) {
        return Tasks.nextPool().runAsync(task, taskId, flags);
    }

    /**
     * Runs {@code task} on a randomly chosen pool and exposes it as a {@link CompletableFuture}.
     *
     * @param task the computation to run
     * @param <T>  result type
     * @return the completable future returned by the pool
     */
    public static <T> CompletableFuture<T> runAsync(Func<T> task) {
        return Tasks.nextPool().runAsync(task);
    }

    /**
     * Runs {@code task} on a randomly chosen pool and exposes it as a {@link CompletableFuture},
     * forwarding {@code taskId} and {@code flags} to {@link ThreadPool}.
     *
     * @param task   the computation to run
     * @param taskId identifier passed through to the pool
     * @param flags  run flags passed through to the pool
     * @param <T>    result type
     * @return the completable future returned by the pool
     */
    public static <T> CompletableFuture<T> runAsync(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        return Tasks.nextPool().runAsync(task, taskId, flags);
    }

    /**
     * Schedules {@code task} on the shared timer.
     *
     * @param task  the action to run
     * @param delay delay passed to the timer (presumably milliseconds, as elsewhere in this class - confirm)
     * @return the handle returned by the timer
     */
    public static TimeoutFuture<?> setTimeout(Action task, long delay) {
        return timer.setTimeout(task, delay);
    }

    /**
     * Schedules {@code task} on the shared timer, forwarding {@code taskId} and {@code flags}.
     *
     * @param task   the action to run
     * @param delay  delay passed to the timer (presumably milliseconds, as elsewhere in this class - confirm)
     * @param taskId identifier passed through to the timer
     * @param flags  timeout flags passed through to the timer
     * @return the handle returned by the timer
     */
    public static TimeoutFuture<?> setTimeout(Action task, long delay, Object taskId, FlagsEnum<TimeoutFlag> flags) {
        return timer.setTimeout(task, delay, taskId, flags);
    }

    /**
     * Schedules {@code task} once per day at each of the given times.
     *
     * @param task      the action to run
     * @param timeArray times of day, each parsed with {@link Time#valueOf(String)} ({@code hh:mm:ss})
     * @return one scheduled handle per entry of {@code timeArray}
     */
    public static List<? extends ScheduledFuture<?>> scheduleDaily(Action task, String ... timeArray) {
        return Linq.from(timeArray).select(p -> Tasks.scheduleDaily(task, Time.valueOf(p))).toList();
    }

    /**
     * Schedules {@code task} to run every 24 hours, first at the next occurrence of {@code time}.
     *
     * @param task the action to run
     * @param time the time of day
     * @return the scheduled handle
     * @throws NullPointerException if {@code task} or {@code time} is {@code null}
     */
    public static ScheduledFuture<?> scheduleDaily(@NonNull Action task, @NonNull Time time) {
        if (task == null) {
            throw new NullPointerException("task is marked non-null but is null");
        }
        if (time == null) {
            throw new NullPointerException("time is marked non-null but is null");
        }
        long oneDay = 86400000L;
        long initDelay = DateTime.now().setTimePart(time.toString()).getTime() - System.currentTimeMillis();
        // Today's slot is already in the past (or is right now): push the first run to tomorrow.
        initDelay = initDelay > 0L ? initDelay : oneDay + initDelay;
        return Tasks.schedulePeriod(task, initDelay, oneDay);
    }

    /**
     * Schedules {@code task} periodically, using {@code period} as the initial delay as well.
     *
     * @param task   the action to run
     * @param period interval in milliseconds
     * @return the scheduled handle
     */
    public static ScheduledFuture<?> schedulePeriod(Action task, long period) {
        return Tasks.schedulePeriod(task, period, period);
    }

    /**
     * Schedules {@code task} periodically on the shared timer.
     *
     * @param task         the action to run
     * @param initialDelay delay before the first run, in milliseconds
     * @param period       interval between later runs, in milliseconds
     * @return the scheduled handle
     * @throws NullPointerException if {@code task} is {@code null}
     */
    public static ScheduledFuture<?> schedulePeriod(@NonNull Action task, long initialDelay, long period) {
        if (task == null) {
            throw new NullPointerException("task is marked non-null but is null");
        }
        // The delay function receives a value d; 0 selects the initial delay, anything else the period.
        return timer.setTimeout(task, d -> d == 0L ? initialDelay : period, null, Constants.TIMER_PERIOD_FLAG);
    }

    /**
     * Rethrows {@code t} unchanged without requiring the caller to declare it.
     *
     * <p>Declared to return {@link RuntimeException} only so call sites can write
     * {@code throw sneakyThrow(x)} and satisfy flow analysis; it never returns normally.
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> RuntimeException sneakyThrow(Throwable t) throws E {
        throw (E)t;
    }

    static {
        shutdownActions = new ConcurrentLinkedQueue<Action>();
        // Populate the initial pools before anything can call nextPool().
        Tasks.onChanged(null);
        executor = new AbstractExecutorService(){
            // Only a flag: the underlying pools are not touched by shutdown()/shutdownNow().
            boolean shutdown;

            @Override
            public Future<?> submit(Runnable task) {
                return Tasks.nextPool().submit(task);
            }

            @Override
            public <T> Future<T> submit(Runnable task, T result) {
                return Tasks.nextPool().submit(task, result);
            }

            @Override
            public <T> Future<T> submit(Callable<T> task) {
                return Tasks.nextPool().submit(task);
            }

            @Override
            public void execute(Runnable command) {
                Tasks.nextPool().execute(command);
            }

            @Override
            public void shutdown() {
                this.shutdown = true;
            }

            @Override
            public List<Runnable> shutdownNow() {
                this.shutdown = true;
                return Collections.emptyList();
            }

            @Override
            public boolean isTerminated() {
                return this.shutdown;
            }

            @Override
            public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
                // Does not wait; just reports the flag.
                return this.shutdown;
            }

            @Override
            public boolean isShutdown() {
                return this.shutdown;
            }
        };
        timer = new WheelTimer(executor);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            Action fn;
            while ((fn = shutdownActions.poll()) != null) {
                try {
                    fn.invoke();
                }
                catch (Throwable e) {
                    // One failing hook must not prevent the remaining ones from running.
                    TraceHandler.INSTANCE.log(e);
                }
            }
        }));
        // Point CompletableFuture's default async executor at the facade; the static field is
        // looked up first as "asyncPool" and, failing that, as "ASYNC_POOL".
        try {
            Reflects.writeStaticField(CompletableFuture.class, "asyncPool", executor);
        }
        catch (Throwable e) {
            try {
                Reflects.writeStaticField(CompletableFuture.class, "ASYNC_POOL", executor);
            }
            catch (Throwable ie) {
                log.warn("setAsyncPool {}", (Object)e, (Object)ie);
            }
        }
    }
}

