package org.yamcs.cascading;

import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.yamcs.ConfigurationException;
import org.yamcs.YConfiguration;
import org.yamcs.archive.EventRecorder;
import org.yamcs.client.ClientException;
import org.yamcs.client.EventSubscription;
import org.yamcs.client.MessageListener;
import org.yamcs.protobuf.Event;
import org.yamcs.protobuf.SubscribeEventsRequest;
import org.yamcs.tctm.AbstractLink;
import org.yamcs.tctm.AggregatedDataLink;
import org.yamcs.tctm.Link;
import org.yamcs.utils.TimeEncoding;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.TupleDefinition;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.protobuf.Db;

/* loaded from: input_file:org/yamcs/cascading/YamcsEventLink.class */
public class YamcsEventLink extends AbstractLink {
    YamcsLink parentLink;
    protected AtomicLong dataCount = new AtomicLong(0);
    EventSubscription subscription;
    Stream eventStream;

    public YamcsEventLink(YamcsLink yamcsLink) {
        this.parentLink = yamcsLink;
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public void init(String str, String str2, YConfiguration yConfiguration) {
        super.init(str, str2, yConfiguration);
        String string = yConfiguration.getString("eventRealtimeStream", EventRecorder.REALTIME_EVENT_STREAM_NAME);
        this.eventStream = YarchDatabase.getInstance(str).getStream(string);
        if (this.eventStream == null) {
            throw new ConfigurationException("Cannto find stream " + string);
        }
    }

    static YConfiguration swapConfig(YConfiguration yConfiguration, String str, String str2, String str3) {
        HashMap hashMap = new HashMap(yConfiguration.getRoot());
        String str4 = (String) hashMap.remove(str);
        if (str4 == null) {
            str4 = str3;
        }
        hashMap.put(str2, str4);
        return YConfiguration.wrap(hashMap);
    }

    protected void doStart() {
        if (!isEffectivelyDisabled()) {
            doEnable();
        }
        notifyStarted();
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doDisable() {
        if (this.subscription != null) {
            this.subscription.cancel(true);
            this.subscription = null;
        }
    }

    @Override // org.yamcs.tctm.AbstractLink
    public void doEnable() {
        if ((this.subscription == null || this.subscription.isDone()) && this.parentLink.getClient().getWebSocketClient().isConnected()) {
            subscribeEvents();
        }
    }

    private void subscribeEvents() {
        this.subscription = this.parentLink.getClient().createEventSubscription();
        this.subscription.addMessageListener(new MessageListener<Event>() { // from class: org.yamcs.cascading.YamcsEventLink.1
            public void onMessage(Event event) {
                YamcsEventLink.this.processEvent(event);
            }

            public void onError(Throwable th) {
                if (th instanceof ClientException) {
                    YamcsEventLink.this.eventProducer.sendWarning("Got error when subscribing to containers: " + th.getMessage());
                } else {
                    YamcsEventLink.this.log.warn("Got error when subscribing to containers: " + th.getMessage());
                }
            }
        });
        this.subscription.sendMessage(SubscribeEventsRequest.newBuilder().setInstance(this.parentLink.getUpstreamInstance()).build());
    }

    private void processEvent(Event event) {
        long missionTime = this.timeService.getMissionTime();
        long fromProtobufTimestamp = event.hasGenerationTime() ? TimeEncoding.fromProtobufTimestamp(event.getGenerationTime()) : missionTime;
        String source = event.hasSource() ? event.getSource() : this.parentLink.getName();
        int seqNumber = event.hasSeqNumber() ? event.getSeqNumber() : 0;
        TupleDefinition definition = this.eventStream.getDefinition();
        Db.Event.Builder seqNumber2 = Db.Event.newBuilder().setReceptionTime(missionTime).setGenerationTime(fromProtobufTimestamp).setSource(source).setSeqNumber(seqNumber);
        if (event.hasCreatedBy()) {
            seqNumber2.setCreatedBy(event.getCreatedBy());
        }
        if (event.hasSeverity()) {
            seqNumber2.setSeverity(event.getSeverity());
        }
        if (event.hasType()) {
            seqNumber2.setType(event.getType());
        }
        if (event.hasMessage()) {
            seqNumber2.setMessage(event.getMessage());
        }
        Tuple tuple = new Tuple(definition, new Object[]{Long.valueOf(fromProtobufTimestamp), source, Integer.valueOf(seqNumber), seqNumber2.m1349build()});
        this.dataCount.incrementAndGet();
        this.eventStream.emitTuple(tuple);
    }

    protected void doStop() {
        if (!isDisabled()) {
            doDisable();
        }
        notifyStopped();
    }

    @Override // org.yamcs.tctm.AbstractLink
    protected Link.Status connectionStatus() {
        Link.Status connectionStatus = this.parentLink.connectionStatus();
        if (connectionStatus == Link.Status.OK) {
            return this.subscription != null && !this.subscription.isDone() ? Link.Status.OK : Link.Status.UNAVAIL;
        }
        return connectionStatus;
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public AggregatedDataLink getParent() {
        return this.parentLink;
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public long getDataInCount() {
        return this.dataCount.get();
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public long getDataOutCount() {
        return 0L;
    }

    @Override // org.yamcs.tctm.AbstractLink, org.yamcs.tctm.Link
    public void resetCounters() {
        this.dataCount.set(0L);
    }
}
