package step.core.timeseries;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.collections.Collection;

/* loaded from: input_file:step/core/timeseries/TimeSeriesIngestionPipeline.class */
/**
 * Buffers ingested data points in memory, grouped by bucket timestamp and bucket attributes,
 * and writes the resulting buckets to the backing {@link Collection} when flushed.
 *
 * <p>Ingestion takes the read lock, so several threads may ingest concurrently; a flush takes
 * the write lock and therefore runs exclusively with respect to ingestion and other flushes.
 */
public class TimeSeriesIngestionPipeline implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(TimeSeriesIngestionPipeline.class);

    /**
     * Grace period subtracted from {@link System#currentTimeMillis()} during a periodic flush:
     * only buckets whose key is older than {@code now - OFFSET} are written.
     * NOTE(review): this presumes bucket timestamps are epoch milliseconds - confirm against
     * {@code TimeSeries.timestampToBucketTimestamp}.
     */
    private static final long OFFSET = 10000;

    private final Collection<Bucket> collection;
    private final long sourceResolution;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    /** Pending bucket builders, keyed first by bucket timestamp and then by attributes. */
    private final ConcurrentHashMap<Long, Map<BucketAttributes, BucketBuilder>> seriesQueue = new ConcurrentHashMap<>();
    private final LongAdder flushCount = new LongAdder();
    /** Periodic flush scheduler; {@code null} when the pipeline is only flushed explicitly. */
    private final ScheduledExecutorService scheduler;

    /**
     * Creates a pipeline without periodic flushing; buckets are persisted only by
     * {@link #flush()} or {@link #close()}.
     *
     * @param collection       the collection the built buckets are saved to
     * @param sourceResolution the resolution passed to
     *                         {@code TimeSeries.timestampToBucketTimestamp} for each point
     */
    public TimeSeriesIngestionPipeline(Collection<Bucket> collection, long sourceResolution) {
        this.collection = collection;
        this.sourceResolution = sourceResolution;
        this.scheduler = null;
    }

    /**
     * Creates a pipeline that additionally flushes aged buckets at a fixed rate.
     *
     * @param collection         the collection the built buckets are saved to
     * @param sourceResolution   the resolution passed to
     *                           {@code TimeSeries.timestampToBucketTimestamp} for each point
     * @param flushingPeriodInMs initial delay and period of the periodic flush, in milliseconds
     */
    public TimeSeriesIngestionPipeline(Collection<Bucket> collection, long sourceResolution, long flushingPeriodInMs) {
        // Assign every field before the flush task is scheduled so that the task can never
        // observe a partially constructed pipeline.
        this.collection = collection;
        this.sourceResolution = sourceResolution;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        // flush(boolean) logs and swallows failures, so one failing flush does not cancel
        // the subsequent scheduled executions.
        this.scheduler.scheduleAtFixedRate(() -> flush(false), flushingPeriodInMs, flushingPeriodInMs,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Ingests a single point described by a plain attribute map.
     *
     * @param map       the attributes of the point, wrapped into a {@link BucketAttributes}
     * @param timestamp the timestamp of the point
     * @param value     the value of the point
     */
    public void ingestPoint(Map<String, String> map, long timestamp, long value) {
        ingestPoint(new BucketAttributes(map), timestamp, value);
    }

    /**
     * Ingests a single point into the pending bucket matching its bucket timestamp and
     * attributes, creating that bucket builder if it does not exist yet.
     *
     * @param bucketAttributes the attributes of the point
     * @param timestamp        the timestamp of the point
     * @param value            the value of the point
     */
    public void ingestPoint(BucketAttributes bucketAttributes, long timestamp, long value) {
        // Guarded to avoid boxing the two longs on this hot path when trace is disabled.
        if (logger.isTraceEnabled()) {
            logger.trace("Ingesting point. Attributes={}, Timestamp={}, Value={}", bucketAttributes, timestamp, value);
        }
        this.lock.readLock().lock();
        try {
            long bucketTimestamp = TimeSeries.timestampToBucketTimestamp(timestamp, this.sourceResolution);
            this.seriesQueue
                    .computeIfAbsent(bucketTimestamp, key -> new ConcurrentHashMap<>())
                    .computeIfAbsent(bucketAttributes,
                            attributes -> BucketBuilder.create(bucketTimestamp).withAttributes(attributes))
                    .ingest(value);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Writes every pending bucket to the collection, regardless of its age.
     */
    public void flush() {
        flush(true);
    }

    /**
     * Writes pending buckets to the collection and removes them from the queue.
     *
     * <p>Failures are logged and not propagated; the flush counter is only incremented when
     * the whole flush completed without error.
     *
     * @param flushAll when {@code true} every pending bucket is written; otherwise only those
     *                 whose key is older than the current time minus {@link #OFFSET}
     */
    private void flush(boolean flushAll) {
        this.lock.writeLock().lock();
        try {
            debug("Flushing");
            long now = System.currentTimeMillis();
            this.seriesQueue.forEach((bucketTimestamp, pending) -> {
                if (flushAll || bucketTimestamp.longValue() < now - OFFSET) {
                    // The entry is removed before its buckets are saved, as in the original.
                    this.seriesQueue.remove(bucketTimestamp).forEach((bucketAttributes, bucketBuilder) -> {
                        this.collection.save(bucketBuilder.build());
                    });
                }
            });
            this.flushCount.increment();
            debug("Flushed");
        } catch (Throwable th) {
            logger.error("Error while flushing", th);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * @return the number of flushes that completed without error
     */
    public long getFlushCount() {
        return this.flushCount.longValue();
    }

    /**
     * Stops the periodic flush, if one was configured, and then writes all pending buckets.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        flush();
    }

    /** Logs the given message at debug level when debug logging is enabled. */
    private void debug(String str) {
        if (logger.isDebugEnabled()) {
            logger.debug(str);
        }
    }
}
