package io.polaris.core.concurrent.pool;

import io.polaris.core.consts.SymbolConsts;
import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/core/concurrent/pool/PooledExecutor.class */
public class PooledExecutor<E> implements RunnableState<E> {
    private static final ILogger log = ILoggers.of((Class<?>) PooledExecutor.class);
    public static final int CORE_NUM = Runtime.getRuntime().availableProcessors();
    public static final float MAXIMUM_FACTOR = 8.0f;
    public static final int KEEP_ALIVE_TIME = 1000;
    private BlockingQueue<E> blockingQueue;
    private ThreadPoolExecutor pool;
    private RunnableStatistics statistics;
    private int poolSize = CORE_NUM;
    private int maximumPoolSize = CORE_NUM * 8;
    private int queueSize = 1000;
    private int errorLimit = -1;
    private boolean openStatistics = false;
    private volatile boolean running = false;
    private final Lock awaitLock = new ReentrantLock();
    private final Condition awaitCondition = this.awaitLock.newCondition();
    private final AtomicInteger activeCount = new AtomicInteger(0);
    private final List<Runnable> consumers = new CopyOnWriteArrayList();
    private final AtomicReference<Consumer<ErrorRecords<E>>> rejectConsumerRef = new AtomicReference<>();

    public static ThreadPoolExecutor newWorkerPool(int i, int i2) {
        return newWorkerPool(i, i2, 1000L);
    }

    public static ThreadPoolExecutor newWorkerPool(int i) {
        return newWorkerPool(i, (int) (i * 8.0f), 1000L);
    }

    public static ThreadPoolExecutor newWorkerPool() {
        return newWorkerPool(CORE_NUM);
    }

    public static ThreadPoolExecutor newWorkerPool(int i, int i2, long j) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(i, i2, j, TimeUnit.MILLISECONDS, new SynchronousQueue(true), Executors.defaultThreadFactory());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    @Override // io.polaris.core.concurrent.pool.RunnableStatisticsHolder
    public RunnableStatistics runnableStatistics() {
        return this.statistics;
    }

    @Override // io.polaris.core.concurrent.pool.RunnableState
    public boolean hasNext() {
        return this.running || !this.blockingQueue.isEmpty();
    }

    @Override // io.polaris.core.concurrent.pool.RunnableState
    public E next() {
        return this.blockingQueue.poll();
    }

    @Override // io.polaris.core.concurrent.pool.RunnableState
    public void incrementActiveCount() {
        this.activeCount.incrementAndGet();
    }

    @Override // io.polaris.core.concurrent.pool.RunnableState
    public void decrementActiveCount() {
        this.activeCount.decrementAndGet();
    }

    @Override // io.polaris.core.concurrent.pool.RunnableState
    public void notifyFinished() {
        this.awaitLock.lock();
        try {
            this.awaitCondition.signalAll();
        } finally {
            this.awaitLock.unlock();
        }
    }

    public void setRejectConsumer(Consumer<ErrorRecords<E>> consumer) {
        this.rejectConsumerRef.set(consumer);
    }

    public void addConsumer(Consumer<E> consumer) {
        addConsumer(1, consumer);
    }

    public <R> void addConsumer(TransactionConsumer<E, R> transactionConsumer) {
        addConsumer(1, transactionConsumer);
    }

    public void addConsumer(int i, Consumer<E> consumer) {
        if (this.running) {
            throw new IllegalStateException("正在运行中");
        }
        for (int i2 = 0; i2 < i; i2++) {
            this.consumers.add(RunnableDelegates.createDelegate(this, consumer, this.rejectConsumerRef));
        }
    }

    public <R> void addConsumer(int i, TransactionConsumer<E, R> transactionConsumer) {
        if (this.running) {
            throw new IllegalStateException("正在运行中");
        }
        for (int i2 = 0; i2 < i; i2++) {
            this.consumers.add(RunnableDelegates.createDelegate(this, transactionConsumer, this.rejectConsumerRef));
        }
    }

