package org.yamcs.archive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.yamcs.AbstractYamcsService;
import org.yamcs.ConfigurationException;
import org.yamcs.ContainerExtractionResult;
import org.yamcs.InitException;
import org.yamcs.Spec;
import org.yamcs.StandardTupleDefinitions;
import org.yamcs.StreamConfig;
import org.yamcs.YConfiguration;
import org.yamcs.YamcsServer;
import org.yamcs.mdb.ContainerProcessingResult;
import org.yamcs.mdb.XtceDbFactory;
import org.yamcs.mdb.XtceTmExtractor;
import org.yamcs.time.TimeService;
import org.yamcs.utils.parser.ParseException;
import org.yamcs.xtce.SequenceContainer;
import org.yamcs.xtce.XtceDb;
import org.yamcs.yarch.DataType;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.YarchDatabaseInstance;
import org.yamcs.yarch.streamsql.StreamSqlException;

/* loaded from: input_file:org/yamcs/archive/XtceTmRecorder.class */
/**
 * Service that records telemetry tuples into the {@value #TABLE_NAME} table.
 *
 * <p>For every configured TM stream a {@link StreamRecorder} is created. Each recorder runs the
 * packet of an incoming tuple through an {@link XtceTmExtractor}, derives an archive partition
 * name from the extraction result, appends that name as the {@value #PNAME_COLUMN} column and
 * emits the extended tuple on the {@value #REC_STREAM_NAME} stream. That stream is connected to
 * the table by an {@code insert into ... select * from ...} statement issued in {@link #init}.
 *
 * <p>Recorders flagged as asynchronous buffer tuples in a bounded queue and save them from a
 * dedicated thread; the others save the tuple on the thread that delivers it.
 */
public class XtceTmRecorder extends AbstractYamcsService {
    public static final String REC_STREAM_NAME = "xtce_tm_recorder_stream";
    public static final String TABLE_NAME = "tm";
    public static final String PNAME_COLUMN = "pname";

    /** Definition of the recorded tuples: the standard TM definition plus the partition name column. */
    public static final TupleDefinition RECORDED_TM_TUPLE_DEFINITION = StandardTupleDefinitions.TM.copy();

    static {
        RECORDED_TM_TUPLE_DEFINITION.addColumn(PNAME_COLUMN, DataType.ENUM);
    }

    /** Maximum number of tuples buffered by an asynchronous recorder before producers block. */
    private static final int ASYNC_QUEUE_CAPACITY = 100000;

    /** Bit of the flags column that marks a tuple as not to be archived (see the trace message in onTuple). */
    private static final int NO_ARCHIVE_FLAG = 4;

    // Column positions inside the incoming TM tuple. The symbolic names were lost (the values were
    // inlined), so the meanings below are inferred from how each value is used.
    // NOTE(review): confirm against StandardTupleDefinitions.
    /** Long value passed as the second argument of processPacket — presumably the generation time. */
    private static final int TIME_COLUMN_INDEX = 0;
    /** Integer value passed as the fourth argument of processPacket — presumably the sequence number. */
    private static final int SEQ_COLUMN_INDEX = 1;
    /** Integer bit field tested against {@link #NO_ARCHIVE_FLAG}. */
    private static final int FLAGS_COLUMN_INDEX = 3;
    /** Raw packet bytes. */
    private static final int PACKET_COLUMN_INDEX = 4;

    /**
     * Number of tuples handed to saveTuple so far. Atomic because it is incremented by every
     * recorder (each possibly on its own thread) and read through {@link #getNumProcessedPackets()}.
     */
    private final AtomicLong totalNumPackets = new AtomicLong();

    XtceDb xtceDb;
    TimeService timeService;

    /** Sentinel put on an asynchronous recorder's queue to stop its thread; compared by identity. */
    final Tuple END_MARK = new Tuple(StandardTupleDefinitions.TM, new Object[]{null, null, null, null, null, null, null, null});

    private final List<StreamRecorder> recorders = new ArrayList<>();

