/*
 * Decompiled with CFR 0.152.
 */
package org.rx.core;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.InternalThreadLocalMap;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.NonNull;
import org.rx.bean.BiTuple;
import org.rx.bean.Decimal;
import org.rx.bean.FlagsEnum;
import org.rx.bean.IntWaterMark;
import org.rx.bean.RefCounter;
import org.rx.bean.ULID;
import org.rx.bean.WeakIdentityMap;
import org.rx.core.Constants;
import org.rx.core.Delegate;
import org.rx.core.EventPublisher;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.NEventArgs;
import org.rx.core.Reflects;
import org.rx.core.RunFlag;
import org.rx.core.RxConfig;
import org.rx.core.Sys;
import org.rx.exception.InvalidException;
import org.rx.exception.TraceHandler;
import org.rx.util.function.Action;
import org.rx.util.function.Func;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadPool
extends ThreadPoolExecutor {
    private static final Logger log = LoggerFactory.getLogger(ThreadPool.class);
    public static volatile Func<String> traceIdGenerator;
    public static final Delegate<EventPublisher.StaticEventPublisher, NEventArgs<String>> onTraceIdChanged;
    static final ThreadLocal<Queue<String>> CTX_PARENT_TRACE_ID;
    static final ThreadLocal<String> CTX_TRACE_ID;
    static final FastThreadLocal<Boolean> ASYNC_CONTINUE;
    static final FastThreadLocal<Object> COMPLETION_RETURNED_VALUE;
    static final String POOL_NAME_PREFIX = "\u211eThreads-";
    static final IntWaterMark DEFAULT_CPU_WATER_MARK;
    static final HashedWheelTimer timer;
    static final DynamicSizer sizer;
    static final Map<Object, RefCounter<ReentrantLock>> taskLockMap;
    static final Map<Object, CompletableFuture<?>> taskSerialMap;
    final String poolName;
    final Map<Runnable, Task<?>> taskMap = new ConcurrentHashMap();
    final Executor asyncExecutor = x$0 -> super.execute(x$0);

    public static String startTrace(String traceId) {
        return ThreadPool.startTrace(traceId, false);
    }

    public static String startTrace(String traceId, boolean requiresNew) {
        String tid = CTX_TRACE_ID.get();
        if (tid == null) {
            tid = traceId != null ? traceId : (traceIdGenerator != null ? traceIdGenerator.invoke() : ULID.randomULID().toBase64String());
            CTX_TRACE_ID.set(tid);
        } else if (traceId != null && !traceId.equals(tid)) {
            if (!requiresNew) {
                log.warn("The traceId already mapped to {} and can not set to {}", (Object)tid, (Object)traceId);
            } else {
                LinkedList<String> queue = (LinkedList<String>)CTX_PARENT_TRACE_ID.get();
                if (queue == null) {
                    queue = new LinkedList<String>();
                    CTX_PARENT_TRACE_ID.set(queue);
                }
                if (queue.size() > RxConfig.INSTANCE.threadPool.maxTraceDepth) {
                    queue.poll();
                }
                queue.addFirst(tid);
                CTX_TRACE_ID.set(traceId);
                log.info("trace requires new to {} with parent {}", (Object)traceId, (Object)tid);
                tid = traceId;
            }
        }
        onTraceIdChanged.invoke(EventPublisher.STATIC_EVENT_INSTANCE, new NEventArgs<String>(tid));
        return tid;
    }

    public static String traceId() {
        return CTX_TRACE_ID.get();
    }

    public static void endTrace() {
        String parentTid;
        Queue<String> queue = CTX_PARENT_TRACE_ID.get();
        if (queue != null && (parentTid = queue.poll()) != null) {
            CTX_TRACE_ID.set(parentTid);
            if (queue.isEmpty()) {
                CTX_PARENT_TRACE_ID.remove();
            }
        } else {
            parentTid = null;
            CTX_TRACE_ID.remove();
        }
        onTraceIdChanged.invoke(EventPublisher.STATIC_EVENT_INSTANCE, new NEventArgs<String>(parentTid));
    }

    public static <T> T completionReturnedValue() {
        return (T)COMPLETION_RETURNED_VALUE.getIfExists();
    }

    public static int computeThreads(double cpuUtilization, long waitTime, long cpuTime) {
        Extends.require(cpuUtilization, 0.0 <= cpuUtilization && cpuUtilization <= 1.0);
        return (int)Math.max((double)Constants.CPU_THREADS, Math.floor((double)Constants.CPU_THREADS * cpuUtilization * (1.0 + (double)waitTime / (double)cpuTime)));
    }

    static ThreadFactory newThreadFactory(String name) {
        return new DefaultThreadFactory(String.format("%s%s", POOL_NAME_PREFIX, name), true);
    }

    static int incrSize(ThreadPoolExecutor pool) {
        RxConfig.ThreadPoolConfig conf = RxConfig.INSTANCE.threadPool;
        int poolSize = pool.getCorePoolSize() + conf.resizeQuantity;
        if (poolSize > conf.maxDynamicSize) {
            return conf.maxDynamicSize;
        }
        pool.setCorePoolSize(poolSize);
        return poolSize;
    }

    static int decrSize(ThreadPoolExecutor pool) {
        RxConfig.ThreadPoolConfig conf = RxConfig.INSTANCE.threadPool;
        int poolSize = Math.max(conf.minDynamicSize, pool.getCorePoolSize() - conf.resizeQuantity);
        pool.setCorePoolSize(poolSize);
        return poolSize;
    }

    static boolean asyncContinueFlag(boolean def) {
        Boolean ac = (Boolean)ASYNC_CONTINUE.getIfExists();
        ASYNC_CONTINUE.remove();
        if (ac == null) {
            return def;
        }
        return ac;
    }

    @Override
    public void setMaximumPoolSize(int maximumPoolSize) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        throw new UnsupportedOperationException();
    }

    public ThreadPool(String poolName) {
        this(RxConfig.INSTANCE.threadPool.initSize, RxConfig.INSTANCE.threadPool.queueCapacity, poolName);
    }

    public ThreadPool(int initSize, int queueCapacity, String poolName) {
        this(initSize, queueCapacity, DEFAULT_CPU_WATER_MARK, poolName);
    }

    public ThreadPool(int initSize, int queueCapacity, IntWaterMark cpuWaterMark, String poolName) {
        super(ThreadPool.checkSize(initSize), Integer.MAX_VALUE, RxConfig.INSTANCE.threadPool.keepAliveSeconds, TimeUnit.SECONDS, new ThreadQueue(ThreadPool.checkCapacity(queueCapacity)), ThreadPool.newThreadFactory(poolName), (r, executor) -> {
            if (executor.isShutdown()) {
                log.warn("ThreadPool {} is shutdown", (Object)poolName);
                return;
            }
            executor.getQueue().offer(r);
        });
        super.allowCoreThreadTimeOut(true);
        ((ThreadQueue)super.getQueue()).pool = this;
        this.poolName = poolName;
        this.setDynamicSize(cpuWaterMark);
    }

    private static int checkSize(int size) {
        if (size <= 0) {
            size = Constants.CPU_THREADS + 1;
        }
        return size;
    }

    private static int checkCapacity(int capacity) {
        if (capacity <= 0) {
            capacity = Constants.CPU_THREADS * 32;
        }
        return capacity;
    }

    public void setDynamicSize(IntWaterMark cpuWaterMark) {
        if (cpuWaterMark.getLow() < 0) {
            cpuWaterMark.setLow(0);
        }
        if (cpuWaterMark.getHigh() > 100) {
            cpuWaterMark.setHigh(100);
        }
        sizer.register(this, cpuWaterMark);
    }

    @Override
    public void execute(Runnable command) {
        super.execute(Task.adapt(command));
    }

    @Override
    public Future<?> submit(Runnable task) {
        RunnableFuture<Object> ft = this.newTaskFor(task, null);
        super.execute(ft);
        return ft;
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        RunnableFuture<T> ft = this.newTaskFor(task, result);
        super.execute(ft);
        return ft;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        RunnableFuture<T> ft = this.newTaskFor(task);
        super.execute(ft);
        return ft;
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTaskAdapter<T>((Runnable)Task.adapt(runnable), value);
    }

    @Override
    protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTaskAdapter<T>(Task.adapt(callable));
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p)).toList());
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return super.invokeAny(Linq.from(tasks).select(p -> Task.adapt(p)).toList(), timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p)).toList());
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return super.invokeAll(Linq.from(tasks).select(p -> Task.adapt(p)).toList(), timeout, unit);
    }

    public Future<Void> run(Action task) {
        return this.run(task, null, null);
    }

    public Future<Void> run(Action task, Object taskId, FlagsEnum<RunFlag> flags) {
        return this.submit(new Task(task.toFunc(), flags, taskId));
    }

    public <T> Future<T> run(Func<T> task) {
        return this.run(task, null, null);
    }

    public <T> Future<T> run(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        return this.submit(new Task<T>(task, flags, taskId));
    }

    public <T> T runAny(Collection<Func<T>> tasks, long timeoutMillis) {
        List<Callable> callables = Linq.from(tasks).select(p -> new Task(p, null, null)).toList();
        return timeoutMillis > 0L ? super.invokeAny(callables, timeoutMillis, TimeUnit.MILLISECONDS) : super.invokeAny(callables);
    }

    public <T> List<Future<T>> runAll(Collection<Func<T>> tasks, long timeoutMillis) {
        List<Callable> callables = Linq.from(tasks).select(p -> new Task(p, null, null)).toList();
        return timeoutMillis > 0L ? super.invokeAll(callables, timeoutMillis, TimeUnit.MILLISECONDS) : super.invokeAll(callables);
    }

    public <T> CompletionService<T> newCompletionService() {
        return new ExecutorCompletionService(this);
    }

    public CompletableFuture<Void> runAsync(Action task) {
        return this.runAsync(task, null, null);
    }

    public CompletableFuture<Void> runAsync(Action task, Object taskId, FlagsEnum<RunFlag> flags) {
        Task t = new Task(task.toFunc(), flags, taskId);
        return CompletableFuture.runAsync(t, this.asyncExecutor);
    }

    public <T> CompletableFuture<T> runAsync(Func<T> task) {
        return this.runAsync(task, null, null);
    }

    public <T> CompletableFuture<T> runAsync(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        Task<T> t = new Task<T>(task, flags, taskId);
        return CompletableFuture.supplyAsync(t, this.asyncExecutor);
    }

    public <T> Future<T> runSerial(Func<T> task, Object taskId) {
        return this.runSerial(task, taskId, null);
    }

    public <T> Future<T> runSerial(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        return this.runSerialAsync(task, taskId, flags, true);
    }

    public <T> CompletableFuture<T> runSerialAsync(Func<T> task, Object taskId) {
        return this.runSerialAsync(task, taskId, null);
    }

    public <T> CompletableFuture<T> runSerialAsync(Func<T> task, Object taskId, FlagsEnum<RunFlag> flags) {
        return this.runSerialAsync(task, taskId, flags, false);
    }

    <T> CompletableFuture<T> runSerialAsync(@NonNull Func<T> task, @NonNull Object taskId, FlagsEnum<RunFlag> flags, boolean reuse) {
        CompletionStage f;
        if (task == null) {
            throw new NullPointerException("task is marked non-null but is null");
        }
        if (taskId == null) {
            throw new NullPointerException("taskId is marked non-null but is null");
        }
        Function<Object, CompletableFuture> mfn = k -> {
            Task t = new Task(task, flags, taskId);
            return CompletableFuture.supplyAsync(t, this.asyncExecutor).whenCompleteAsync((r, e) -> taskSerialMap.remove(taskId));
        };
        CompletableFuture newValue = null;
        CompletableFuture v = taskSerialMap.get(taskId);
        CompletableFuture completableFuture = f = v == null && (newValue = mfn.apply(taskId)) != null && (v = taskSerialMap.putIfAbsent(taskId, newValue)) == null ? newValue : v;
        if (newValue == null) {
            f = f.thenApplyAsync(t -> {
                COMPLETION_RETURNED_VALUE.set(t);
                try {
                    Object t2 = task.get();
                    return t2;
                }
                finally {
                    COMPLETION_RETURNED_VALUE.remove();
                }
            }, (Executor)this);
            if (!reuse) {
                taskSerialMap.put(taskId, (CompletableFuture<?>)f);
            }
        }
        return f;
    }

    public <T> MultiTaskFuture<T, T> runAnyAsync(Collection<Func<T>> tasks) {
        CompletableFuture[] futures = Linq.from(tasks).select(task -> {
            Task t = new Task(task, null, null);
            return CompletableFuture.supplyAsync(t, this.asyncExecutor);
        }).toArray();
        return new MultiTaskFuture(CompletableFuture.anyOf(futures), futures);
    }

    public <T> MultiTaskFuture<Void, T> runAllAsync(Collection<Func<T>> tasks) {
        CompletableFuture[] futures = Linq.from(tasks).select(task -> {
            Task t = new Task(task, null, null);
            return CompletableFuture.supplyAsync(t, this.asyncExecutor);
        }).toArray();
        return new MultiTaskFuture(CompletableFuture.allOf(futures), futures);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        Task<?> task = this.setTask(r);
        if (task == null) {
            return;
        }
        FlagsEnum<RunFlag> flags = task.flags;
        if (flags.has(new RunFlag[]{RunFlag.SINGLE})) {
            RefCounter<ReentrantLock> ctx = this.getContextForLock(task.id);
            if (!((ReentrantLock)ctx.ref).tryLock()) {
                throw new InterruptedException(String.format("SingleScope %s locked by other thread", task.id));
            }
            ctx.incrementRefCnt();
            log.debug("CTX tryLock {} -> {}", task.id, (Object)flags.name());
        } else if (flags.has(new RunFlag[]{RunFlag.SYNCHRONIZED})) {
            RefCounter<ReentrantLock> ctx = this.getContextForLock(task.id);
            ctx.incrementRefCnt();
            ((ReentrantLock)ctx.ref).lock();
            log.debug("CTX lock {} -> {}", task.id, (Object)flags.name());
        }
        if (flags.has(new RunFlag[]{RunFlag.PRIORITY}) && !this.getQueue().isEmpty()) {
            ThreadPool.incrSize(this);
        }
        if (task.parent != null) {
            this.setThreadLocalMap(t, task.parent);
        }
        if (flags.has(new RunFlag[]{RunFlag.THREAD_TRACE})) {
            ThreadPool.startTrace(task.traceId);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        RefCounter<ReentrantLock> ctx;
        Task<?> task = this.getTask(r, true);
        if (task == null) {
            return;
        }
        FlagsEnum<RunFlag> flags = task.flags;
        Object id = task.id;
        if (id != null && (ctx = taskLockMap.get(id)) != null) {
            boolean doRemove = false;
            if (ctx.decrementRefCnt() <= 0) {
                taskLockMap.remove(id);
                doRemove = true;
            }
            log.debug("CTX unlock{} {} -> {}", new Object[]{doRemove ? " & clear" : "", id, task.flags.name()});
            ((ReentrantLock)ctx.ref).unlock();
        }
        if (task.parent != null) {
            this.setThreadLocalMap(Thread.currentThread(), null);
        }
        if (flags.has(new RunFlag[]{RunFlag.THREAD_TRACE})) {
            ThreadPool.endTrace();
        }
    }

    private void setThreadLocalMap(Thread t, InternalThreadLocalMap threadLocalMap) {
        if (t instanceof FastThreadLocalThread) {
            ((FastThreadLocalThread)t).setThreadLocalMap(threadLocalMap);
            return;
        }
        ThreadLocal slowThreadLocalMap = (ThreadLocal)Reflects.readStaticField(InternalThreadLocalMap.class, "slowThreadLocalMap");
        if (threadLocalMap == null) {
            slowThreadLocalMap.remove();
            return;
        }
        slowThreadLocalMap.set(threadLocalMap);
    }

    private RefCounter<ReentrantLock> getContextForLock(Object id) {
        if (id == null) {
            throw new InvalidException("SINGLE or SYNCHRONIZED flag require a taskId", new Object[0]);
        }
        return taskLockMap.computeIfAbsent(id, k -> new RefCounter<ReentrantLock>(new ReentrantLock()));
    }

    private Task<?> setTask(Runnable r) {
        Task<Object> task = this.taskMap.get(r);
        if (task == null && (task = r instanceof FutureTaskAdapter ? ((FutureTaskAdapter)r).task : (r instanceof CompletableFuture.AsynchronousCompletionTask ? Task.as(Reflects.readField(r, "fn")) : Task.as(r))) != null) {
            this.taskMap.put(r, task);
        }
        return task;
    }

    private Task<?> getTask(Runnable r, boolean remove) {
        return remove ? this.taskMap.remove(r) : this.taskMap.get(r);
    }

    @Override
    public String toString() {
        return String.format("%s%s@%s", POOL_NAME_PREFIX, this.poolName, Integer.toHexString(this.hashCode()));
    }

    public String getPoolName() {
        return this.poolName;
    }

    static {
        onTraceIdChanged = Delegate.create();
        CTX_PARENT_TRACE_ID = new InheritableThreadLocal<Queue<String>>();
        CTX_TRACE_ID = new InheritableThreadLocal<String>();
        ASYNC_CONTINUE = new FastThreadLocal();
        COMPLETION_RETURNED_VALUE = new FastThreadLocal();
        DEFAULT_CPU_WATER_MARK = new IntWaterMark(RxConfig.INSTANCE.threadPool.lowCpuWaterMark, RxConfig.INSTANCE.threadPool.highCpuWaterMark);
        timer = new HashedWheelTimer(ThreadPool.newThreadFactory("timer"), 800L, TimeUnit.MILLISECONDS, 8);
        sizer = new DynamicSizer();
        taskLockMap = new ConcurrentHashMap<Object, RefCounter<ReentrantLock>>(8);
        taskSerialMap = new ConcurrentHashMap();
    }

    static class DynamicSizer
    implements TimerTask {
        final Map<ThreadPoolExecutor, BiTuple<IntWaterMark, Integer, Integer>> holder = new WeakIdentityMap<ThreadPoolExecutor, BiTuple<IntWaterMark, Integer, Integer>>(8);

        DynamicSizer() {
            timer.newTimeout((TimerTask)this, RxConfig.INSTANCE.threadPool.samplingPeriod, TimeUnit.MILLISECONDS);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(Timeout timeout) throws Exception {
            try {
                Decimal cpuLoad = Decimal.valueOf(Sys.osMx.getSystemCpuLoad() * 100.0);
                for (Map.Entry<ThreadPoolExecutor, BiTuple<IntWaterMark, Integer, Integer>> entry : this.holder.entrySet()) {
                    ThreadPoolExecutor pool = entry.getKey();
                    if (pool instanceof ScheduledExecutorService) {
                        this.scheduledThread(cpuLoad, pool, entry.getValue());
                        continue;
                    }
                    this.thread(cpuLoad, pool, entry.getValue());
                }
            }
            finally {
                timer.newTimeout((TimerTask)this, RxConfig.INSTANCE.threadPool.samplingPeriod, TimeUnit.MILLISECONDS);
            }
        }

        private void thread(Decimal cpuLoad, ThreadPoolExecutor pool, BiTuple<IntWaterMark, Integer, Integer> tuple) {
            IntWaterMark waterMark = (IntWaterMark)tuple.left;
            int decrementCounter = (Integer)tuple.middle;
            int incrementCounter = (Integer)tuple.right;
            String prefix = pool.toString();
            if (log.isDebugEnabled()) {
                log.debug("{} PoolSize={}+[{}] Threshold={}[{}-{}]% de/incrementCounter={}/{}", new Object[]{prefix, pool.getPoolSize(), pool.getQueue().size(), cpuLoad, waterMark.getLow(), waterMark.getHigh(), decrementCounter, incrementCounter});
            }
            if (cpuLoad.gt(waterMark.getHigh())) {
                if (++decrementCounter >= RxConfig.INSTANCE.threadPool.samplingTimes) {
                    log.info("{} PoolSize={}+[{}] Threshold={}[{}-{}]% decrement to {}", new Object[]{prefix, pool.getPoolSize(), pool.getQueue().size(), cpuLoad, waterMark.getLow(), waterMark.getHigh(), ThreadPool.decrSize(pool)});
                    decrementCounter = 0;
                }
            } else {
                decrementCounter = 0;
            }
            if (!pool.getQueue().isEmpty() && cpuLoad.lt(waterMark.getLow())) {
                if (++incrementCounter >= RxConfig.INSTANCE.threadPool.samplingTimes) {
                    log.info("{} PoolSize={}+[{}] Threshold={}[{}-{}]% increment to {}", new Object[]{prefix, pool.getPoolSize(), pool.getQueue().size(), cpuLoad, waterMark.getLow(), waterMark.getHigh(), ThreadPool.incrSize(pool)});
                    incrementCounter = 0;
                }
            } else {
                incrementCounter = 0;
            }
            tuple.middle = decrementCounter;
            tuple.right = incrementCounter;
        }

        private void scheduledThread(Decimal cpuLoad, ThreadPoolExecutor pool, BiTuple<IntWaterMark, Integer, Integer> tuple) {
            IntWaterMark waterMark = (IntWaterMark)tuple.left;
            int decrementCounter = (Integer)tuple.middle;
            int incrementCounter = (Integer)tuple.right;
            String prefix = pool.toString();
            int active = pool.getActiveCount();
            int size = pool.getCorePoolSize();
            float idle = (float)active / (float)size * 100.0f;
            log.debug("{} PoolSize={} QueueSize={} Threshold={}[{}-{}]% idle={} de/incrementCounter={}/{}", new Object[]{prefix, pool.getCorePoolSize(), pool.getQueue().size(), cpuLoad, waterMark.getLow(), waterMark.getHigh(), Float.valueOf(100.0f - idle), decrementCounter, incrementCounter});
            RxConfig.ThreadPoolConfig conf = RxConfig.INSTANCE.threadPool;
            if (size > conf.minDynamicSize && (idle <= (float)waterMark.getHigh() || cpuLoad.gt(waterMark.getHigh()))) {
                if (++decrementCounter >= conf.samplingTimes) {
                    log.info("{} Threshold={}[{}-{}]% idle={} decrement to {}", new Object[]{prefix, cpuLoad, waterMark.getLow(), waterMark.getHigh(), Float.valueOf(100.0f - idle), ThreadPool.decrSize(pool)});
                    decrementCounter = 0;
                }
            } else {
                decrementCounter = 0;
            }
            if (active >= size && cpuLoad.lt(waterMark.getLow())) {
                if (++incrementCounter >= conf.samplingTimes) {
                    log.info("{} Threshold={}[{}-{}]% increment to {}", new Object[]{prefix, cpuLoad, waterMark.getLow(), waterMark.getHigh(), ThreadPool.incrSize(pool)});
                    incrementCounter = 0;
                }
            } else {
                incrementCounter = 0;
            }
            tuple.middle = decrementCounter;
            tuple.right = incrementCounter;
        }

        public void register(ThreadPoolExecutor pool, IntWaterMark cpuWaterMark) {
            if (cpuWaterMark == null) {
                return;
            }
            this.holder.put(pool, BiTuple.of(cpuWaterMark, 0, 0));
        }
    }

    static class FutureTaskAdapter<T>
    extends FutureTask<T> {
        final Task<T> task;

        public FutureTaskAdapter(Callable<T> callable) {
            super(callable);
            this.task = Task.as(callable);
        }

        public FutureTaskAdapter(Runnable runnable, T result) {
            super(runnable, result);
            this.task = Task.as(runnable);
        }
    }

    static class Task<T>
    implements Runnable,
    Callable<T>,
    Supplier<T> {
        final Func<T> fn;
        final FlagsEnum<RunFlag> flags;
        final Object id;
        final InternalThreadLocalMap parent;
        final String traceId;

        static <T> Task<T> adapt(Callable<T> fn) {
            Task<Object> t = Task.as(fn);
            return t != null ? t : new Task<Object>(fn::call, null, null);
        }

        static <T> Task<T> adapt(Runnable fn) {
            Task<Object> t = Task.as(fn);
            return t != null ? t : new Task<Object>(() -> {
                fn.run();
                return null;
            }, null, null);
        }

        static <T> Task<T> as(Object fn) {
            return fn instanceof Task ? (Task)fn : null;
        }

        Task(Func<T> fn, FlagsEnum<RunFlag> flags, Object id) {
            if (flags == null) {
                flags = RunFlag.NONE.flags();
            }
            if (RxConfig.INSTANCE.threadPool.traceName != null) {
                flags.add(new RunFlag[]{RunFlag.THREAD_TRACE});
            }
            this.fn = fn;
            this.flags = flags;
            this.id = id;
            this.parent = flags.has(new RunFlag[]{RunFlag.INHERIT_FAST_THREAD_LOCALS}) ? InternalThreadLocalMap.getIfSet() : null;
            this.traceId = CTX_TRACE_ID.get();
        }

        @Override
        public T call() {
            try {
                return this.fn.invoke();
            }
            catch (Throwable e) {
                TraceHandler.INSTANCE.log(this.toString(), e);
                throw e;
            }
        }

        @Override
        public void run() {
            this.call();
        }

        @Override
        public T get() {
            return this.call();
        }

        public String toString() {
            String hc = this.id != null ? this.id.toString() : Integer.toHexString(this.hashCode());
            return String.format("Task-%s[%s]", hc, this.flags.getValue());
        }
    }

    public static class ThreadQueue
    extends LinkedTransferQueue<Runnable> {
        private static final long serialVersionUID = 4283369202482437480L;
        private ThreadPool pool;
        final int queueCapacity;
        final AtomicInteger counter = new AtomicInteger();

        public boolean isFullLoad() {
            return this.counter.get() >= this.queueCapacity;
        }

        @Override
        public boolean isEmpty() {
            return this.counter.get() == 0;
        }

        @Override
        public int size() {
            return this.counter.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public boolean offer(Runnable r) {
            if (this.isFullLoad()) {
                boolean logged = false;
                while (this.isFullLoad()) {
                    if (!logged) {
                        log.warn("Block caller thread until queue[{}/{}] polled then offer {}", new Object[]{this.counter.get(), this.queueCapacity, r});
                        logged = true;
                    }
                    ThreadQueue threadQueue = this;
                    synchronized (threadQueue) {
                        this.wait(500L);
                    }
                }
                log.debug("Wait poll ok");
            }
            this.counter.incrementAndGet();
            Task task = this.pool.setTask(r);
            if (task != null && task.flags.has(new RunFlag[]{RunFlag.TRANSFER})) {
                super.transfer(r);
                return true;
            }
            return super.offer(r);
        }

        @Override
        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            boolean ok = true;
            try {
                Runnable r = (Runnable)super.poll(timeout, unit);
                ok = r != null;
                Runnable runnable = r;
                return runnable;
            }
            catch (InterruptedException e) {
                ok = false;
                throw e;
            }
            finally {
                if (ok) {
                    log.debug("Notify poll");
                    this.doNotify();
                }
            }
        }

        @Override
        public Runnable take() throws InterruptedException {
            try {
                Runnable runnable = (Runnable)super.take();
                return runnable;
            }
            finally {
                log.debug("Notify take");
                this.doNotify();
            }
        }

        @Override
        public boolean remove(Object o) {
            boolean ok = super.remove(o);
            if (ok) {
                log.debug("Notify remove");
                this.doNotify();
            }
            return ok;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doNotify() {
            int c = this.counter.decrementAndGet();
            ThreadQueue threadQueue = this;
            synchronized (threadQueue) {
                if (c < 0) {
                    this.counter.set(super.size());
                    TraceHandler.INSTANCE.saveMetric(Constants.MetricName.THREAD_QUEUE_SIZE_ERROR.name(), String.format("FIX SIZE %s -> %s", c, this.counter));
                }
                this.notify();
            }
        }

        public ThreadQueue(int queueCapacity) {
            this.queueCapacity = queueCapacity;
        }
    }

    public static class MultiTaskFuture<T, TS> {
        final CompletableFuture<T> future;
        final CompletableFuture<TS>[] subFutures;

        public MultiTaskFuture(CompletableFuture<T> future, CompletableFuture<TS>[] subFutures) {
            this.future = future;
            this.subFutures = subFutures;
        }

        public CompletableFuture<T> getFuture() {
            return this.future;
        }

        public CompletableFuture<TS>[] getSubFutures() {
            return this.subFutures;
        }
    }
}

