package itez.kit.queue;

import com.jfinal.kit.Kv;
import itez.kit.ELog;
import itez.kit.log.ELogBase;
import itez.kit.log.ExceptionUtil;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:itez/kit/queue/QueueFactory.class */
/**
 * A bounded in-memory queue of {@link Kv} messages drained by a fixed pool of
 * consumer threads, each of which passes polled messages to an
 * {@link IQueueHandle}.
 *
 * <p>Two modes are supported, selected by the {@code longTerm} constructor flag:
 * <ul>
 *   <li>long-term: a consumer that finds the queue empty sleeps and polls again
 *       until the factory is stopped;</li>
 *   <li>one-shot: a consumer that finds the queue empty invokes
 *       {@link IQueueHandle#callback()} and stops the factory.</li>
 * </ul>
 *
 * <p>The underlying thread pool is shut down by {@link #stop()} /
 * {@link #stopNow()}, so an instance cannot be started again after it has been
 * stopped.
 */
public class QueueFactory {
    private final IQueueHandle handle;
    private final BlockingQueue<Kv> queue;
    private final ExecutorService customerPool;
    private final int maxQueueSize;
    private final int threadPoolSize;
    private final boolean longTerm;
    private final ELogBase log = ELog.log(getClass());

    /**
     * Running flag. Written by whichever thread starts/stops the factory and
     * read by every consumer thread on each loop iteration, hence volatile.
     */
    private volatile boolean factoryState = false;

    /** Consumer task: polls the queue and dispatches messages while the factory is running. */
    class QueueThread implements Runnable {
        /** Pause, in milliseconds, between polls of an empty queue in long-term mode. */
        private static final long THREAD_SLEEP = 1000;

        /** Number of messages consumed by this consumer instance. */
        private final AtomicInteger count = new AtomicInteger(0);

        QueueThread() {
        }

        @Override
        public void run() {
            final long threadId = Thread.currentThread().getId();
            QueueFactory.this.log.info("消费者线程启动：{}", threadId);
            while (QueueFactory.this.getFactoryState()) {
                try {
                    Kv message = QueueFactory.this.poll();
                    if (message != null) {
                        QueueFactory.this.log.info("消费者线程激活：{}，消费总数：{}，队列长度：{}", threadId, this.count.incrementAndGet(), QueueFactory.this.getQueueSize());
                        QueueFactory.this.getHandle().handle(message);
                    } else if (QueueFactory.this.getLongTerm()) {
                        // Long-term mode: nothing to do right now, back off and poll again.
                        QueueFactory.this.log.info("消费者线程休眠：{}，消费总数：{}，队列长度：{}", threadId, this.count.get(), QueueFactory.this.getQueueSize());
                        Thread.sleep(THREAD_SLEEP);
                    } else {
                        // One-shot mode: an empty queue means the work is finished.
                        QueueFactory.this.log.info("消费者线程停止：{}，消费总数：{}，队列长度：{}", threadId, this.count.get(), QueueFactory.this.getQueueSize());
                        // NOTE(review): with several consumers, each one that reaches this
                        // point while the factory is still flagged as running invokes
                        // callback(); confirm whether the handler expects a single call.
                        if (QueueFactory.this.getFactoryState()) {
                            QueueFactory.this.getHandle().callback();
                        }
                        QueueFactory.this.stop();
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the pool, then leave the loop.
                    Thread.currentThread().interrupt();
                    QueueFactory.this.log.error("消费者线程异常退出：{}", ExceptionUtil.getMessage(e));
                    return;
                }
            }
        }
    }

