package step.core.timeseries;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.timeseries.accessor.BucketAccessor;

/* loaded from: input_file:step/core/timeseries/TimeSeriesIngestionPipeline.class */
/**
 * Buffers ingested data points in memory, grouped by attribute set and by
 * bucket start time, and periodically persists the resulting buckets through
 * a {@link BucketAccessor}.
 *
 * <p>Ingestion runs under the read lock of a {@link ReadWriteLock}, so several
 * threads may ingest concurrently, while {@link #flush()} takes the write lock
 * and therefore runs exclusively. A single-threaded scheduler triggers
 * {@link #flush()} at a fixed rate until {@link #close()} is called.
 */
public class TimeSeriesIngestionPipeline implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(TimeSeriesIngestionPipeline.class);

    /** Width of a bucket, in milliseconds; timestamps are aligned down to a multiple of it. */
    private final long resolutionInMs;
    private final BucketAccessor bucketAccessor;
    /** Pending buckets: attribute set -> (bucket start time -> builder). */
    private final Map<Map<String, Object>, Map<Long, BucketBuilder>> series = new ConcurrentHashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final LongAdder flushCount = new LongAdder();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /**
     * Creates the pipeline and schedules the periodic flush.
     *
     * @param bucketAccessor    destination for the buckets built on flush
     * @param resolutionInMs    bucket width in milliseconds
     * @param flushIntervalInMs initial delay and period of the scheduled flush, in milliseconds
     */
    public TimeSeriesIngestionPipeline(BucketAccessor bucketAccessor, long resolutionInMs, long flushIntervalInMs) {
        this.bucketAccessor = bucketAccessor;
        this.resolutionInMs = resolutionInMs;
        this.scheduler.scheduleAtFixedRate(this::flush, flushIntervalInMs, flushIntervalInMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds one value to the pending bucket identified by the given attributes
     * and by the timestamp aligned down to the pipeline resolution.
     *
     * @param bucketAttributes attributes identifying the series; also attached to a newly created bucket
     * @param timestamp        timestamp of the point, in the same unit as the resolution
     * @param value            value handed to the bucket builder
     */
    public void ingestPoint(BucketAttributes bucketAttributes, long timestamp, long value) {
        trace("Acquiring read lock to ingest point");
        this.lock.readLock().lock();
        try {
            trace("Lock acquired to ingest point");
            Map<Long, BucketBuilder> bucketsByBegin =
                    this.series.computeIfAbsent(bucketAttributes, key -> new ConcurrentHashMap<>());
            long bucketBegin = timestamp - (timestamp % this.resolutionInMs);
            // NOTE(review): several ingesting threads may reach the same builder at once
            // (read lock is shared) - presumably BucketBuilder.ingest is thread-safe; confirm.
            bucketsByBegin
                    .computeIfAbsent(bucketBegin, key -> BucketBuilder.create(bucketBegin).withAttributes(bucketAttributes))
                    .ingest(value);
        } finally {
            // Single release point: the lock is unlocked exactly once on every path.
            this.lock.readLock().unlock();
        }
        trace("Lock released");
    }

    /**
     * Builds and saves every pending bucket, then clears the buffer and
     * increments the flush counter. Runs under the write lock, so no point is
     * ingested while a flush is in progress.
     *
     * <p>Any failure is logged rather than propagated, so that an exception
     * does not cancel the fixed-rate schedule this method runs on.
     */
    public void flush() {
        debug("Trying to acquire write lock");
        this.lock.writeLock().lock();
        try {
            debug("Got write lock");
            for (Map<Long, BucketBuilder> bucketsByBegin : this.series.values()) {
                for (BucketBuilder bucketBuilder : bucketsByBegin.values()) {
                    this.bucketAccessor.save(bucketBuilder.build());
                }
            }
            // NOTE(review): if save fails part-way, the buffer is not cleared, so buckets
            // already saved in this pass are saved again by the next flush.
            this.series.clear();
            this.flushCount.increment();
            debug("Flushed");
        } catch (Throwable th) {
            logger.error("Error while flushing", th);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Returns how many flushes have completed successfully so far.
     *
     * @return the number of successful flushes
     */
    public long getFlushCount() {
        return this.flushCount.longValue();
    }

    /**
     * Stops the periodic flush and performs one final flush of the pending buckets.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.scheduler.shutdown();
        flush();
    }

    /** Logs the message at DEBUG level when that level is enabled. */
    private void debug(String str) {
        if (logger.isDebugEnabled()) {
            logger.debug(str);
        }
    }

    /** Logs the message at TRACE level when that level is enabled. */
    private void trace(String str) {
        if (logger.isTraceEnabled()) {
            logger.trace(str);
        }
    }
}
