/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.core.data.consumer;

import io.polaris.core.data.buffer.BufferChannel;
import io.polaris.core.data.consumer.BulkConsumerThread;
import io.polaris.core.data.consumer.IBulkConsumerDriver;
import io.polaris.core.data.consumer.IConsumer;
import java.util.ArrayList;
import java.util.List;

public class BulkConsumeDriver<T>
implements IBulkConsumerDriver<T> {
    private final List<BulkConsumerThread<T>> threads;
    private volatile boolean running = false;

    public BulkConsumeDriver(String name, int size, long thinkTime) {
        this.threads = new ArrayList<BulkConsumerThread<T>>(size);
        for (int i = 0; i < size; ++i) {
            BulkConsumerThread thread = new BulkConsumerThread("DataCarrier." + name + ".BulkConsumer." + i + "", thinkTime);
            thread.setDaemon(true);
            this.threads.add(thread);
        }
    }

    @Override
    public synchronized void add(BufferChannel<T> channel, IConsumer<T> consumer) {
        BulkConsumerThread<T> thread = this.getLowestPayload();
        thread.addBulk(channel, consumer);
    }

    private BulkConsumerThread<T> getLowestPayload() {
        BulkConsumerThread<T> winner = this.threads.get(0);
        for (int i = 1; i < this.threads.size(); ++i) {
            BulkConsumerThread<T> thread = this.threads.get(i);
            if (thread.size() >= winner.size()) continue;
            winner = thread;
        }
        return winner;
    }

    @Override
    public boolean isRunning(BufferChannel<T> channel) {
        return this.running;
    }

    @Override
    public void begin(BufferChannel<T> channel) {
        if (this.running) {
            return;
        }
        for (BulkConsumerThread<T> thread : this.threads) {
            thread.start();
        }
        this.running = true;
    }

    @Override
    public void close(BufferChannel<T> channel) {
        for (BulkConsumerThread<T> thread : this.threads) {
            thread.shutdown();
        }
    }
}

