package org.yamcs.parameterarchive;

import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.yamcs.ConfigurationException;
import org.yamcs.Processor;
import org.yamcs.YConfiguration;
import org.yamcs.parameter.ParameterConsumer;
import org.yamcs.parameter.ParameterValue;
import org.yamcs.utils.LoggingUtils;

/* loaded from: input_file:org/yamcs/parameterarchive/RealtimeArchiveFiller.class */
/**
 * Feeds the parameter archive with the parameter values delivered by a processor.
 *
 * <p>On start-up the service subscribes to all parameters of the configured processor. Incoming
 * batches are handed over through a small bounded queue to the service thread, which groups the
 * values by generation time and adds them to at most two {@link ArchiveIntervalFiller}s
 * ({@code first} and {@code second}). The fillers are flushed when no data arrives for
 * {@code flushInterval} seconds, when the data has moved far enough past the start of the second
 * interval, and when the service stops.
 *
 * <p>NOTE(review): {@code first} and {@code second} are plain fields mutated by the service
 * thread while {@link #flush()} is public; no synchronization is visible here, so calling
 * {@code flush()} from another thread is presumably not intended - confirm with callers.
 */
public class RealtimeArchiveFiller extends AbstractExecutionThreadService implements ParameterConsumer {
    /** Default number of seconds to wait for data before flushing. */
    private static final int DEFAULT_FLUSH_INTERVAL = 300;
    /** Default ordering threshold, in the same units as the parameter generation time. */
    private static final int DEFAULT_ORDERING_THRESHOLD = 20000;
    /** Maximum number of parameter batches waiting to be processed. */
    private static final int QUEUE_CAPACITY = 10;

    // The defaults are set here (and not only in parseConfig) so that a filler created without
    // configuration does not end up with a zero poll timeout (busy loop) or a zero segment size.
    int flushInterval = DEFAULT_FLUSH_INTERVAL;
    final String yamcsInstance;
    Processor realtimeProcessor;
    int subscriptionId;
    protected final ParameterIdDb parameterIdMap;
    protected final ParameterGroupIdDb parameterGroupIdMap;
    final ParameterArchive parameterArchive;
    private final Logger log;
    long threshold = DEFAULT_ORDERING_THRESHOLD;
    int maxSegmentSize = ArchiveFillerTask.DEFAULT_MAX_SEGMENT_SIZE;
    ArchiveIntervalFiller first;
    ArchiveIntervalFiller second;
    String processorName = "realtime";
    final BlockingQueue<List<ParameterValue>> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    // NOTE(review): nothing in this class updates the counter, so getNumProcessedParameters()
    // returns 0 unless another class in the package writes to this field - confirm.
    long numParams = 0;

    /**
     * Creates a filler writing into the given archive.
     *
     * @param parameterArchive the archive providing the id databases, the instance name and the
     *        storage
     * @param yConfiguration optional configuration; when {@code null} the defaults are used
     */
    public RealtimeArchiveFiller(ParameterArchive parameterArchive, YConfiguration yConfiguration) {
        this.parameterArchive = parameterArchive;
        this.parameterIdMap = parameterArchive.getParameterIdDb();
        this.parameterGroupIdMap = parameterArchive.getParameterGroupIdDb();
        this.yamcsInstance = parameterArchive.getYamcsInstance();
        this.log = LoggingUtils.getLogger(getClass(), this.yamcsInstance);
        if (yConfiguration != null) {
            parseConfig(yConfiguration);
        }
    }

    /**
     * Reads the options {@code flushInterval}, {@code processorName}, {@code maxSegmentSize} and
     * {@code orderingThreshold}, keeping the current value of each option that is not present.
     */
    private void parseConfig(YConfiguration yConfiguration) {
        this.flushInterval = yConfiguration.getInt("flushInterval", this.flushInterval);
        this.processorName = yConfiguration.getString("processorName", this.processorName);
        this.maxSegmentSize = yConfiguration.getInt("maxSegmentSize", this.maxSegmentSize);
        this.threshold = yConfiguration.getInt("orderingThreshold", (int) this.threshold);
    }

    /**
     * Service loop: takes batches from the queue and adds them to the interval fillers until the
     * service is asked to stop, then flushes whatever is still pending.
     *
     * @throws Exception if the thread is interrupted while waiting or if adding/flushing data fails;
     *         the exception propagates to the service framework
     */
    @Override
    protected void run() throws Exception {
        while (isRunning()) {
            List<ParameterValue> batch = this.queue.poll(this.flushInterval, TimeUnit.SECONDS);
            if (batch == null || batch.isEmpty()) {
                // Nothing received within the flush interval, or an empty batch (also used by
                // triggerShutdown() as a wake-up marker): write out what has been collected.
                flush();
                continue;
            }
            if (this.first == null) {
                // The first interval starts at the interval containing the oldest value of the
                // first batch; the batch is not empty so min() always has a value.
                long oldest = batch.stream().mapToLong(pv -> pv.getGenerationTime()).min().getAsLong();
                this.first = new ArchiveIntervalFiller(this.parameterArchive, this.log,
                        ParameterArchive.getIntervalStart(oldest), this.maxSegmentSize);
            }
            long newest = processParameters(batch);
            // Once the data is more than 'threshold' past the start of the second interval, the
            // first interval is written out and the second one takes its place.
            if (newest >= 0 && this.second != null && newest > this.second.intervalStart + this.threshold) {
                this.first.flush();
                this.first = this.second;
                this.second = null;
            }
        }
        flush();
    }

