package org.yamcs.http.api;

import org.yamcs.StandardTupleDefinitions;
import org.yamcs.api.Observer;
import org.yamcs.archive.EventRecorder;
import org.yamcs.protobuf.Event;
import org.yamcs.protobuf.SubscribeEventsRequest;
import org.yamcs.yarch.Stream;
import org.yamcs.yarch.StreamSubscriber;
import org.yamcs.yarch.Tuple;
import org.yamcs.yarch.YarchDatabase;
import org.yamcs.yarch.protobuf.Db;

/* loaded from: input_file:org/yamcs/http/api/SubscribeEventsObserver.class */
public class SubscribeEventsObserver implements Observer<SubscribeEventsRequest>, StreamSubscriber {
    private Observer<Event> responseObserver;
    private Stream stream;
    private EventFilter filter;

    public SubscribeEventsObserver(Observer<Event> observer) {
        this.responseObserver = observer;
    }

    public void next(SubscribeEventsRequest subscribeEventsRequest) {
        if (this.stream != null) {
            this.stream.removeSubscriber(this);
            this.stream = null;
        }
        this.filter = subscribeEventsRequest.hasFilter() ? EventFilterFactory.create(subscribeEventsRequest.getFilter()) : null;
        this.stream = YarchDatabase.getInstance(InstancesApi.verifyInstance(subscribeEventsRequest.getInstance())).getStream(EventRecorder.REALTIME_EVENT_STREAM_NAME);
        if (this.stream == null) {
            return;
        }
        this.stream.addSubscriber(this);
    }

    @Override // org.yamcs.yarch.StreamSubscriber
    public void onTuple(Stream stream, Tuple tuple) {
        if (this.filter == null || this.filter.matches(tuple)) {
            this.responseObserver.next(EventsApi.fromDbEvent((Db.Event) tuple.getColumn(StandardTupleDefinitions.BODY_COLUMN)));
        }
    }

    @Override // org.yamcs.yarch.StreamSubscriber
    public void streamClosed(Stream stream) {
    }

    public void completeExceptionally(Throwable th) {
        if (this.stream != null) {
            this.stream.removeSubscriber(this);
        }
    }

    public void complete() {
        if (this.stream != null) {
            this.stream.removeSubscriber(this);
        }
    }
}