    public void start() {
        if (this.running) {
            throw new IllegalStateException("正在运行中");
        }
        if (this.consumers.isEmpty()) {
            throw new IllegalArgumentException("未提供消费者");
        }
        this.running = true;
        this.pool = newWorkerPool(this.poolSize, this.maximumPoolSize);
        if (this.openStatistics || this.errorLimit >= 0) {
            this.statistics = new RunnableStatistics(this.errorLimit);
        }
        this.blockingQueue = new ArrayBlockingQueue(this.queueSize, true);
        for (Runnable runnable : this.consumers) {
            try {
                this.pool.execute(runnable);
            } catch (RejectedExecutionException e) {
                extend();
                try {
                    this.pool.execute(runnable);
                } catch (RejectedExecutionException e2) {
                    return;
                }
            }
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    public void offer(Iterable<E> iterable) {
        Iterator<E> it = iterable.iterator();
        while (it.hasNext()) {
            offer((PooledExecutor<E>) it.next());
        }
    }

    @SafeVarargs
    public final void offer(E... eArr) {
        for (E e : eArr) {
            offer((PooledExecutor<E>) e);
        }
    }

    public boolean isExceedErrorLimit() {
        return this.statistics != null && this.statistics.isExceedErrorLimit();
    }

    public void offer(E e) {
        if (!this.running) {
            throw new IllegalStateException("状态已停止");
        }
        if (isExceedErrorLimit()) {
            throw new IllegalStateException("处理失败数量超限(" + getErrorLimit() + SymbolConsts.RIGHT_PARENTHESIS);
        }
        while (!this.blockingQueue.offer(e, 1L, TimeUnit.MILLISECONDS)) {
            try {
            } catch (InterruptedException e2) {
                log.trace("", e2);
            }
        }
    }

    public void await() {
        while (true) {
            if (!this.running && this.activeCount.get() <= 0 && this.blockingQueue.isEmpty()) {
                break;
            }
            this.awaitLock.lock();
            try {
                this.awaitCondition.await(1L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.trace("", e);
            } finally {
                this.awaitLock.unlock();
            }
        }
        if (this.statistics != null) {
            log.info("total: {}, success: {}, error: {}", Long.valueOf(this.statistics.getTotal().get()), Long.valueOf(this.statistics.getSuccess().get()), Long.valueOf(this.statistics.getError().get()));
        }
    }

    public void shutdown() {
        this.running = false;
        await();
        this.consumers.clear();
        closePool();
    }

    private void closePool() {
        if (this.pool != null) {
            ThreadPoolExecutor threadPoolExecutor = this.pool;
            this.pool = null;
            try {
                threadPoolExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            if (!threadPoolExecutor.isShutdown()) {
                threadPoolExecutor.shutdownNow();
                if (!threadPoolExecutor.isShutdown()) {
                    threadPoolExecutor.shutdown();
                }
            }
            log.info("关闭线程池完成");
        }
    }

    private void extend() {
        if (this.pool.getMaximumPoolSize() < CORE_NUM * 100) {
            this.pool.setMaximumPoolSize(this.pool.getMaximumPoolSize() + CORE_NUM);
            this.pool.setCorePoolSize(this.pool.getCorePoolSize() + CORE_NUM);
        }
    }

    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public void setMaximumPoolSize(int i) {
        this.maximumPoolSize = i;
    }

    public int getMaximumPoolSize() {
        return this.maximumPoolSize;
    }

    public void setQueueSize(int i) {
        this.queueSize = i;
    }

    public int getQueueSize() {
        return this.queueSize;
    }

    public void setErrorLimit(int i) {
        this.errorLimit = i;
    }

    public int getErrorLimit() {
        return this.errorLimit;
    }

    public void setOpenStatistics(boolean z) {
        this.openStatistics = z;
    }

    public boolean isOpenStatistics() {
        return this.openStatistics;
    }
}