    /**
     * Creates a stopped factory; call {@link #start()} to launch the consumers.
     *
     * @param handle         receiver of polled messages and of the completion callback
     * @param maxQueueSize   capacity of the backing queue
     * @param threadPoolSize number of consumer threads
     * @param longTerm       {@code true} to keep consumers alive while the queue is
     *                       empty, {@code false} to stop once the queue is found empty
     * @throws RuntimeException         if {@code handle} is {@code null}
     * @throws IllegalArgumentException if {@code maxQueueSize} or
     *                                  {@code threadPoolSize} is not positive
     *                                  (raised by the JDK queue / pool constructors)
     */
    public QueueFactory(IQueueHandle handle, int maxQueueSize, int threadPoolSize, boolean longTerm) {
        if (handle == null) {
            throw new RuntimeException("未发现有效的QueueHandle");
        }
        this.handle = handle;
        this.maxQueueSize = maxQueueSize;
        this.threadPoolSize = threadPoolSize;
        this.longTerm = longTerm;
        this.queue = new LinkedBlockingQueue<>(maxQueueSize);
        this.customerPool = Executors.newFixedThreadPool(threadPoolSize);
    }

    /**
     * Launches {@code threadPoolSize} consumers. Does nothing if the factory is
     * already running.
     *
     * @throws java.util.concurrent.RejectedExecutionException if the pool has
     *         already been shut down by a previous stop
     */
    public synchronized void start() {
        if (this.factoryState) {
            return;
        }
        // The flag must be raised BEFORE the consumers are submitted: each consumer
        // checks it as its loop condition and would exit at once if it ran first.
        this.factoryState = true;
        try {
            for (int i = 0; i < this.threadPoolSize; i++) {
                this.customerPool.execute(new QueueThread());
            }
        } catch (RuntimeException e) {
            // Submission failed; do not report the factory as running.
            this.factoryState = false;
            throw e;
        }
    }

    /** Stops the factory via an orderly pool shutdown, waiting up to 3 seconds for termination. */
    public void stop() {
        stop(false);
    }

    /** Stops the factory immediately, interrupting the consumers. */
    public void stopNow() {
        stop(true);
    }

    /**
     * Shuts the consumer pool down and marks the factory as stopped. Does
     * nothing if the factory is not running.
     *
     * @param immediate {@code true} to use {@code shutdownNow()}, {@code false}
     *                  to use {@code shutdown()} followed by a bounded wait
     */
    private void stop(boolean immediate) {
        if (!this.factoryState) {
            return;
        }
        if (immediate) {
            this.log.info("线程池已停止，未处理的任务数：{}", this.customerPool.shutdownNow().size());
        } else {
            this.customerPool.shutdown();
            try {
                // NOTE(review): consumers only leave their loop once factoryState is
                // false, which is set after this wait, and a consumer calling stop()
                // is itself a pool thread — so this wait normally runs the full
                // timeout. Ordering kept as-is to preserve existing timing behaviour.
                this.customerPool.awaitTermination(3L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt; let the caller observe it.
                Thread.currentThread().interrupt();
            }
        }
        this.factoryState = false;
    }

    /**
     * Adds a message to the queue without blocking.
     *
     * @param kv the message to enqueue
     * @throws RuntimeException if the queue already holds {@code maxQueueSize}
     *                          elements or the insertion is rejected
     */
    public void offer(Kv kv) {
        if (this.queue.size() >= this.maxQueueSize) {
            throw new RuntimeException("队列已满");
        }
        if (!this.queue.offer(kv)) {
            throw new RuntimeException("插入队列失败");
        }
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the next message, or {@code null} if the queue is empty
     */
    public Kv poll() {
        return this.queue.poll();
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given time
     * for a message to become available.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next message, or {@code null} if the wait elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    public Kv poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /** @return the number of messages currently queued */
    public int getQueueSize() {
        return this.queue.size();
    }

    /** @return the handler supplied at construction */
    public IQueueHandle getHandle() {
        return this.handle;
    }

    /** @return the queue capacity supplied at construction */
    public int getMaxQueueSize() {
        return this.maxQueueSize;
    }

    /** @return the number of consumer threads supplied at construction */
    public int getThreadPoolSize() {
        return this.threadPoolSize;
    }

    /** @return {@code true} while the factory is flagged as running */
    public boolean getFactoryState() {
        return this.factoryState;
    }

    /** @return {@code true} if consumers keep polling when the queue is empty */
    public boolean getLongTerm() {
        return this.longTerm;
    }
}
