package io.polaris.core.data.consumer;

import io.polaris.core.data.buffer.BufferChannel;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:io/polaris/core/data/consumer/ConsumeDriver.class */
/**
 * Drives a fixed-size group of {@link ConsumerThread}s that read from the buffers of a single
 * {@link BufferChannel}.
 *
 * <p>The channel and the consumer threads are fixed at construction time. The
 * {@code bufferChannel} argument accepted by {@link #begin}, {@link #isRunning} and
 * {@link #close} exists only to satisfy {@link IConsumerDriver}; this implementation ignores it
 * and always operates on the channel passed to the constructor.
 *
 * <p>{@link #begin} and {@link #close} are serialized by an internal lock, and {@link #begin}
 * re-checks the running flag after acquiring that lock so that concurrent callers start the
 * threads at most once.
 *
 * <p>NOTE(review): {@link #close} resets the running flag, so a later {@link #begin} would
 * re-register every buffer and call {@code start()} on the same thread objects again. Whether
 * restarting is meant to be supported depends on {@link ConsumerThread} - confirm.
 *
 * @param <T> the element type carried by the channel and handed to the consumer
 */
public class ConsumeDriver<T> implements IConsumerDriver<T> {
    /**
     * Whether the consumer threads have been started and not yet closed. Written only while
     * {@link #lock} is held; {@code volatile} so the lock-free reads in {@link #begin} and
     * {@link #isRunning} observe the latest value.
     */
    private volatile boolean running;
    /** The consumer threads; created in the public constructor, started in {@link #begin}. */
    private final ConsumerThread<T>[] threads;
    /** The channel whose buffers are distributed over {@link #threads}. */
    private final BufferChannel<T> channel;
    /** Serializes {@link #begin} and {@link #close}. */
    private final ReentrantLock lock;

    /**
     * Initializes the shared state and allocates the (still empty) thread array.
     *
     * @param bufferChannel the channel whose buffers will be consumed
     * @param i the number of consumer threads to allocate slots for
     */
    // Generic array creation is unavoidable here; the array never leaves this class and only
    // ever holds ConsumerThread<T> instances, so the unchecked conversion is safe.
    @SuppressWarnings("unchecked")
    private ConsumeDriver(BufferChannel<T> bufferChannel, int i) {
        this.running = false;
        this.channel = bufferChannel;
        this.threads = new ConsumerThread[i];
        this.lock = new ReentrantLock();
    }

    /**
     * Creates a driver with {@code i} daemon consumer threads that all share one consumer.
     *
     * <p>Each thread is named {@code DataCarrier.<str>.Consumer.<index>}. The threads are only
     * created here; they are started by {@link #begin}.
     *
     * @param str the name segment embedded in every consumer thread name
     * @param bufferChannel the channel whose buffers will be consumed
     * @param iConsumer the consumer instance handed to every thread
     * @param i the number of consumer threads to create
     * @param j passed through unchanged to the {@link ConsumerThread} constructor
     *     (presumably the consume cycle / poll interval - TODO confirm against ConsumerThread)
     */
    public ConsumeDriver(String str, BufferChannel<T> bufferChannel, IConsumer<T> iConsumer, int i, long j) {
        this(bufferChannel, i);
        for (int index = 0; index < i; index++) {
            ConsumerThread<T> thread =
                    new ConsumerThread<>("DataCarrier." + str + ".Consumer." + index, iConsumer, j);
            thread.setDaemon(true);
            this.threads[index] = thread;
        }
    }

    /**
     * Assigns the channel's buffers to the consumer threads and starts every thread.
     *
     * <p>Does nothing if the driver is already running. The running flag is checked once without
     * the lock as a fast path and again under the lock, so that two concurrent callers cannot
     * both allocate buffers and start the threads.
     *
     * @param bufferChannel ignored; the channel given at construction time is used
     */
    @Override // io.polaris.core.data.consumer.IConsumerDriver
    public void begin(BufferChannel<T> bufferChannel) {
        if (this.running) {
            return;
        }
        this.lock.lock();
        try {
            // Re-check under the lock: another caller may have completed begin() between the
            // unlocked check above and this thread acquiring the lock.
            if (this.running) {
                return;
            }
            allocateBuffer2Thread();
            for (ConsumerThread<T> consumerThread : this.threads) {
                consumerThread.start();
            }
            this.running = true;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Reports whether {@link #begin} has completed and {@link #close} has not been called since.
     *
     * @param bufferChannel ignored
     * @return the current value of the running flag
     */
    @Override // io.polaris.core.data.consumer.IConsumerDriver
    public boolean isRunning(BufferChannel<T> bufferChannel) {
        return this.running;
    }

    /**
     * Distributes the channel's buffers over the consumer threads round-robin: buffer
     * {@code n} is added as a source of thread {@code n % threads.length}.
     */
    private void allocateBuffer2Thread() {
        int bufferCount = this.channel.getBufferCount();
        for (int i = 0; i < bufferCount; i++) {
            this.threads[i % this.threads.length].addSource(this.channel.getBuffer(i));
        }
    }

    /**
     * Clears the running flag and calls {@code shutdown()} on every consumer thread, all while
     * holding the lock.
     *
     * @param bufferChannel ignored
     */
    @Override // io.polaris.core.data.consumer.IConsumerDriver
    public void close(BufferChannel<T> bufferChannel) {
        this.lock.lock();
        try {
            this.running = false;
            for (ConsumerThread<T> consumerThread : this.threads) {
                consumerThread.shutdown();
            }
        } finally {
            this.lock.unlock();
        }
    }
}
