package risesoft.data.transfer.base.executor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import risesoft.data.transfer.core.close.Closed;
import risesoft.data.transfer.core.exception.CommonErrorCode;
import risesoft.data.transfer.core.exception.TransferException;
import risesoft.data.transfer.core.executor.Executor;
import risesoft.data.transfer.core.executor.ExecutorFacotry;
import risesoft.data.transfer.core.executor.ExecutorListener;
import risesoft.data.transfer.core.executor.ExecutorTaskQueue;
import risesoft.data.transfer.core.log.Logger;
import risesoft.data.transfer.core.log.LoggerFactory;
import risesoft.data.transfer.core.util.CloseUtils;
import risesoft.data.transfer.core.util.Configuration;
import risesoft.data.transfer.core.util.pool.ObjectPool;
import risesoft.data.transfer.core.util.pool.SyncObjectPool;

/* loaded from: input_file:risesoft/data/transfer/base/executor/ConcurrentThreadExecutorTaskQueue.class */
public class ConcurrentThreadExecutorTaskQueue implements ExecutorTaskQueue {
    private ExecutorListener executorListener;
    private ObjectPool<Executor> executorPool;
    private ExecutorFacotry executor;
    private Logger logger;
    private int size;
    private ConcurrentLinkedQueue<Object> linkedQueue = new ConcurrentLinkedQueue<>();
    private boolean isStart = false;
    private volatile boolean isShutdown = false;
    private Object source = this;

    public ConcurrentThreadExecutorTaskQueue(Configuration configuration, LoggerFactory loggerFactory) {
        this.size = configuration.getInt("size", 5).intValue();
        this.logger = loggerFactory.getLogger(configuration.getString("name", "ThreadPoolExecutorTaskQueue"));
        this.executorPool = new SyncObjectPool(this.size, () -> {
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "create executor instance:" + this.executorPool.getConcurrentSize() + " created instance size:" + this.size);
            }
            return this.executor.getInstance();
        });
        if (this.logger.isInfo()) {
            this.logger.info(this.source, "inited max size " + this.size);
        }
    }

    public synchronized void close() throws Exception {
        int concurrentSize = this.executorPool.getConcurrentSize();
        this.logger.info(this, "close:" + this.executorPool.getConcurrentSize());
        for (int i = 0; i < concurrentSize; i++) {
            CloseUtils.close((Closed) this.executorPool.getInstance());
        }
        this.executor.close();
        this.isStart = false;
    }

    public void add(Object obj) {
        if (!this.isStart) {
            throw TransferException.as(CommonErrorCode.CONFIG_ERROR, "不支持在未启动的状态下执行任务,请确保您未将此实现类用于生产者,生产者必须是异步的线程!");
        }
        this.linkedQueue.add(obj);
        runJob();
    }

    public void addBatch(Collection collection) {
        Iterator it = collection.iterator();
        while (it.hasNext()) {
            add(it.next());
        }
    }

    public Collection<Object> getResidue() {
        return new ArrayList();
    }

    public int getResidueSize() {
        return this.executorPool.getConcurrentSize();
    }

    public void setExecutorFacoty(ExecutorFacotry executorFacotry) {
        this.executor = executorFacotry;
    }

    public void start() {
        this.executorPool.clear();
        this.executorListener.start();
        this.isStart = true;
    }

    public int getExecutorSize() {
        return this.size;
    }

    public void shutdown() throws Exception {
        this.logger.info(this, "shutdown");
        this.isShutdown = true;
        close();
    }

    public void setExecutorListener(ExecutorListener executorListener) {
        this.executorListener = executorListener;
    }

    private void runJob() {
        try {
            if (this.isShutdown) {
                return;
            }
            if (this.logger.isDebug()) {
                this.logger.debug(this.source, "run job: ");
            }
            Executor executor = (Executor) this.executorPool.getInstance();
            Object poll = this.linkedQueue.poll();
            this.executorListener.taskStart(poll);
            if (this.isShutdown) {
                return;
            }
            executor.run(poll);
            this.executorPool.back(executor);
            this.logger.debug(this.source, "end job");
            if (this.linkedQueue.size() == 0) {
                this.logger.debug(this.source, "task end");
                this.executorListener.taskEnd(poll);
            }
        } catch (Throwable th) {
            this.logger.error(this.source, th.getMessage());
            this.executorListener.onError(th);
        }
    }
}
