/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.core.concurrent.pool;

import io.polaris.core.concurrent.pool.ErrorRecords;
import io.polaris.core.concurrent.pool.RunnableDelegates;
import io.polaris.core.concurrent.pool.RunnableState;
import io.polaris.core.concurrent.pool.RunnableStatistics;
import io.polaris.core.concurrent.pool.TransactionConsumer;
import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

public class RoutingPooledExecutor<E> {
    private static final ILogger log = ILoggers.of(RoutingPooledExecutor.class);
    private int queueSize = 1000;
    private int errorLimit = -1;
    private boolean openStatistics = false;
    private Function<E, Integer> router = Objects::hashCode;
    private RunnableStatistics statistics;
    private final AtomicReference<Consumer<ErrorRecords<E>>> rejectConsumerRef = new AtomicReference();
    private final List<Executor<E>> executors = new CopyOnWriteArrayList<Executor<E>>();

    public void setRejectConsumer(Consumer<ErrorRecords<E>> rejectConsumer) {
        this.rejectConsumerRef.set(rejectConsumer);
    }

    public void addConsumer(Consumer<E> consumer) {
        this.addConsumer(1, consumer);
    }

    public <R> void addConsumer(TransactionConsumer<E, R> consumer) {
        this.addConsumer(1, consumer);
    }

    public void addConsumer(int count, Consumer<E> consumer) {
        if (this.isRunning()) {
            throw new IllegalStateException("\u6b63\u5728\u8fd0\u884c\u4e2d");
        }
        for (int k = 0; k < count; ++k) {
            Executor executor = new Executor();
            Runnable runnable = RunnableDelegates.createDelegate(executor, consumer, this.rejectConsumerRef);
            executor.setRunnable(runnable);
            this.executors.add(executor);
        }
    }

    public <R> void addConsumer(int count, TransactionConsumer<E, R> consumer) {
        if (this.isRunning()) {
            throw new IllegalStateException("\u6b63\u5728\u8fd0\u884c\u4e2d");
        }
        for (int k = 0; k < count; ++k) {
            Executor executor = new Executor();
            Runnable runnable = RunnableDelegates.createDelegate(executor, consumer, this.rejectConsumerRef);
            executor.setRunnable(runnable);
            this.executors.add(executor);
        }
    }

    public void start() {
        int count;
        if (this.isRunning()) {
            throw new IllegalStateException("\u6b63\u5728\u8fd0\u884c\u4e2d");
        }
        if (this.executors.isEmpty()) {
            throw new IllegalArgumentException("\u672a\u63d0\u4f9b\u6d88\u8d39\u8005");
        }
        if (this.openStatistics || this.errorLimit >= 0) {
            this.statistics = new RunnableStatistics(this.errorLimit);
        }
        if ((count = this.executors.size()) > 0) {
            for (int i = 0; i < count; ++i) {
                Executor<E> executor = this.executors.get(i);
                ((Executor)executor).start();
            }
        }
    }

    public boolean isRunning() {
        int count = this.executors.size();
        if (count > 0) {
            for (int i = 0; i < count; ++i) {
                Executor<E> executor = this.executors.get(i);
                if (!((Executor)executor).running) continue;
                return true;
            }
        }
        return false;
    }

    public void offer(Iterable<E> datas) {
        for (E data : datas) {
            this.offer(data);
        }
    }

    @SafeVarargs
    public final void offer(E ... datas) {
        if (datas.length == 0) {
            return;
        }
        if (datas.length == 1) {
            this.offer((int)this.router.apply(datas[0]), datas[0]);
            return;
        }
        for (E data : datas) {
            this.offer((int)this.router.apply(data), data);
        }
    }

    public void offer(E data) {
        this.offer((int)this.router.apply(data), data);
    }

    public void offer(int index, Iterable<E> datas) {
        Executor<E> executor = this.getExecutor(index);
        for (E data : datas) {
            ((Executor)executor).offer(data);
        }
    }

    @SafeVarargs
    public final void offer(int index, E ... datas) {
        Executor<E> executor = this.getExecutor(index);
        for (E data : datas) {
            ((Executor)executor).offer(data);
        }
    }

