package org.yamcs.tctm;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.zip.GZIPInputStream;
import org.yamcs.ConfigurationException;
import org.yamcs.Spec;
import org.yamcs.StandardTupleDefinitions;
import org.yamcs.TmPacket;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsServer;
import org.yamcs.archive.XtceTmRecorder;
import org.yamcs.archive.YarchReplay;
import org.yamcs.tctm.Link;
import org.yamcs.time.Instant;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.utils.YObjectLoader;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;

/* loaded from: input_file:org/yamcs/tctm/FilePollingTmDataLink.class */
public class FilePollingTmDataLink extends AbstractTmDataLink implements Runnable {
    Path incomingDir;
    boolean deleteAfterImport;
    long delayBetweenPackets = -1;
    long headerSize = -1;
    Stream lastPacketStream;
    Thread thread;
    String packetInputStreamClassName;
    YConfiguration packetInputStreamArgs;

    @Override // org.yamcs.tctm.Link
    public Spec getSpec() {
        Spec defaultSpec = getDefaultSpec();
        defaultSpec.addOption("incomingDir", Spec.OptionType.STRING);
        defaultSpec.addOption("deleteAfterImport", Spec.OptionType.BOOLEAN).withDefault(true);
        defaultSpec.addOption("delayBetweenPackets", Spec.OptionType.INTEGER);
        defaultSpec.addOption("lastPacketStream", Spec.OptionType.STRING);
        defaultSpec.addOption("headerSize", Spec.OptionType.INTEGER);
        defaultSpec.addOption("packetInputStreamClassName", Spec.OptionType.STRING);
        defaultSpec.addOption("packetInputStreamArgs", Spec.OptionType.MAP).withSpec(Spec.ANY);
        return defaultSpec;
    }

