package org.yamcs.cascading;

import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.yamcs.TmPacket;
import org.yamcs.YConfiguration;
import org.yamcs.client.archive.ArchiveClient;
import org.yamcs.protobuf.TmPacketData;
import org.yamcs.tctm.AbstractTmDataLink;
import org.yamcs.tctm.AggregatedDataLink;
import org.yamcs.tctm.Link;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.streamsql.StreamSqlParserConstants;

/* loaded from: input_file:org/yamcs/cascading/YamcsArchiveTmLink.class */
public class YamcsArchiveTmLink extends AbstractTmDataLink {
    YamcsLink parentLink;
    private List<String> containers;
    int retrievalDays;
    int mergeTime;
    int gapFillingInterval;
    Queue<Gap> queue = new ArrayDeque();
    List<Gap> prevGaps;
    CompletableFuture<Void> runningTask;
    private long start;
    private long stop;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/yamcs/cascading/YamcsArchiveTmLink$Gap.class */
    public static class Gap implements Comparable<Gap> {
        long start;
        long stop;

        public Gap(long j, long j2) {
            this.start = j;
            this.stop = j2;
        }

        @Override // java.lang.Comparable
        public int compareTo(Gap gap) {
            return Long.compare(this.start, gap.start);
        }

        public String toString() {
            return "[" + TimeEncoding.toString(this.start) + " - " + TimeEncoding.toString(this.stop) + "]";
        }
    }

    public YamcsArchiveTmLink(YamcsLink yamcsLink) {
        this.parentLink = yamcsLink;
    }

    @Override // org.yamcs.tctm.AbstractTmDataLink, org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public void init(String str, String str2, YConfiguration yConfiguration) {
        YConfiguration swapConfig = YamcsTmLink.swapConfig(yConfiguration, "tmArchiveStream", "tmStream", "tm_dump");
        super.init(str, str2, swapConfig);
        this.retrievalDays = swapConfig.getInt("retrievalDays", StreamSqlParserConstants.DIGIT);
        this.mergeTime = swapConfig.getInt("mergeTime", 300) * 1000;
        this.mergeTime = 1000;
        this.gapFillingInterval = swapConfig.getInt("gapFillingInterval", 300);
        this.log.debug("Archive retrieval for {} days", Integer.valueOf(this.retrievalDays));
    }

    protected void doStart() {
        if (!isEffectivelyDisabled()) {
            doEnable();
        }
        notifyStarted();
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doDisable() {
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doEnable() {
        if (this.containers != null) {
            scheduleDataRetrieval();
        }
    }

    protected void doStop() {
        if (!isDisabled()) {
            doDisable();
        }
        notifyStopped();
    }

    @Override // org.yamcs.tctm.AbstractLink
    protected Link.Status connectionStatus() {
        return this.parentLink.connectionStatus();
    }

    @Override // org.yamcs.tctm.Link
    public AggregatedDataLink getParent() {
        return this.parentLink;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleDataRetrieval() {
        this.parentLink.getExecutor().execute(this::retrieveGaps);
    }

    void retrieveGaps() {
        if (connectionStatus() != Link.Status.OK || isEffectivelyDisabled()) {
            return;
        }
        if (this.runningTask == null || this.runningTask.isDone()) {
            if (this.queue.isEmpty()) {
                if (this.runningTask != null) {
                    this.log.debug("Retrieval finished, looking for new gaps");
                    this.runningTask = null;
                }
                if (this.prevGaps == null) {
                    identifyGaps();
                } else {
                    checkRemainingGaps();
                }
            }
            if (this.queue.isEmpty()) {
                return;
            }
            retrieveGap(this.queue.poll());
        }
    }

    private void processPacketData(TmPacketData tmPacketData) {
        TmPacket tmPacket = new TmPacket(this.timeService.getMissionTime(), tmPacketData.getPacket().toByteArray());
        if (tmPacketData.hasGenerationTime()) {
            tmPacket.setGenerationTime(TimeEncoding.fromProtobufTimestamp(tmPacketData.getGenerationTime()));
        }
        tmPacket.setSequenceCount(tmPacketData.getSequenceNumber());
        this.packetCount.incrementAndGet();
        processPacket(tmPacket);
    }

    void identifyGaps() {
        this.start = this.timeService.getMissionTime() - (86400000 * this.retrievalDays);
        this.stop = this.timeService.getMissionTime();
        List<Gap> identifyGaps = new TmGapFinder(this.yamcsInstance, this.parentLink, this.eventProducer, this.retrievalDays, str -> {
            return isPacketRequired(str);
        }).identifyGaps(this.start, this.stop);
        if (identifyGaps.size() == 0) {
            this.log.debug("No gap identified.");
            this.log.debug("Scheduling next gap filling in {} seconds", Integer.valueOf(this.gapFillingInterval));
            this.parentLink.getExecutor().schedule(this::retrieveGaps, this.gapFillingInterval, TimeUnit.SECONDS);
        } else {
            Collections.sort(identifyGaps);
            this.prevGaps = identifyGaps;
            this.log.info("Identified {} gaps for the retrieval", Integer.valueOf(identifyGaps.size()));
            this.queue.addAll(identifyGaps);
        }
    }

    void checkRemainingGaps() {
        for (Gap gap : new TmGapFinder(this.yamcsInstance, this.parentLink, this.eventProducer, this.retrievalDays, str -> {
            return isPacketRequired(str);
        }).identifyGaps(this.start, this.stop)) {
            if (Collections.binarySearch(this.prevGaps, gap) >= 0 && gap.stop < this.stop) {
                this.log.warn("Gap {} still remains after replay", gap);
            }
        }
        this.prevGaps = null;
        this.log.debug("Scheduling next gap filling in {} seconds", Integer.valueOf(this.gapFillingInterval));
        this.parentLink.getExecutor().schedule(this::retrieveGaps, this.gapFillingInterval, TimeUnit.SECONDS);
    }

    void retrieveGap(Gap gap) {
        this.log.debug("Retrieving gap {}", gap);
        this.runningTask = this.parentLink.getClient().createArchiveClient(this.parentLink.getUpstreamInstance()).streamPackets(tmPacketData -> {
            processPacketData(tmPacketData);
        }, Instant.ofEpochMilli(TimeEncoding.toUnixMillisec(gap.start)), Instant.ofEpochMilli(TimeEncoding.toUnixMillisec(gap.stop + 1)), new ArchiveClient.StreamOptions.StreamOption[]{ArchiveClient.StreamOptions.packets((String[]) this.containers.toArray(new String[0]))});
        this.runningTask.whenComplete((r5, th) -> {
            if (th != null) {
                this.log.warn("Error in gap retrieval", th);
            }
            scheduleDataRetrieval();
        });
    }

    private boolean isPacketRequired(String str) {
        return this.containers.contains(str);
    }

    public void setContainers(List<String> list) {
        this.containers = list;
    }
}
