/*
 * Decompiled with CFR 0.152.
 */
package step.core.timeseries;

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import step.core.collections.Collection;
import step.core.timeseries.Bucket;
import step.core.timeseries.BucketAttributes;
import step.core.timeseries.BucketBuilder;
import step.core.timeseries.TimeSeries;

/**
 * Buffers ingested data points in in-memory buckets and persists them to a bucket
 * {@link Collection} when flushed.
 *
 * <p>Points are grouped first by bucket timestamp (computed by
 * {@link TimeSeries#timestampToBucketTimestamp(long, long)} with the configured resolution)
 * and then by their {@link BucketAttributes}. Ingestion runs under the read lock, so several
 * threads may ingest at once; flushing runs under the write lock, so it never overlaps with
 * ingestion or with another flush.
 */
public class TimeSeriesIngestionPipeline implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(TimeSeriesIngestionPipeline.class);
    /**
     * Minimum age, in milliseconds, a bucket must have relative to the current time before a
     * periodic (non-forced) flush persists it.
     */
    private static final long OFFSET = 10000L;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Collection<Bucket> collection;
    /** Pending bucket builders, keyed by bucket timestamp and then by attributes. */
    private final ConcurrentHashMap<Long, Map<BucketAttributes, BucketBuilder>> seriesQueue = new ConcurrentHashMap<>();
    /** Periodic flusher; {@code null} when the pipeline is only flushed manually. */
    private final ScheduledExecutorService scheduler;
    private final LongAdder flushCount = new LongAdder();
    private final long sourceResolution;

    /**
     * Creates a pipeline without periodic flushing; buffered buckets are only persisted by
     * {@link #flush()} or {@link #close()}.
     *
     * @param collection     the collection the built buckets are saved to
     * @param resolutionInMs the resolution passed to
     *                       {@link TimeSeries#timestampToBucketTimestamp(long, long)}
     */
    protected TimeSeriesIngestionPipeline(Collection<Bucket> collection, long resolutionInMs) {
        this.collection = collection;
        this.sourceResolution = resolutionInMs;
        this.scheduler = null;
    }

    /**
     * Creates a pipeline that additionally flushes sufficiently old buckets on a fixed period.
     * The first periodic flush runs after one full period.
     *
     * @param collection         the collection the built buckets are saved to
     * @param resolutionInMs     the resolution passed to
     *                           {@link TimeSeries#timestampToBucketTimestamp(long, long)}
     * @param flushingPeriodInMs the initial delay and period of the background flush, in
     *                           milliseconds
     */
    protected TimeSeriesIngestionPipeline(Collection<Bucket> collection, long resolutionInMs, long flushingPeriodInMs) {
        this.collection = collection;
        this.sourceResolution = resolutionInMs;
        // Assign every field before the task is scheduled so the flusher never runs against
        // a partially initialised pipeline.
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.scheduleAtFixedRate(() -> this.flush(false), flushingPeriodInMs, flushingPeriodInMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Ingests a single point, wrapping the given attribute map into {@link BucketAttributes}.
     *
     * @param attributes the attributes identifying the series
     * @param timestamp  the timestamp of the point
     * @param value      the value of the point
     */
    public void ingestPoint(Map<String, String> attributes, long timestamp, long value) {
        this.ingestPoint(new BucketAttributes(attributes), timestamp, value);
    }

    /**
     * Ingests a single point into the in-memory bucket matching its bucket timestamp and
     * attributes, creating that bucket builder if it does not exist yet.
     *
     * @param attributes the attributes identifying the series
     * @param timestamp  the timestamp of the point
     * @param value      the value of the point
     */
    public void ingestPoint(BucketAttributes attributes, long timestamp, long value) {
        if (logger.isTraceEnabled()) {
            logger.trace("Ingesting point. Attributes={}, Timestamp={}, Value={}", attributes, timestamp, value);
        }
        this.lock.readLock().lock();
        try {
            long index = TimeSeries.timestampToBucketTimestamp(timestamp, this.sourceResolution);
            Map<BucketAttributes, BucketBuilder> bucketsForTimestamp =
                    this.seriesQueue.computeIfAbsent(index, k -> new ConcurrentHashMap<>());
            bucketsForTimestamp
                    .computeIfAbsent(attributes, k -> BucketBuilder.create(index).withAttributes(attributes))
                    .ingest(value);
        } finally {
            this.lock.readLock().unlock();
        }
    }

    /**
     * Persists every buffered bucket, regardless of its age.
     */
    public void flush() {
        this.flush(true);
    }

    /**
     * Persists buffered buckets to the collection and removes them from the queue.
     *
     * <p>Any {@link Throwable} raised while flushing is logged and not propagated; in that case
     * the flush counter is not incremented.
     *
     * @param flushAll when {@code true} all buckets are flushed; otherwise only those whose
     *                 bucket timestamp is more than {@link #OFFSET} ms before the current time
     */
    private void flush(boolean flushAll) {
        this.lock.writeLock().lock();
        try {
            logger.debug("Flushing");
            long now = System.currentTimeMillis();
            this.seriesQueue.forEach((bucketTimestamp, builders) -> {
                if (flushAll || bucketTimestamp < now - OFFSET) {
                    // NOTE: the entry is removed before its buckets are saved, so buckets that
                    // have not been saved yet are dropped if save throws.
                    this.seriesQueue.remove(bucketTimestamp);
                    builders.forEach((attributes, bucketBuilder) -> this.collection.save(bucketBuilder.build()));
                }
            });
            this.flushCount.increment();
            logger.debug("Flushed");
        } catch (Throwable e) {
            // Deliberately broad: this also runs as a scheduled task and must not let an
            // exception escape.
            logger.error("Error while flushing", e);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * @return the number of flushes that completed without error
     */
    public long getFlushCount() {
        return this.flushCount.longValue();
    }

    /**
     * Shuts down the periodic flusher, if any, and then flushes all remaining buckets.
     */
    @Override
    public void close() {
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
        this.flush();
    }
}