    @Override // org.yamcs.tctm.AbstractTmDataLink, org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public void init(String str, String str2, YConfiguration yConfiguration) {
        super.init(str, str2, yConfiguration);
        if (yConfiguration.containsKey("incomingDir")) {
            this.incomingDir = Path.of(yConfiguration.getString("incomingDir"), new String[0]);
        } else {
            this.log.warn("Deprecation warning: specify the incomingDir argument on the link " + str2 + ". This will become required in a later version");
            this.incomingDir = YamcsServer.getServer().getIncomingDirectory().resolve(str).resolve(XtceTmRecorder.TABLE_NAME);
        }
        try {
            Files.createDirectories(this.incomingDir, new FileAttribute[0]);
        } catch (IOException e) {
            this.log.warn("Failed to create directory: " + this.incomingDir);
        }
        this.deleteAfterImport = yConfiguration.getBoolean("deleteAfterImport");
        this.delayBetweenPackets = yConfiguration.getLong("delayBetweenPackets", -1L);
        this.headerSize = yConfiguration.getLong("headerSize", -1L);
        this.packetInputStreamArgs = YConfiguration.emptyConfig();
        if (yConfiguration.containsKey("lastPacketStream")) {
            YarchDatabaseInstance yarchDatabase = YarchDatabase.getInstance(str);
            String string = yConfiguration.getString("lastPacketStream");
            this.lastPacketStream = yarchDatabase.getStream(string);
            if (this.lastPacketStream == null) {
                throw new ConfigurationException("Cannot find stream '" + string + "'");
            }
        }
        if (yConfiguration.containsKey("packetInputStreamClassName")) {
            this.packetInputStreamClassName = yConfiguration.getString("packetInputStreamClassName");
            this.packetInputStreamArgs = yConfiguration.getConfigOrEmpty("packetInputStreamArgs");
            return;
        }
        this.packetInputStreamClassName = GenericPacketInputStream.class.getName();
        HashMap hashMap = new HashMap();
        hashMap.put("maxPacketLength", 1000);
        hashMap.put("lengthFieldOffset", 4);
        hashMap.put("lengthFieldLength", 2);
        hashMap.put("lengthAdjustment", 7);
        hashMap.put("initialBytesToStrip", 0);
        this.packetInputStreamArgs = YConfiguration.wrap(hashMap);
    }

    @Override // java.lang.Runnable
    public void run() {
        File file = this.incomingDir.toFile();
        while (isRunningAndEnabled()) {
            try {
                if (file.exists()) {
                    play(file);
                }
                if (this.delayBetweenPackets < 0) {
                    Thread.sleep(YarchReplay.MAX_WAIT_TIME);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private void play(File file) throws InterruptedException {
        Instant hresMissionTime = this.timeService.getHresMissionTime();
        File[] listFiles = file.listFiles();
        Arrays.sort(listFiles);
        for (File file2 : listFiles) {
            if (!isRunningAndEnabled()) {
                return;
            }
            if (!file2.isHidden() && file2.isFile()) {
                this.log.info("Injecting the content of {}", file2);
                long j = 0;
                long j2 = TimeEncoding.POSITIVE_INFINITY;
                long j3 = Long.MIN_VALUE;
                TmPacket tmPacket = null;
                try {
                    PacketInputStream packetInputStream = getPacketInputStream(file2.getAbsolutePath());
                    while (true) {
                        try {
                            byte[] readPacket = packetInputStream.readPacket();
                            if (readPacket == null) {
                                break;
                            }
                            updateStats(readPacket.length);
                            TmPacket tmPacket2 = new TmPacket(this.timeService.getMissionTime(), readPacket);
                            tmPacket2.setEarthReceptionTime(hresMissionTime);
                            tmPacket = this.packetPreprocessor.process(tmPacket2);
                            if (tmPacket != null) {
                                j2 = Math.min(j2, tmPacket.getGenerationTime());
                                j3 = Math.max(j3, tmPacket.getGenerationTime());
                                j++;
                                processPacket(tmPacket);
                            }
                            if (this.delayBetweenPackets > 0) {
                                Thread.sleep(this.delayBetweenPackets);
                            }
                        } catch (Throwable th) {
                            if (packetInputStream != null) {
                                try {
                                    packetInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                            break;
                        }
                    }
                    if (packetInputStream != null) {
                        packetInputStream.close();
                    }
                } catch (EOFException e) {
                    this.log.debug("{} finished", file2);
                } catch (IOException | PacketTooLongException e2) {
                    this.log.warn("Exception while reading " + file2, e2);
                }
                if (tmPacket != null && this.lastPacketStream != null) {
                    emitLastPacket(tmPacket);
                }
                this.eventProducer.sendInfo("FILE_INGESTION", String.format("Ingested %s; pkt count: %d, time range: [%s, %s]", file2, Long.valueOf(j), TimeEncoding.toString(j2), TimeEncoding.toString(j3)));
                if (this.deleteAfterImport && !file2.delete()) {
                    this.log.warn("Could not remove {}", file2);
                }
            }
        }
    }

    private void emitLastPacket(TmPacket tmPacket) {
        if (tmPacket.isInvalid()) {
            return;
        }
        Instant earthReceptionTime = tmPacket.getEarthReceptionTime();
        if (earthReceptionTime == Instant.INVALID_INSTANT) {
            earthReceptionTime = null;
        }
        this.lastPacketStream.emitTuple(new Tuple(StandardTupleDefinitions.TM, new Object[]{Long.valueOf(tmPacket.getGenerationTime()), Integer.valueOf(tmPacket.getSeqCount()), Long.valueOf(tmPacket.getReceptionTime()), Integer.valueOf(tmPacket.getStatus()), tmPacket.getPacket(), earthReceptionTime, tmPacket.getObt() == Long.MIN_VALUE ? null : Long.valueOf(tmPacket.getObt()), getName(), tmPacket.getRootContainer() != null ? tmPacket.getRootContainer().getQualifiedName() : null}));
    }

    private PacketInputStream getPacketInputStream(String str) throws IOException {
        boolean z = false;
        FileInputStream fileInputStream = new FileInputStream(str);
        try {
            byte[] bArr = new byte[2];
            if (fileInputStream.read(bArr) == 2 && bArr[0] == 31) {
                if ((bArr[1] & 255) == 139) {
                    z = true;
                }
            }
            fileInputStream.close();
            BufferedInputStream bufferedInputStream = z ? new BufferedInputStream(new GZIPInputStream(new FileInputStream(str))) : new BufferedInputStream(new FileInputStream(str));
            if (this.headerSize > 0) {
                long skip = bufferedInputStream.skip(this.headerSize);
                if (skip != this.headerSize) {
                    bufferedInputStream.close();
                    long j = this.headerSize;
                    IOException iOException = new IOException("Short read: only" + skip + " out of " + iOException + "header bytes could be skipped");
                    throw iOException;
                }
            }
            try {
                PacketInputStream packetInputStream = (PacketInputStream) YObjectLoader.loadObject(this.packetInputStreamClassName, new Object[0]);
                packetInputStream.init(bufferedInputStream, this.packetInputStreamArgs);
                return packetInputStream;
            } catch (ConfigurationException e) {
                this.log.error("Cannot instantiate the packetInput stream", e);
                bufferedInputStream.close();
                throw e;
            }
        } catch (Throwable th) {
            try {
                fileInputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    public static InputStream getInputStream(String str) throws IOException {
        boolean z = false;
        FileInputStream fileInputStream = new FileInputStream(str);
        try {
            byte[] bArr = new byte[2];
            if (fileInputStream.read(bArr) == 2 && bArr[0] == 31) {
                if ((bArr[1] & 255) == 139) {
                    z = true;
                }
            }
            fileInputStream.close();
            return z ? new BufferedInputStream(new GZIPInputStream(new FileInputStream(str))) : new BufferedInputStream(new FileInputStream(str));
        } catch (Throwable th) {
            try {
                fileInputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public String getDetailedStatus() {
        return "Reading files from " + this.incomingDir;
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doDisable() {
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doEnable() {
        this.thread = new Thread(this);
        this.thread.setName(getClass().getSimpleName() + "-" + this.linkName);
        this.thread.start();
    }

    protected void doStart() {
        if (!isDisabled()) {
            doEnable();
        }
        notifyStarted();
    }

    protected void doStop() {
        doDisable();
        if (this.thread != null) {
            this.thread.interrupt();
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                notifyFailed(e);
                return;
            }
        }
        notifyStopped();
    }

    @Override // org.yamcs.tctm.AbstractLink
    protected Link.Status connectionStatus() {
        return Link.Status.OK;
    }
}