    public void offer(int index, E data) {
        Executor<E> executor = this.getExecutor(index);
        ((Executor)executor).offer(data);
    }

    private Executor<E> getExecutor(int index) {
        index = Math.abs(index) % this.executors.size();
        return this.executors.get(index);
    }

    public void await() {
        int count = this.executors.size();
        if (count > 0) {
            for (int i = 0; i < count; ++i) {
                Executor<E> executor = this.executors.get(i);
                ((Executor)executor).await();
            }
        }
        if (this.statistics != null) {
            log.info("total: {}, success: {}, error: {}", this.statistics.getTotal().get(), this.statistics.getSuccess().get(), this.statistics.getError().get());
        }
    }

    public void shutdown() {
        int count = this.executors.size();
        if (count > 0) {
            for (int i = 0; i < count; ++i) {
                Executor<E> executor = this.executors.get(i);
                executor.setRunning(false);
            }
        }
        this.await();
        this.executors.clear();
    }

    public boolean isExceedErrorLimit() {
        return this.statistics != null && this.statistics.isExceedErrorLimit();
    }

    public int getExecutorCount() {
        return this.executors.size();
    }

    public RunnableStatistics runnableStatistics() {
        return this.statistics;
    }

    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setErrorLimit(int errorLimit) {
        this.errorLimit = errorLimit;
    }

    public int getErrorLimit() {
        return this.errorLimit;
    }

    public void setOpenStatistics(boolean openStatistics) {
        this.openStatistics = openStatistics;
    }

    public boolean isOpenStatistics() {
        return this.openStatistics;
    }

    public void setRouter(Function<E, Integer> router) {
        this.router = router;
    }

    public Function<E, Integer> getRouter() {
        return this.router;
    }

    private class Executor<E>
    implements RunnableState<E> {
        private volatile boolean running = false;
        private BlockingQueue<E> blockingQueue;
        private final Lock awaitLock = new ReentrantLock();
        private final AtomicInteger activeCount = new AtomicInteger();
        private final Condition awaitCondition = this.awaitLock.newCondition();
        private Runnable runnable;

        private Executor() {
        }

        @Override
        public void notifyFinished() {
            this.awaitLock.lock();
            try {
                this.awaitCondition.signalAll();
            }
            finally {
                this.awaitLock.unlock();
            }
        }

        public void setRunnable(Runnable runnable) {
            this.runnable = runnable;
        }

        public void setRunning(boolean running) {
            this.running = running;
        }

        @Override
        public RunnableStatistics runnableStatistics() {
            return RoutingPooledExecutor.this.statistics;
        }

        @Override
        public void incrementActiveCount() {
            this.activeCount.incrementAndGet();
        }

        @Override
        public void decrementActiveCount() {
            this.activeCount.decrementAndGet();
        }

        @Override
        public boolean hasNext() {
            return this.running || !this.blockingQueue.isEmpty();
        }

        @Override
        public E next() {
            return this.blockingQueue.poll();
        }

        private void start() {
            if (this.running) {
                throw new IllegalStateException("\u6b63\u5728\u8fd0\u884c\u4e2d");
            }
            this.blockingQueue = new ArrayBlockingQueue(RoutingPooledExecutor.this.queueSize, true);
            this.running = true;
            new Thread(this.runnable).start();
        }

        private void await() {
            while (this.running || this.activeCount.get() > 0) {
                this.awaitLock.lock();
                try {
                    this.awaitCondition.await(100L, TimeUnit.NANOSECONDS);
                }
                catch (InterruptedException e) {
                    log.trace("", e);
                }
                finally {
                    this.awaitLock.unlock();
                }
            }
        }

        private void offer(E data) {
            if (!this.running) {
                throw new IllegalStateException("\u72b6\u6001\u5df2\u505c\u6b62");
            }
            if (RoutingPooledExecutor.this.isExceedErrorLimit()) {
                throw new IllegalStateException("\u5904\u7406\u5931\u8d25\u6570\u91cf\u8d85\u9650(" + RoutingPooledExecutor.this.getErrorLimit() + ")");
            }
            while (true) {
                try {
                    boolean rs;
                    while (!(rs = this.blockingQueue.offer(data, 10L, TimeUnit.NANOSECONDS))) {
                    }
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
        }
    }
}

