package io.polaris.core.data;

import io.polaris.core.assertion.Arguments;
import io.polaris.core.concurrent.PooledThreadFactory;
import io.polaris.core.concurrent.Schedules;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/core/data/BatchDataCollector.class */
public class BatchDataCollector<E> {
    private final AtomicLong lastTime;
    private final ArrayBlockingQueue<E> buffer;
    private final int maxStoreSize;
    private final long maxStoreNanos;
    private final Consumer<List<E>> consumer;
    private final boolean withShutdownHook;
    private volatile boolean running;
    private volatile ScheduledExecutorService scheduler;

    public BatchDataCollector(int i, long j, TimeUnit timeUnit, Consumer<List<E>> consumer) {
        this(i, timeUnit.toNanos(j), (Consumer) consumer, true);
    }

    public BatchDataCollector(int i, long j, TimeUnit timeUnit, Consumer<List<E>> consumer, boolean z) {
        this(i, timeUnit.toNanos(j), consumer, z);
    }

    public BatchDataCollector(int i, long j, Consumer<List<E>> consumer, boolean z) {
        this.running = true;
        Arguments.isTrue(i > 0, "maxStoreSize must be greater than 0");
        Arguments.isTrue(j > 0, "maxStoreNanos must be greater than 0");
        this.maxStoreNanos = j;
        this.maxStoreSize = i;
        this.consumer = consumer;
        this.withShutdownHook = z;
        this.buffer = new ArrayBlockingQueue<>(i);
        this.lastTime = new AtomicLong(System.nanoTime());
        if (z) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    this.running = false;
                    flush();
                } catch (Exception e) {
                }
            }));
        }
    }

    public int getMaxStoreSize() {
        return this.maxStoreSize;
    }

    public long getMaxStoreNanos() {
        return this.maxStoreNanos;
    }

    public Consumer<List<E>> getConsumer() {
        return this.consumer;
    }

    public boolean startScheduler() {
        if (this.consumer == null || this.scheduler != null) {
            return false;
        }
        synchronized (this) {
            if (this.scheduler != null) {
                return false;
            }
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1, new PooledThreadFactory(BatchDataCollector.class.getSimpleName()));
            this.scheduler = newScheduledThreadPool;
            newScheduledThreadPool.scheduleAtFixedRate(this::tryFlush, this.maxStoreNanos, this.maxStoreNanos, TimeUnit.NANOSECONDS);
            if (this.withShutdownHook) {
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        this.scheduler = null;
                        Schedules.shutdown(newScheduledThreadPool);
                    } catch (Exception e) {
                    }
                }));
            }
            return true;
        }
    }

    public boolean stopScheduler() {
        if (this.scheduler == null) {
            return false;
        }
        synchronized (this) {
            if (this.scheduler == null) {
                return false;
            }
            ScheduledExecutorService scheduledExecutorService = this.scheduler;
            this.scheduler = null;
            Schedules.shutdown(scheduledExecutorService);
            return true;
        }
    }

    public void collect(E e) {
        collect((Iterable) Collections.singletonList(e), (Consumer) this.consumer);
    }

    public void collect(E e, Consumer<List<E>> consumer) {
        collect((Iterable) Collections.singletonList(e), (Consumer) consumer);
    }

    public void collect(Iterable<E> iterable) {
        collect((Iterable) iterable, (Consumer) this.consumer);
    }

    public void collect(Iterable<E> iterable, Consumer<List<E>> consumer) {
        if (consumer == null) {
            throw new IllegalStateException("数据处理器不能为空");
        }
        for (E e : iterable) {
            while (!this.buffer.offer(e)) {
                flush(consumer);
            }
        }
        if (!this.running) {
            flush(consumer);
            return;
        }
        if (System.nanoTime() - this.lastTime.get() > this.maxStoreNanos) {
            flush(consumer);
        }
    }

    public void tryFlush() {
        tryFlush(this.consumer);
    }

    public boolean tryFlush(Consumer<List<E>> consumer) {
        if (this.buffer.size() >= this.maxStoreSize) {
            flush(consumer);
            return true;
        }
        if (!(System.nanoTime() - this.lastTime.get() > this.maxStoreNanos)) {
            return false;
        }
        flush(consumer);
        return true;
    }

    public void flush() {
        flush(this.consumer);
    }

    public void flush(Consumer<List<E>> consumer) {
        if (consumer == null) {
            throw new IllegalStateException("数据消费器不能为空");
        }
        this.lastTime.set(System.nanoTime());
        ArrayList arrayList = new ArrayList(this.buffer.size());
        this.buffer.drainTo(arrayList);
        if (arrayList.isEmpty()) {
            return;
        }
        consumer.accept(arrayList);
    }
}
