package cn.schoolwow.data.thread.module.producerConsumer.flow;

import cn.schoolwow.data.thread.module.parent.domain.QuickDataThreadConfig;
import cn.schoolwow.data.thread.module.producerConsumer.domain.ConsumerContext;
import cn.schoolwow.quickflow.domain.FlowContext;
import cn.schoolwow.quickflow.flow.BusinessFlow;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:cn/schoolwow/data/thread/module/producerConsumer/flow/InitialConsumerThreadFlow.class */
public class InitialConsumerThreadFlow implements BusinessFlow {
    public void executeBusinessFlow(FlowContext flowContext) throws Exception {
        for (ConsumerContext consumerContext : ((QuickDataThreadConfig) flowContext.checkData("quickDataThreadConfig")).consumerContextMap.values()) {
            consumerContext.threadPoolExecutor.execute(() -> {
                ArrayList arrayList = new ArrayList();
                long currentTimeMillis = System.currentTimeMillis();
                while (true) {
                    try {
                        Object poll = consumerContext.registerConsumerRequest.blockingQueue.poll(1L, TimeUnit.SECONDS);
                        if (null != poll) {
                            arrayList.add(poll);
                        }
                        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                        if (!arrayList.isEmpty() && currentTimeMillis2 >= consumerContext.registerConsumerRequest.intervalTime) {
                            consumerContext.registerConsumerRequest.messageConsumer.consumeMessage(arrayList);
                            arrayList.clear();
                            currentTimeMillis = System.currentTimeMillis();
                        }
                        if (!arrayList.isEmpty() && arrayList.size() >= consumerContext.registerConsumerRequest.messageSize) {
                            consumerContext.registerConsumerRequest.messageConsumer.consumeMessage(arrayList);
                            arrayList.clear();
                            currentTimeMillis = System.currentTimeMillis();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    public String name() {
        return "初始化消费者线程";
    }
}