    /**
     * Records the tuples of a single input stream.
     *
     * <p>Acts as subscriber of the input stream and, in asynchronous mode, as the body of the
     * thread that drains the queue.
     */
    public class StreamRecorder implements StreamSubscriber, Runnable {
        SequenceContainer rootSequenceContainer;
        boolean async;
        Stream inputStream;
        Stream outputStream;
        /** Buffer between the delivering thread and the recorder thread; only created in async mode. */
        LinkedBlockingQueue<Tuple> tmQueue;
        XtceTmExtractor tmExtractor;

        /**
         * @param stream stream whose tuples are recorded
         * @param stream2 stream on which the extended tuples are emitted
         * @param sequenceContainer container from which packet processing starts
         * @param z {@code true} to decouple saving from delivery through a bounded queue
         */
        StreamRecorder(Stream stream, Stream stream2, SequenceContainer sequenceContainer, boolean z) {
            this.inputStream = stream;
            this.outputStream = stream2;
            this.rootSequenceContainer = sequenceContainer;
            this.async = z;
            if (z) {
                this.tmQueue = new LinkedBlockingQueue<>(ASYNC_QUEUE_CAPACITY);
            }
            this.tmExtractor = new XtceTmExtractor(xtceDb);
            this.tmExtractor.getOptions().setSubcontainerPartOfResult(false);
            subscribeContainers(this.rootSequenceContainer);
        }

        /**
         * Drains the queue until the end marker is received or the thread is interrupted.
         * Only meaningful in asynchronous mode.
         */
        @Override
        public void run() {
            Thread.currentThread().setName(getClass().getSimpleName() + "[" + yamcsInstance + "]");
            while (true) {
                Tuple next;
                try {
                    next = tmQueue.take();
                } catch (InterruptedException e) {
                    log.warn("Got InteruptedException when waiting for the next tuple ", e);
                    Thread.currentThread().interrupt();
                    return;
                }
                // identity comparison is intended: END_MARK is a unique sentinel instance
                if (next == END_MARK) {
                    return;
                }
                saveTuple(next);
            }
        }

        /**
         * Registers with the extractor every container of the inheritance tree rooted at the
         * given container that is marked as archive partition.
         *
         * @param sequenceContainer root of the (sub)tree to visit; {@code null} is ignored
         */
        private void subscribeContainers(SequenceContainer sequenceContainer) {
            if (sequenceContainer == null) {
                return;
            }
            if (sequenceContainer.useAsArchivePartition()) {
                tmExtractor.startProviding(sequenceContainer);
            }
            // look the children up once instead of once for the null check and once for the iteration
            List<SequenceContainer> inheriting = xtceDb.getInheritingContainers(sequenceContainer);
            if (inheriting == null) {
                return;
            }
            for (SequenceContainer child : inheriting) {
                subscribeContainers(child);
            }
        }

