/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.core.data.consumer;

import io.polaris.core.data.buffer.BufferChannel;
import io.polaris.core.data.consumer.ConsumerThread;
import io.polaris.core.data.consumer.IConsumer;
import io.polaris.core.data.consumer.IConsumerDriver;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Drives a fixed set of {@link ConsumerThread}s that consume the buffers of a
 * single {@link BufferChannel}.
 *
 * <p>The channel and the consumer threads are fixed at construction time. The
 * {@code channel} argument accepted by {@link #begin}, {@link #isRunning} and
 * {@link #close} is not used by this implementation; the channel passed to the
 * constructor is always the one that is consumed.
 *
 * <p>{@link #begin} and {@link #close} are serialized by an internal lock, and
 * the running flag is volatile so that {@link #isRunning} can be read without
 * taking the lock.
 *
 * @param <T> element type carried by the channel's buffers
 */
public class ConsumeDriver<T> implements IConsumerDriver<T> {
    /**
     * Set once the consumer threads have been started, cleared by {@link #close}.
     * Volatile because it is read outside {@link #lock}.
     */
    private volatile boolean running = false;
    private final ConsumerThread<T>[] threads;
    private final BufferChannel<T> channel;
    /** Serializes {@link #begin} and {@link #close}. */
    private final ReentrantLock lock;

    /**
     * Stores the channel and allocates {@code num} empty consumer-thread slots;
     * the public constructor fills them.
     */
    private ConsumeDriver(BufferChannel<T> channel, int num) {
        this.channel = channel;
        // Generic array creation is not expressible in Java; the array is private
        // and only ever populated with ConsumerThread<T> instances, so this is safe.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ConsumerThread<T>[] slots = new ConsumerThread[num];
        this.threads = slots;
        this.lock = new ReentrantLock();
    }

    /**
     * Creates a driver with {@code num} daemon consumer threads, all sharing the
     * same {@code consumer}. Threads are created here but not started until
     * {@link #begin} is called.
     *
     * @param name      label embedded in each thread name
     *                  ({@code DataCarrier.<name>.Consumer.<index>})
     * @param channel   channel whose buffers are distributed across the threads
     * @param consumer  consumer handed to every thread
     * @param num       number of consumer threads to create
     * @param thinkTime passed through to each {@link ConsumerThread}
     *                  (presumably its idle/poll interval — confirm against ConsumerThread)
     */
    public ConsumeDriver(String name, BufferChannel<T> channel, IConsumer<T> consumer, int num, long thinkTime) {
        this(channel, num);
        for (int i = 0; i < num; ++i) {
            this.threads[i] = new ConsumerThread<T>("DataCarrier." + name + ".Consumer." + i, consumer, thinkTime);
            this.threads[i].setDaemon(true);
        }
    }

    /**
     * Assigns the channel's buffers to the consumer threads and starts them.
     * Does nothing if the driver is already running.
     *
     * @param channel ignored; the channel supplied at construction is used
     */
    @Override
    public void begin(BufferChannel<T> channel) {
        // Cheap lock-free exit for the common "already started" case.
        if (this.running) {
            return;
        }
        this.lock.lock();
        try {
            // Re-check under the lock: a concurrent caller may have completed
            // start-up between the check above and lock acquisition. Without this,
            // buffers would be registered twice and the threads started a second time.
            if (this.running) {
                return;
            }
            this.allocateBuffer2Thread();
            for (ConsumerThread<T> thread : this.threads) {
                thread.start();
            }
            this.running = true;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reports whether {@link #begin} has completed and {@link #close} has not
     * been called since.
     *
     * @param channel ignored
     * @return the current value of the running flag
     */
    @Override
    public boolean isRunning(BufferChannel<T> channel) {
        return this.running;
    }

    /**
     * Distributes the channel's buffers over the consumer threads round-robin:
     * buffer {@code idx} is added as a source of thread {@code idx % threads.length}.
     */
    private void allocateBuffer2Thread() {
        int bufferCount = this.channel.getBufferCount();
        for (int idx = 0; idx < bufferCount; ++idx) {
            int consumerIndex = idx % this.threads.length;
            this.threads[consumerIndex].addSource(this.channel.getBuffer(idx));
        }
    }

    /**
     * Clears the running flag and calls {@code shutdown()} on every consumer thread.
     *
     * <p>NOTE(review): a later {@link #begin} would call {@code start()} on the same
     * {@link ConsumerThread} instances again; if those are {@code java.lang.Thread}
     * subclasses that cannot succeed. Confirm whether restart is meant to be supported.
     *
     * @param channel ignored
     */
    @Override
    public void close(BufferChannel<T> channel) {
        this.lock.lock();
        try {
            this.running = false;
            for (ConsumerThread<T> thread : this.threads) {
                thread.shutdown();
            }
        } finally {
            this.lock.unlock();
        }
    }
}

