package io.polaris.core.data;

import io.polaris.core.assertion.Arguments;
import io.polaris.core.concurrent.PooledThreadFactory;
import io.polaris.core.concurrent.Schedules;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/core/data/MultiBatchDataCollector.class */
public class MultiBatchDataCollector<K, E> {
    private final Map<K, BatchDataCollector<E>> collectors;
    private final int maxStoreSize;
    private final long maxStoreNanos;
    private final boolean withShutdownHook;
    private volatile ScheduledExecutorService scheduler;

    public MultiBatchDataCollector(int i, long j, TimeUnit timeUnit) {
        this(i, timeUnit.toNanos(j), true);
    }

    public MultiBatchDataCollector(int i, long j, TimeUnit timeUnit, boolean z) {
        this(i, timeUnit.toNanos(j), z);
    }

    public MultiBatchDataCollector(int i, long j, boolean z) {
        this.collectors = new ConcurrentHashMap();
        Arguments.isTrue(i > 0, "maxStoreSize must be greater than 0");
        Arguments.isTrue(j > 0, "maxStoreNanos must be greater than 0");
        this.maxStoreSize = i;
        this.maxStoreNanos = j;
        this.withShutdownHook = z;
    }

    public long getMaxStoreNanos() {
        return this.maxStoreNanos;
    }

    public int getMaxStoreSize() {
        return this.maxStoreSize;
    }

    public void startSchedulerSeverally() {
        this.collectors.forEach((obj, batchDataCollector) -> {
            batchDataCollector.startScheduler();
        });
    }

    public void stopSchedulerSeverally() {
        this.collectors.forEach((obj, batchDataCollector) -> {
            batchDataCollector.stopScheduler();
        });
    }

    public boolean startScheduler() {
        if (this.scheduler != null) {
            return false;
        }
        synchronized (this) {
            if (this.scheduler != null) {
                return false;
            }
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(this.collectors.isEmpty() ? Runtime.getRuntime().availableProcessors() : Integer.max(this.collectors.size(), 4), new PooledThreadFactory(MultiBatchDataCollector.class.getSimpleName()));
            this.scheduler = newScheduledThreadPool;
            newScheduledThreadPool.scheduleAtFixedRate(() -> {
                Iterator<Map.Entry<K, BatchDataCollector<E>>> it = this.collectors.entrySet().iterator();
                while (it.hasNext()) {
                    BatchDataCollector<E> value = it.next().getValue();
                    if (value.getConsumer() != null) {
                        value.tryFlush();
                    }
                }
            }, this.maxStoreNanos, this.maxStoreNanos, TimeUnit.NANOSECONDS);
            if (this.withShutdownHook) {
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        this.scheduler = null;
                        Schedules.shutdown(newScheduledThreadPool);
                    } catch (Exception e) {
                    }
                }));
            }
            return true;
        }
    }

    public boolean stopScheduler() {
        if (this.scheduler == null) {
            return false;
        }
        synchronized (this) {
            if (this.scheduler == null) {
                return false;
            }
            ScheduledExecutorService scheduledExecutorService = this.scheduler;
            this.scheduler = null;
            Schedules.shutdown(scheduledExecutorService);
            return true;
        }
    }

    private BatchDataCollector<E> getCollector(K k, Consumer<List<E>> consumer) {
        return this.collectors.computeIfAbsent(k, obj -> {
            return new BatchDataCollector(this.maxStoreSize, this.maxStoreNanos, consumer, this.withShutdownHook);
        });
    }

    private BatchDataCollector<E> getCollector(K k) {
        return (BatchDataCollector) Objects.requireNonNull(this.collectors.get(k));
    }

    public boolean register(K k, Consumer<List<E>> consumer) {
        return register((MultiBatchDataCollector<K, E>) k, new BatchDataCollector<>(this.maxStoreSize, this.maxStoreNanos, consumer, this.withShutdownHook));
    }

    public boolean register(K k, BatchDataCollector<E> batchDataCollector) {
        return this.collectors.putIfAbsent(k, batchDataCollector) == null;
    }

    public void collect(K k, E e) {
        collect((MultiBatchDataCollector<K, E>) k, (Iterable) Collections.singletonList(e));
    }

    public void collect(K k, Iterable<E> iterable) {
        getCollector(k).collect((Iterable) iterable);
    }

    public void tryFlush(K k) {
        getCollector(k).tryFlush();
    }

    public void flush(K k) {
        getCollector(k).flush();
    }

    public void collect(K k, E e, Consumer<List<E>> consumer) {
        collect((MultiBatchDataCollector<K, E>) k, (Iterable) Collections.singletonList(e), (Consumer) consumer);
    }

    public void collect(K k, Iterable<E> iterable, Consumer<List<E>> consumer) {
        getCollector(k, consumer).collect((Iterable) iterable, (Consumer) consumer);
    }

    public void tryFlush(K k, Consumer<List<E>> consumer) {
        getCollector(k, consumer).tryFlush(consumer);
    }

    public void flush(K k, Consumer<List<E>> consumer) {
        getCollector(k, consumer).flush(consumer);
    }
}
