/*
 * Decompiled with CFR 0.152.
 */
package top.hserver.core.queue;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import top.hserver.core.ioc.annotation.queue.QueueHandlerType;
import top.hserver.core.queue.QueueData;
import top.hserver.core.queue.QueueEventHandler;
import top.hserver.core.queue.QueueFactory;
import top.hserver.core.queue.QueueHandleMethod;
import top.hserver.core.queue.QueueInfo;
import top.hserver.core.server.util.NamedThreadFactory;

/**
 * {@link QueueFactory} implementation backed by an LMAX {@link Disruptor}.
 *
 * <p>{@link #createQueue} builds the disruptor and wires one handler stage per
 * distinct handler level; the remaining methods delegate to that disruptor and
 * therefore require {@code createQueue} to have been called first.
 */
public class QueueFactoryImpl implements QueueFactory {
    private Disruptor<QueueData> disruptor;

    /**
     * Creates the disruptor for a queue and registers its handlers.
     *
     * <p>Handlers are grouped by {@link QueueHandleMethod#getLevel()} and the groups are
     * chained in ascending level order, each stage being registered on the group returned
     * by the previous stage. Within a level, a method whose {@code getSize()} is greater
     * than one contributes that many handler instances.
     *
     * @param queueName          queue name; used for the thread-factory name and passed to each handler
     * @param bufferSize         ring buffer size handed to the {@link Disruptor} constructor
     * @param queueHandlerType   {@link QueueHandlerType#REPEAT} registers the handlers with
     *                           {@code handleEventsWith}/{@code then}; any other value uses the
     *                           worker-pool variants
     * @param queueHandleMethods handler methods to register
     */
    @Override
    public void createQueue(String queueName, int bufferSize, QueueHandlerType queueHandlerType, List<QueueHandleMethod> queueHandleMethods) {
        ThreadFactory threadFactory = new NamedThreadFactory("queue:" + queueName);
        this.disruptor = new Disruptor<QueueData>(QueueData::new, bufferSize, threadFactory);

        // A TreeMap guarantees ascending level order; the default groupingBy map gives no
        // ordering guarantee, so sorting the stream beforehand was not sufficient.
        Map<Integer, List<QueueHandleMethod>> methodsByLevel = queueHandleMethods.stream()
                .collect(Collectors.groupingBy(QueueHandleMethod::getLevel, TreeMap::new, Collectors.toList()));

        boolean repeat = queueHandlerType == QueueHandlerType.REPEAT;
        EventHandlerGroup<QueueData> stage = null;
        for (List<QueueHandleMethod> levelMethods : methodsByLevel.values()) {
            QueueEventHandler[] handlers = buildHandlers(queueName, levelMethods);
            if (stage == null) {
                // First level: attach directly to the disruptor.
                if (repeat) {
                    stage = this.disruptor.handleEventsWith(handlers);
                } else {
                    stage = this.disruptor.handleEventsWithWorkerPool(handlers);
                }
            } else {
                // Later levels: chain onto the previous stage and keep the returned group,
                // otherwise every later level would only be gated on the first level.
                if (repeat) {
                    stage = stage.then(handlers);
                } else {
                    stage = stage.thenHandleEventsWithWorkerPool(handlers);
                }
            }
        }
    }

    /**
     * Builds the handler instances for one level.
     *
     * <p>Each method yields one handler in list order; methods with {@code getSize() > 1}
     * then contribute {@code size - 1} additional handlers appended after them.
     */
    private static QueueEventHandler[] buildHandlers(String queueName, List<QueueHandleMethod> levelMethods) {
        // Expand into a separate list so the list being iterated is never modified.
        List<QueueHandleMethod> expanded = new ArrayList<>(levelMethods);
        for (QueueHandleMethod method : levelMethods) {
            int size = method.getSize();
            if (size <= 1) {
                continue;
            }
            for (int extra = 1; extra < size; extra++) {
                expanded.add(method);
            }
            // Kept from the previous implementation: the size is reset once expanded.
            // NOTE(review): code outside this class may rely on this reset - confirm before removing.
            method.setSize(1);
        }

        QueueEventHandler[] handlers = new QueueEventHandler[expanded.size()];
        for (int i = 0; i < handlers.length; i++) {
            handlers[i] = new QueueEventHandler(queueName, expanded.get(i).getMethod());
        }
        return handlers;
    }

    /** Starts the underlying disruptor. */
    @Override
    public void start() {
        this.disruptor.start();
    }

    /** Shuts down the underlying disruptor. */
    @Override
    public void stop() {
        this.disruptor.shutdown();
    }

    /**
     * Publishes a copy of {@code queueData} to the ring buffer.
     *
     * <p>The next sequence is claimed, the slot's fields are overwritten from
     * {@code queueData}, and the sequence is published.
     *
     * @param queueData source of the values copied into the claimed slot
     */
    @Override
    public void producer(QueueData queueData) {
        RingBuffer<QueueData> ringBuffer = this.disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            QueueData slot = ringBuffer.get(sequence);
            slot.setArgs(queueData.getArgs());
            slot.setfQueue(queueData.getfQueue());
            slot.setQueueName(queueData.getQueueName());
            slot.setThreadSize(queueData.getThreadSize());
        } finally {
            // A claimed sequence must always be published, even if copying fails.
            ringBuffer.publish(sequence);
        }
    }

    /**
     * Returns a snapshot of the disruptor's buffer size, cursor and remaining capacity.
     *
     * @return a new {@link QueueInfo} populated from the disruptor
     */
    @Override
    public QueueInfo queueInfo() {
        QueueInfo queueInfo = new QueueInfo();
        queueInfo.setBufferSize(this.disruptor.getBufferSize());
        queueInfo.setCursor(this.disruptor.getCursor());
        queueInfo.setRemainQueueSize(this.disruptor.getRingBuffer().remainingCapacity());
        return queueInfo;
    }
}