    /**
     * Wakes up {@link #run()} when the service is asked to stop, so that it does not stay blocked
     * on the queue for up to {@code flushInterval} seconds. If the queue is full the marker is not
     * added, but in that case the pending poll returns immediately anyway.
     */
    @Override
    protected void triggerShutdown() {
        this.queue.offer(Collections.<ParameterValue>emptyList());
    }

    /**
     * Queues a batch of parameter values for archiving, blocking while the queue is full.
     *
     * @param i identifier passed by the caller; not used here
     * @param list the parameter values to archive
     */
    @Override // org.yamcs.parameter.ParameterConsumer
    public void updateItems(int i, List<ParameterValue> list) {
        try {
            this.queue.put(list);
        } catch (InterruptedException e) {
            // The batch is dropped; restore the interrupt flag for the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Flushes both interval fillers, if present. Failures are logged and not propagated; a failure
     * flushing the first filler does not prevent the second one from being flushed.
     */
    public void flush() {
        flushQuietly(this.first);
        flushQuietly(this.second);
    }

    /** Flushes one filler (ignoring {@code null}), logging any failure together with its cause. */
    private void flushQuietly(ArchiveIntervalFiller filler) {
        if (filler == null) {
            return;
        }
        try {
            filler.flush();
        } catch (Exception e) {
            this.log.error("Exception flushing the parameter archive", e);
        }
    }

    /**
     * Looks up the configured processor and subscribes to all its parameters.
     *
     * @throws ConfigurationException if the instance has no processor with the configured name
     */
    @Override
    protected void startUp() {
        this.realtimeProcessor = Processor.getInstance(this.yamcsInstance, this.processorName);
        if (this.realtimeProcessor == null) {
            throw new ConfigurationException("No processor named '" + this.processorName + "' in instance " + this.yamcsInstance);
        }
        this.subscriptionId = this.realtimeProcessor.getParameterRequestManager().subscribeAll(this);
    }

    /** Cancels the subscription made in {@link #startUp()}. */
    @Override
    protected void shutDown() {
        this.realtimeProcessor.getParameterRequestManager().unsubscribeAll(this.subscriptionId);
    }

    /**
     * Groups the values by generation time and adds each group to the matching interval filler.
     * Values older than the start of the first interval and values without a qualified name are
     * skipped. Requires {@code first} to be set.
     *
     * @param list the parameter values to process
     * @return the largest generation time that has been added, or -1 if nothing was added
     * @throws IOException declared for failures while adding data to a filler
     * @throws RocksDBException declared for failures while adding data to a filler
     */
    protected long processParameters(List<ParameterValue> list) throws IOException, RocksDBException {
        Map<Long, BasicParameterList> byGenerationTime = new HashMap<>();
        for (ParameterValue pv : list) {
            long generationTime = pv.getGenerationTime();
            if (generationTime < this.first.intervalStart) {
                continue;
            }
            if (pv.getParameterQualifiedNamed() == null) {
                this.log.warn("No qualified name for parameter value {}, ignoring", pv);
                continue;
            }
            byGenerationTime
                    .computeIfAbsent(generationTime, t -> new BasicParameterList(this.parameterIdMap))
                    .add(pv);
        }

        long newest = -1;
        for (Map.Entry<Long, BasicParameterList> entry : byGenerationTime.entrySet()) {
            long generationTime = entry.getKey();
            BasicParameterList values = entry.getValue();
            long intervalStart = ParameterArchive.getIntervalStart(generationTime);
            if (intervalStart == this.first.intervalStart) {
                this.first.addParameters(generationTime, values);
            } else {
                // NOTE(review): everything that is not in the first interval goes to the second
                // filler, even when the second filler was created for a different interval -
                // confirm that ArchiveIntervalFiller copes with that.
                if (this.second == null) {
                    this.second = new ArchiveIntervalFiller(this.parameterArchive, this.log, intervalStart, this.maxSegmentSize);
                }
                this.second.addParameters(generationTime, values);
            }
            if (generationTime > newest) {
                newest = generationTime;
            }
        }
        return newest;
    }

    /**
     * Writes the segments to the parameter archive; failures are logged and not propagated.
     *
     * @param j value passed through unchanged as the first argument of
     *        {@code ParameterArchive.writeToArchive}
     * @param collection the segments to write
     */
    protected void writeToArchive(long j, Collection<PGSegment> collection) {
        try {
            this.parameterArchive.writeToArchive(j, collection);
        } catch (RocksDBException | IOException e) {
            this.log.error("failed to write data to the archive", e);
        }
    }

    /**
     * Returns the current value of the {@code numParams} counter.
     *
     * @return the value of {@code numParams}
     */
    public long getNumProcessedParameters() {
        return this.numParams;
    }
}