        /**
         * Receives a tuple from the input stream. Tuples carrying the no-archive flag are dropped;
         * all others are queued (async mode) or saved immediately under this recorder's monitor.
         */
        @Override
        public void onTuple(Stream stream, Tuple tuple) {
            int flags = (Integer) tuple.getColumn(FLAGS_COLUMN_INDEX);
            if ((flags & NO_ARCHIVE_FLAG) != 0) {
                log.trace("Dropping tm tuple {} because the no archive flag is set", tuple);
                return;
            }
            if (!async) {
                synchronized (this) {
                    saveTuple(tuple);
                }
                return;
            }
            try {
                // blocks when the queue is full
                tmQueue.put(tuple);
            } catch (InterruptedException e) {
                log.warn("Got interrupted exception while putting data in the queue");
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void streamClosed(Stream stream) {
            log.error("stream {} closed", stream);
        }

        /** Asks the recorder thread to terminate by queueing the end marker; no-op in synchronous mode. */
        public void quit() {
            if (!async) {
                return;
            }
            try {
                tmQueue.put(END_MARK);
            } catch (InterruptedException e) {
                log.warn("got interrupted while putting the empty buffer in the queue");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Processes the packet of the tuple, derives its archive partition and emits a copy of the
         * tuple extended with the {@value XtceTmRecorder#PNAME_COLUMN} column on the output stream.
         *
         * <p>Any exception raised while processing or emitting is logged and the tuple is skipped.
         * Packet extraction is deliberately inside the {@code try}: an exception escaping from here
         * would terminate the asynchronous recorder thread, after which the bounded queue fills up
         * and every producer blocks.
         */
        protected void saveTuple(Tuple tuple) {
            try {
                long time = (Long) tuple.getColumn(TIME_COLUMN_INDEX);
                byte[] packet = (byte[]) tuple.getColumn(PACKET_COLUMN_INDEX);
                int seq = (Integer) tuple.getColumn(SEQ_COLUMN_INDEX);
                totalNumPackets.incrementAndGet();

                ContainerProcessingResult result = tmExtractor.processPacket(packet, time,
                        timeService.getMissionTime(), seq, rootSequenceContainer);
                String partitionName = deriveArchivePartition(result);

                List<Object> columns = tuple.getColumns();
                List<Object> extendedColumns = new ArrayList<>(columns.size() + 1);
                extendedColumns.addAll(columns);
                extendedColumns.add(partitionName);

                // the definition is copied per tuple because it is taken from the incoming tuple itself
                TupleDefinition extendedDefinition = tuple.getDefinition().copy();
                extendedDefinition.addColumn(PNAME_COLUMN, DataType.ENUM);
                outputStream.emitTuple(new Tuple(extendedDefinition, extendedColumns));
            } catch (Exception e) {
                log.error("got exception when saving packet ", e);
            }
        }
    }

    /** Declares the optional {@code streams} option: a list of stream names. */
    @Override // org.yamcs.YamcsService
    public Spec getSpec() {
        Spec spec = new Spec();
        spec.addOption("streams", Spec.OptionType.LIST).withElementType(Spec.OptionType.STRING);
        return spec;
    }

    /**
     * Creates the table (if missing), the recorder stream and the insert statement linking them,
     * then creates one recorder per TM stream.
     *
     * <p>If the configuration contains {@code streams}, only those streams are recorded and each
     * must have a TM entry in the stream configuration; otherwise all TM entries are recorded.
     *
     * @throws InitException if one of the database statements cannot be parsed or executed
     * @throws ConfigurationException if a listed stream has no TM stream configuration, or a
     *         recorder cannot be created for it
     */
    @Override // org.yamcs.AbstractYamcsService, org.yamcs.YamcsService
    public void init(String str, String str2, YConfiguration yConfiguration) throws InitException {
        super.init(str, str2, yConfiguration);
        YarchDatabaseInstance ydb = YarchDatabase.getInstance(str);
        try {
            if (ydb.getTable(TABLE_NAME) == null) {
                ydb.execute("create table " + TABLE_NAME + "(" + RECORDED_TM_TUPLE_DEFINITION.getStringDefinition1()
                        + ", primary key(gentime, seqNum)) histogram(pname) partition by value(pname) table_format=compressed",
                        new Object[0]);
            }
            ydb.execute("create stream " + REC_STREAM_NAME + RECORDED_TM_TUPLE_DEFINITION.getStringDefinition(),
                    new Object[0]);
            ydb.execute("insert into " + TABLE_NAME + " select * from " + REC_STREAM_NAME, new Object[0]);

            // must be set before the recorders are created: their constructor reads xtceDb
            this.xtceDb = XtceDbFactory.getInstance(str);
            createRecorders(StreamConfig.getInstance(str), yConfiguration);
            this.timeService = YamcsServer.getTimeService(str);
        } catch (ParseException | StreamSqlException e) {
            throw new InitException(e);
        }
    }

    /** Creates the recorders for the explicitly configured streams, or for all TM streams if none are listed. */
    private void createRecorders(StreamConfig streamConfig, YConfiguration yConfiguration) {
        if (!yConfiguration.containsKey("streams")) {
            for (StreamConfig.TmStreamConfigEntry entry : streamConfig.getTmEntries()) {
                createRecorder(entry);
            }
            return;
        }
        List<String> streamNames = yConfiguration.getList("streams");
        for (String streamName : streamNames) {
            StreamConfig.TmStreamConfigEntry entry = streamConfig.getTmEntry(streamName);
            if (entry == null) {
                throw new ConfigurationException("No stream config found for '" + streamName + "'");
            }
            createRecorder(entry);
        }
    }

    /**
     * Creates and registers the recorder for one TM stream entry. The root container of the entry
     * is used if set, otherwise the root sequence container of the XTCE database.
     *
     * @throws ConfigurationException if no root container can be determined or the input stream does not exist
     */
    private void createRecorder(StreamConfig.TmStreamConfigEntry tmStreamConfigEntry) {
        String streamName = tmStreamConfigEntry.getName();
        SequenceContainer rootContainer = tmStreamConfigEntry.getRootContainer();
        if (rootContainer == null) {
            rootContainer = this.xtceDb.getRootSequenceContainer();
        }
        if (rootContainer == null) {
            throw new ConfigurationException("XtceDb does not have a root sequence container and no container was specified for decoding packets from " + streamName + " stream");
        }
        YarchDatabaseInstance ydb = YarchDatabase.getInstance(this.yamcsInstance);
        Stream inputStream = ydb.getStream(streamName);
        if (inputStream == null) {
            throw new ConfigurationException("Cannot find stream '" + streamName + "'");
        }
        Stream recorderStream = ydb.getStream(REC_STREAM_NAME);
        this.recorders.add(new StreamRecorder(inputStream, recorderStream, rootContainer, tmStreamConfigEntry.isAsync()));
    }

    /** Subscribes every recorder to its input stream and starts a thread for each asynchronous one. */
    protected void doStart() {
        for (StreamRecorder recorder : this.recorders) {
            recorder.inputStream.addSubscriber(recorder);
            if (recorder.async) {
                new Thread(recorder).start();
            }
        }
        notifyStarted();
    }

    /** Detaches and stops all recorders, then closes the recorder stream and its table writers. */
    protected void doStop() {
        for (StreamRecorder recorder : this.recorders) {
            // unsubscribe before queueing the end marker, so that no further tuples are
            // enqueued behind the marker where they would never be consumed
            recorder.inputStream.removeSubscriber(recorder);
            recorder.quit();
        }
        YarchDatabaseInstance ydb = YarchDatabase.getInstance(this.yamcsInstance);
        ydb.getStream(REC_STREAM_NAME).close();
        Utils.closeTableWriters(ydb, Arrays.asList(REC_STREAM_NAME));
        notifyStopped();
    }

    /**
     * @return the number of tuples that have been handed over for saving by all recorders of this
     *         service, including those whose saving subsequently failed
     */
    public long getNumProcessedPackets() {
        return this.totalNumPackets.get();
    }

    /**
     * Determines the name under which a processed packet is archived.
     *
     * <p>The container results are scanned from last to first; the qualified name of the first
     * container found that is derived from the root and marked as archive partition is returned.
     * If there is none, the qualified name of the first container of the result is used.
     *
     * @param containerProcessingResult result of processing one packet; its container result list
     *        must not be empty
     * @return the qualified name of the selected container
     * @throws IndexOutOfBoundsException if the container result list is empty
     */
    public static String deriveArchivePartition(ContainerProcessingResult containerProcessingResult) {
        List<ContainerExtractionResult> results = containerProcessingResult.getContainerResult();
        // fetched up front so that an empty result fails before any scanning takes place
        ContainerExtractionResult first = results.get(0);
        for (int i = results.size() - 1; i >= 0; i--) {
            ContainerExtractionResult candidate = results.get(i);
            if (!candidate.isDerivedFromRoot()) {
                continue;
            }
            SequenceContainer container = candidate.getContainer();
            if (container.useAsArchivePartition()) {
                return container.getQualifiedName();
            }
        }
        return first.getContainer().getQualifiedName();
    }
}
