package org.wowtools.common.pcm.ltp;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wowtools.common.pcm.Customer;
import org.wowtools.common.utils.AsyncTaskUtil;

/* loaded from: input_file:org/wowtools/common/pcm/ltp/LtpPcmTask.class */
public class LtpPcmTask<T> {
    private static final Logger log = LoggerFactory.getLogger(LtpPcmTask.class);
    private final Collection<LtpProducer<T>> producers;
    private final Collection<Customer<T>> customers;
    private final LtpBufferPool<T> bufferPool;
    private final AtomicBoolean allFinish;
    private final long waitMilliSecones;

    /* loaded from: input_file:org/wowtools/common/pcm/ltp/LtpPcmTask$ExceptionCell.class */
    private static class ExceptionCell {
        Object o;

        private ExceptionCell() {
        }
    }

    public LtpPcmTask(Collection<LtpProducer<T>> collection, Collection<Customer<T>> collection2, int i, long j) {
        this.allFinish = new AtomicBoolean(false);
        this.producers = collection;
        this.customers = collection2;
        this.bufferPool = new LtpBufferPool<>(i, j);
        this.waitMilliSecones = j;
    }

    public LtpPcmTask(LtpProducer<T> ltpProducer, Collection<Customer<T>> collection, int i, long j) {
        this.allFinish = new AtomicBoolean(false);
        this.producers = new ArrayList(1);
        this.producers.add(ltpProducer);
        this.customers = collection;
        this.bufferPool = new LtpBufferPool<>(i, j);
        this.waitMilliSecones = j;
    }

    public LtpPcmTask(Collection<LtpProducer<T>> collection, Customer<T> customer, int i, long j) {
        this.allFinish = new AtomicBoolean(false);
        this.producers = collection;
        this.customers = new ArrayList(1);
        this.customers.add(customer);
        this.bufferPool = new LtpBufferPool<>(i, j);
        this.waitMilliSecones = j;
    }

    public LtpPcmTask(LtpProducer<T> ltpProducer, Customer<T> customer, int i, long j) {
        this.allFinish = new AtomicBoolean(false);
        this.producers = new ArrayList(1);
        this.producers.add(ltpProducer);
        this.customers = new ArrayList(1);
        this.customers.add(customer);
        this.bufferPool = new LtpBufferPool<>(i, j);
        this.waitMilliSecones = j;
    }

    public void startTask(boolean z) {
        boolean tryAcquire;
        ExceptionCell exceptionCell = new ExceptionCell();
        for (LtpProducer<T> ltpProducer : this.producers) {
            AsyncTaskUtil.execute(() -> {
                Object produce;
                while (!ltpProducer.isFinish() && exceptionCell.o == null && null != (produce = ltpProducer.produce())) {
                    try {
                        this.bufferPool.add(produce);
                    } catch (Exception e) {
                        log.warn("生产者执行异常", e);
                        exceptionCell.o = ltpProducer;
                        return;
                    }
                }
            });
        }
        Semaphore semaphore = new Semaphore(0);
        for (Customer<T> customer : this.customers) {
            AsyncTaskUtil.execute(() -> {
                while (true) {
                    try {
                        T take = this.bufferPool.take();
                        if (null == take) {
                            if (exceptionCell.o != null) {
                                break;
                            } else if (isAllFinish()) {
                                break;
                            }
                        } else {
                            customer.consume(take);
                        }
                    } catch (Exception e) {
                        log.warn("消费者执行异常", e);
                        exceptionCell.o = customer;
                    }
                }
                semaphore.release();
            });
        }
        if (!z) {
            return;
        }
        do {
            try {
                tryAcquire = semaphore.tryAcquire(this.customers.size(), this.waitMilliSecones, TimeUnit.MILLISECONDS);
                if (!tryAcquire && exceptionCell.o != null) {
                    log.warn(exceptionCell.o.getClass() + "执行异常，退出");
                    return;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } while (!tryAcquire);
    }

    private boolean isAllFinish() {
        if (this.allFinish.get()) {
            return true;
        }
        Iterator<LtpProducer<T>> it = this.producers.iterator();
        while (it.hasNext()) {
            if (!it.next().isFinish()) {
                return false;
            }
        }
        this.allFinish.set(true);
        return true;
    }
}
