package io.polaris.core.data.consumer;

import io.polaris.core.data.buffer.BufferChannel;
import io.polaris.core.tuple.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/polaris/core/data/consumer/BulkConsumerThread.class */
/**
 * Worker thread that repeatedly drains a set of {@link BufferChannel}s and hands the drained
 * elements to the {@link IConsumer} registered together with each channel.
 *
 * <p>The thread polls all registered channel/consumer pairs in a loop. When a full pass over the
 * pairs yields no data it sleeps for {@code thinkTime} milliseconds before polling again. After
 * {@link #shutdown()} has been called the loop ends, every pair is drained one final time and
 * {@link IConsumer#onExit()} is invoked on each consumer.
 *
 * @param <T> type of the elements carried by the buffers
 */
public class BulkConsumerThread<T> extends Thread {
    /** Loop flag; set in the constructor and cleared by {@link #shutdown()}. */
    private volatile boolean running;
    /**
     * Registered channel/consumer pairs. The list is never mutated in place: {@link #addBulk}
     * publishes a fresh copy, so a reader iterating over a previously read reference is unaffected.
     */
    private volatile List<Tuple2<BufferChannel<T>, IConsumer<T>>> bulkConsumers;
    /** Running total of {@code bufferChannel.size()} over every channel passed to {@link #addBulk}. */
    private volatile long size;
    /** Milliseconds passed to {@link Thread#sleep(long)} after a pass that consumed nothing. */
    private final long thinkTime;

    /**
     * Creates a consumer thread with no registered channels.
     *
     * @param str thread name
     * @param j   idle sleep time in milliseconds, used when a polling pass finds no data
     */
    public BulkConsumerThread(String str, long j) {
        super(str);
        this.running = true;
        this.bulkConsumers = new ArrayList<>();
        this.thinkTime = j;
    }

    /**
     * Creates a new, not yet started thread with the same name, think time, registered pairs and
     * size as this one. The pair list reference is shared, which is safe because the list is only
     * ever replaced, never modified in place.
     *
     * @return the new thread
     */
    BulkConsumerThread<T> copy() {
        BulkConsumerThread<T> duplicate = new BulkConsumerThread<>(getName(), this.thinkTime);
        duplicate.bulkConsumers = this.bulkConsumers;
        duplicate.size = this.size;
        return duplicate;
    }

    /**
     * Registers a channel together with the consumer that receives its data.
     *
     * <p>A new list containing the existing pairs plus the new one is published, and
     * {@code bufferChannel.size()} is added to {@link #size()}. The update is an unsynchronized
     * read-modify-write, so concurrent calls from several threads may lose registrations.
     *
     * @param bufferChannel channel to drain
     * @param iConsumer     consumer that receives the elements drained from {@code bufferChannel}
     */
    public void addBulk(BufferChannel<T> bufferChannel, IConsumer<T> iConsumer) {
        List<Tuple2<BufferChannel<T>, IConsumer<T>>> updated = new ArrayList<>(this.bulkConsumers);
        updated.add(new Tuple2<>(bufferChannel, iConsumer));
        this.bulkConsumers = updated;
        this.size += bufferChannel.size();
    }

    /**
     * @return the sum of {@code size()} of all channels registered through {@link #addBulk}
     */
    public long size() {
        return this.size;
    }

    /**
     * Polls all registered pairs until {@link #shutdown()} is called, then drains every pair one
     * last time and notifies each consumer through {@link IConsumer#onExit()}.
     */
    @Override
    public void run() {
        // "running" is deliberately NOT reset to true here: the constructor already set it, and
        // resetting it would overwrite a shutdown() issued before this method started executing.
        List<T> batch = new ArrayList<>(2000);
        while (this.running) {
            boolean consumedAny = false;
            for (Tuple2<BufferChannel<T>, IConsumer<T>> bulk : this.bulkConsumers) {
                // consume() must run for every pair. Folding it into "consumedAny || consume(..)"
                // would short-circuit after the first pair that produced data and skip the rest.
                boolean consumed = consume(bulk, batch);
                consumedAny = consumedAny || consumed;
            }
            if (!consumedAny) {
                try {
                    Thread.sleep(this.thinkTime);
                } catch (InterruptedException ignored) {
                    // Intentionally ignored: an interrupt only cuts the idle sleep short.
                    // The loop is terminated exclusively through shutdown().
                }
            }
        }
        // Final pass so that data buffered before shutdown is still delivered.
        for (Tuple2<BufferChannel<T>, IConsumer<T>> bulk : this.bulkConsumers) {
            consume(bulk, batch);
            bulk.getSecond().onExit();
        }
    }

    /**
     * Drains every buffer of the pair's channel into {@code list} and, if anything was drained,
     * passes the list to the pair's consumer.
     *
     * <p>Anything thrown by {@link IConsumer#consume} is forwarded to {@link IConsumer#onError}.
     * The list is always cleared before returning or propagating, so it can be reused.
     *
     * @param tuple2 channel/consumer pair to process
     * @param list   reusable scratch list; expected to be empty on entry and left empty on exit
     * @return {@code true} if at least one element was drained, {@code false} otherwise
     */
    private boolean consume(Tuple2<BufferChannel<T>, IConsumer<T>> tuple2, List<T> list) {
        BufferChannel<T> channel = tuple2.getFirst();
        IConsumer<T> consumer = tuple2.getSecond();
        for (int i = 0; i < channel.getBufferCount(); i++) {
            channel.getBuffer(i).drainTo(list);
        }
        if (list.isEmpty()) {
            return false;
        }
        try {
            consumer.consume(list);
        } catch (Throwable failure) {
            consumer.onError(list, failure);
        } finally {
            // Clear even when onError itself throws, so stale data is never re-delivered.
            list.clear();
        }
        return true;
    }

    /**
     * Asks the polling loop in {@link #run()} to stop. The thread then performs its final drain
     * and exits; this method does not wait for that to happen.
     */
    public void shutdown() {
        this.running = false;
    }
}
