package io.polaris.core.concurrent.pool;

import io.polaris.core.consts.SymbolConsts;
import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/polaris/core/concurrent/pool/RoutingPooledExecutor.class */
public class RoutingPooledExecutor<E> {
    private static final ILogger log = ILoggers.of((Class<?>) RoutingPooledExecutor.class);
    private RunnableStatistics statistics;
    private int queueSize = 1000;
    private int errorLimit = -1;
    private boolean openStatistics = false;
    private Function<E, Integer> router = Objects::hashCode;
    private final AtomicReference<Consumer<ErrorRecords<E>>> rejectConsumerRef = new AtomicReference<>();
    private final List<RoutingPooledExecutor<E>.Executor<E>> executors = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/polaris/core/concurrent/pool/RoutingPooledExecutor$Executor.class */
    public class Executor<E> implements RunnableState<E> {
        private volatile boolean running;
        private BlockingQueue<E> blockingQueue;
        private final Lock awaitLock;
        private final AtomicInteger activeCount;
        private final Condition awaitCondition;
        private Runnable runnable;

        private Executor() {
            this.running = false;
            this.awaitLock = new ReentrantLock();
            this.activeCount = new AtomicInteger();
            this.awaitCondition = this.awaitLock.newCondition();
        }

        @Override // io.polaris.core.concurrent.pool.RunnableState
        public void notifyFinished() {
            this.awaitLock.lock();
            try {
                this.awaitCondition.signalAll();
            } finally {
                this.awaitLock.unlock();
            }
        }

        public void setRunnable(Runnable runnable) {
            this.runnable = runnable;
        }

        public void setRunning(boolean z) {
            this.running = z;
        }

        @Override // io.polaris.core.concurrent.pool.RunnableStatisticsHolder
        public RunnableStatistics runnableStatistics() {
            return RoutingPooledExecutor.this.statistics;
        }

        @Override // io.polaris.core.concurrent.pool.RunnableState
        public void incrementActiveCount() {
            this.activeCount.incrementAndGet();
        }

        @Override // io.polaris.core.concurrent.pool.RunnableState
        public void decrementActiveCount() {
            this.activeCount.decrementAndGet();
        }

        @Override // io.polaris.core.concurrent.pool.RunnableState
        public boolean hasNext() {
            return this.running || !this.blockingQueue.isEmpty();
        }

