/*
 * Decompiled with CFR 0.152.
 */
package io.polaris.core.data.consumer;

import io.polaris.core.data.buffer.BufferChannel;
import io.polaris.core.data.buffer.IQueueBuffer;
import io.polaris.core.data.consumer.IConsumer;
import io.polaris.core.tuple.Tuple2;
import java.util.ArrayList;
import java.util.List;

/**
 * Worker thread that repeatedly drains a set of {@link BufferChannel}s and hands the drained
 * elements, in bulk, to the {@link IConsumer} registered together with each channel.
 *
 * <p>Channel/consumer pairs are kept in a copy-on-write list: {@link #addBulk} never mutates the
 * list being iterated by {@link #run()}, it publishes a new list through a volatile field.
 * Registrations are serialised on a private lock so that concurrent {@code addBulk} calls cannot
 * lose an entry or a size increment.
 *
 * @param <T> type of the elements moved from the buffers to the consumers
 */
public class BulkConsumerThread<T>
extends Thread {
    /**
     * Guards the read-copy-publish sequence on {@link #bulkConsumers} and {@link #size}.
     * A dedicated object is used instead of {@code this} because the monitor of a
     * {@code Thread} instance is also used by {@link Thread#join()}.
     */
    private final Object registrationLock = new Object();
    /** Cleared by {@link #shutdown()}; polled by the consume loop. */
    private volatile boolean running = true;
    /** Copy-on-write snapshot of the registered channel/consumer pairs. */
    private volatile List<Tuple2<BufferChannel<T>, IConsumer<T>>> bulkConsumers = new ArrayList<Tuple2<BufferChannel<T>, IConsumer<T>>>();
    /** Sum of {@code channel.size()} over all registered channels, sampled at registration time. */
    private volatile long size;
    /** Milliseconds to sleep after a pass in which no channel produced data. */
    private final long thinkTime;

    /**
     * @param name      thread name
     * @param thinkTime sleep time in milliseconds between two passes that found no data
     */
    public BulkConsumerThread(String name, long thinkTime) {
        super(name);
        this.thinkTime = thinkTime;
    }

    /**
     * Creates a new, not yet started thread with the same name, think time, registered
     * channel/consumer pairs and accumulated size as this one.
     *
     * @return the new thread
     */
    BulkConsumerThread<T> copy() {
        BulkConsumerThread<T> thread = new BulkConsumerThread<T>(this.getName(), this.thinkTime);
        // Take both values under the lock so the copy sees a consistent list/size pair.
        synchronized (this.registrationLock) {
            thread.bulkConsumers = this.bulkConsumers;
            thread.size = this.size;
        }
        return thread;
    }

    /**
     * Registers a channel together with the consumer that receives its data.
     *
     * @param channel  channel whose buffers are drained by this thread
     * @param consumer receiver of the drained elements
     * @throws NullPointerException if {@code channel} or {@code consumer} is {@code null}
     */
    public void addBulk(BufferChannel<T> channel, IConsumer<T> consumer) {
        // Fail before anything is published: a pair with a null member would otherwise only
        // blow up later, inside the consumer thread.
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (consumer == null) {
            throw new NullPointerException("consumer");
        }
        long channelSize = channel.size();
        Tuple2<BufferChannel<T>, IConsumer<T>> group = new Tuple2<BufferChannel<T>, IConsumer<T>>(channel, consumer);
        synchronized (this.registrationLock) {
            List<Tuple2<BufferChannel<T>, IConsumer<T>>> current = this.bulkConsumers;
            List<Tuple2<BufferChannel<T>, IConsumer<T>>> newList =
                new ArrayList<Tuple2<BufferChannel<T>, IConsumer<T>>>(current.size() + 1);
            newList.addAll(current);
            newList.add(group);
            this.bulkConsumers = newList;
            // volatile alone does not make += atomic; the lock does.
            this.size += channelSize;
        }
    }

    /**
     * @return the sum of {@code channel.size()} of all channels registered so far
     */
    public long size() {
        return this.size;
    }

    /**
     * Consume loop. While running, every registered channel is drained and its data passed to
     * its consumer; when a full pass yields no data the thread sleeps for the think time.
     * After {@link #shutdown()} each channel is drained one last time and
     * {@link IConsumer#onExit()} is invoked on its consumer.
     *
     * <p>An interrupt received while sleeping does not stop the loop (only {@link #shutdown()}
     * does); it is remembered and the interrupt status is restored when this method returns.
     */
    @Override
    public void run() {
        // Deliberately no "running = true" here: the field is initialised to true, and
        // re-setting it would erase a shutdown() issued before this method started executing.
        List<T> consumeList = new ArrayList<T>(2000);
        boolean interrupted = false;
        try {
            while (this.running) {
                boolean hasData = false;
                for (Tuple2<BufferChannel<T>, IConsumer<T>> target : this.bulkConsumers) {
                    // Non-short-circuit on purpose: every target must be consumed on each pass.
                    hasData |= this.consume(target, consumeList);
                }
                if (hasData) {
                    continue;
                }
                try {
                    Thread.sleep(this.thinkTime);
                }
                catch (InterruptedException e) {
                    // Keep looping, but do not lose the interrupt request.
                    interrupted = true;
                }
            }
            for (Tuple2<BufferChannel<T>, IConsumer<T>> target : this.bulkConsumers) {
                this.consume(target, consumeList);
                target.getSecond().onExit();
            }
        }
        finally {
            // Restored only at the very end so the final drain is not disturbed by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Drains every buffer of the target's channel into {@code consumeList} and, if anything was
     * drained, passes the list to the target's consumer. A {@link Throwable} thrown by
     * {@code consume} is forwarded to the consumer's {@code onError}. The list is always cleared
     * before returning, so it can be reused for the next target.
     *
     * @param target      channel/consumer pair to process
     * @param consumeList reusable scratch list; expected to be empty on entry
     * @return {@code true} if at least one element was drained, {@code false} otherwise
     */
    private boolean consume(Tuple2<BufferChannel<T>, IConsumer<T>> target, List<T> consumeList) {
        BufferChannel<T> channel = target.getFirst();
        for (int i = 0; i < channel.getBufferCount(); ++i) {
            IQueueBuffer<T> buffer = channel.getBuffer(i);
            buffer.drainTo(consumeList);
        }
        if (consumeList.isEmpty()) {
            return false;
        }
        IConsumer<T> consumer = target.getSecond();
        try {
            consumer.consume(consumeList);
        }
        catch (Throwable t) {
            consumer.onError(consumeList, t);
        }
        finally {
            consumeList.clear();
        }
        return true;
    }

    /**
     * Asks the consume loop to stop. The thread finishes its current pass (or sleep), performs a
     * final drain of every channel and then terminates. Takes effect even when called before the
     * thread has started running.
     */
    public void shutdown() {
        this.running = false;
    }
}