        @Override // io.polaris.core.concurrent.pool.RunnableState
        public E next() {
            return this.blockingQueue.poll();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void start() {
            if (this.running) {
                throw new IllegalStateException("正在运行中");
            }
            this.blockingQueue = new ArrayBlockingQueue(RoutingPooledExecutor.this.queueSize, true);
            this.running = true;
            new Thread(this.runnable).start();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void await() {
            while (true) {
                if (!this.running && this.activeCount.get() <= 0) {
                    return;
                }
                this.awaitLock.lock();
                try {
                    this.awaitCondition.await(100L, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    RoutingPooledExecutor.log.trace("", e);
                } finally {
                    this.awaitLock.unlock();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void offer(E e) {
            if (!this.running) {
                throw new IllegalStateException("状态已停止");
            }
            if (RoutingPooledExecutor.this.isExceedErrorLimit()) {
                throw new IllegalStateException("处理失败数量超限(" + RoutingPooledExecutor.this.getErrorLimit() + SymbolConsts.RIGHT_PARENTHESIS);
            }
            while (!this.blockingQueue.offer(e, 10L, TimeUnit.NANOSECONDS)) {
            }
        }
    }

    public void setRejectConsumer(Consumer<ErrorRecords<E>> consumer) {
        this.rejectConsumerRef.set(consumer);
    }

    public void addConsumer(Consumer<E> consumer) {
        addConsumer(1, consumer);
    }

    public <R> void addConsumer(TransactionConsumer<E, R> transactionConsumer) {
        addConsumer(1, transactionConsumer);
    }

    public void addConsumer(int i, Consumer<E> consumer) {
        if (isRunning()) {
            throw new IllegalStateException("正在运行中");
        }
        for (int i2 = 0; i2 < i; i2++) {
            RoutingPooledExecutor<E>.Executor<E> executor = new Executor<>();
            executor.setRunnable(RunnableDelegates.createDelegate(executor, consumer, this.rejectConsumerRef));
            this.executors.add(executor);
        }
    }

    public <R> void addConsumer(int i, TransactionConsumer<E, R> transactionConsumer) {
        if (isRunning()) {
            throw new IllegalStateException("正在运行中");
        }
        for (int i2 = 0; i2 < i; i2++) {
            RoutingPooledExecutor<E>.Executor<E> executor = new Executor<>();
            executor.setRunnable(RunnableDelegates.createDelegate(executor, transactionConsumer, this.rejectConsumerRef));
            this.executors.add(executor);
        }
    }

    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("正在运行中");
        }
        if (this.executors.isEmpty()) {
            throw new IllegalArgumentException("未提供消费者");
        }
        if (this.openStatistics || this.errorLimit >= 0) {
            this.statistics = new RunnableStatistics(this.errorLimit);
        }
        int size = this.executors.size();
        if (size > 0) {
            for (int i = 0; i < size; i++) {
                this.executors.get(i).start();
            }
        }
    }

    public boolean isRunning() {
        int size = this.executors.size();
        if (size <= 0) {
            return false;
        }
        for (int i = 0; i < size; i++) {
            if (((Executor) this.executors.get(i)).running) {
                return true;
            }
        }
        return false;
    }

    public void offer(Iterable<E> iterable) {
        Iterator<E> it = iterable.iterator();
        while (it.hasNext()) {
            offer((RoutingPooledExecutor<E>) it.next());
        }
    }

    @SafeVarargs
    public final void offer(E... eArr) {
        if (eArr.length == 0) {
            return;
        }
        if (eArr.length == 1) {
            offer(this.router.apply(eArr[0]).intValue(), (int) eArr[0]);
            return;
        }
        for (E e : eArr) {
            offer(this.router.apply(e).intValue(), (int) e);
        }
    }

    public void offer(E e) {
        offer(this.router.apply(e).intValue(), (int) e);
    }

    public void offer(int i, Iterable<E> iterable) {
        RoutingPooledExecutor<E>.Executor<E> executor = getExecutor(i);
        Iterator<E> it = iterable.iterator();
        while (it.hasNext()) {
            executor.offer(it.next());
        }
    }

    @SafeVarargs
    public final void offer(int i, E... eArr) {
        RoutingPooledExecutor<E>.Executor<E> executor = getExecutor(i);
        for (E e : eArr) {
            executor.offer(e);
        }
    }

    public void offer(int i, E e) {
        getExecutor(i).offer(e);
    }

    private RoutingPooledExecutor<E>.Executor<E> getExecutor(int i) {
        return this.executors.get(Math.abs(i) % this.executors.size());
    }

    public void await() {
        int size = this.executors.size();
        if (size > 0) {
            for (int i = 0; i < size; i++) {
                this.executors.get(i).await();
            }
        }
        if (this.statistics != null) {
            log.info("total: {}, success: {}, error: {}", Long.valueOf(this.statistics.getTotal().get()), Long.valueOf(this.statistics.getSuccess().get()), Long.valueOf(this.statistics.getError().get()));
        }
    }

    public void shutdown() {
        int size = this.executors.size();
        if (size > 0) {
            for (int i = 0; i < size; i++) {
                this.executors.get(i).setRunning(false);
            }
        }
        await();
        this.executors.clear();
    }

    public boolean isExceedErrorLimit() {
        return this.statistics != null && this.statistics.isExceedErrorLimit();
    }

    public int getExecutorCount() {
        return this.executors.size();
    }

    public RunnableStatistics runnableStatistics() {
        return this.statistics;
    }

    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setErrorLimit(int i) {
        this.errorLimit = i;
    }

    public int getErrorLimit() {
        return this.errorLimit;
    }

    public void setOpenStatistics(boolean z) {
        this.openStatistics = z;
    }

    public boolean isOpenStatistics() {
        return this.openStatistics;
    }

    public void setRouter(Function<E, Integer> function) {
        this.router = function;
    }

    public Function<E, Integer> getRouter() {
        return this.router;
    }
}
