package org.yamcs.yarch;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.yamcs.utils.LoggingUtils;
import org.yamcs.yarch.Stream;

/* loaded from: input_file:org/yamcs/yarch/AbstractStream.class */
public abstract class AbstractStream implements Stream {
    protected String name;
    protected TupleDefinition outputDefinition;
    protected Logger log;
    protected YarchDatabaseInstance ydb;
    private Stream.ExceptionHandler handler;
    protected final Collection<StreamSubscriber> subscribers = new ConcurrentLinkedQueue();
    protected volatile int state = 0;
    private volatile AtomicLong emitedTuples = new AtomicLong();
    private volatile AtomicInteger subscriberCount = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractStream(YarchDatabaseInstance yarchDatabaseInstance, String str, TupleDefinition tupleDefinition) {
        this.name = str;
        this.outputDefinition = tupleDefinition;
        this.ydb = yarchDatabaseInstance;
        this.log = LoggingUtils.getLogger(getClass(), yarchDatabaseInstance.getName(), this);
    }

    @Override // org.yamcs.yarch.Stream
    public abstract void start();

    @Override // org.yamcs.yarch.Stream
    public TupleDefinition getDefinition() {
        return this.outputDefinition;
    }

    @Override // org.yamcs.yarch.Stream
    public void emitTuple(Tuple tuple) {
        this.emitedTuples.incrementAndGet();
        for (StreamSubscriber streamSubscriber : this.subscribers) {
            try {
                streamSubscriber.onTuple(this, tuple);
            } catch (Exception e) {
                if (this.handler == null) {
                    this.log.warn("Exception received when emitting tuple to subscriber " + streamSubscriber + ": {}", e);
                    throw e;
                }
                this.handler.handle(tuple, streamSubscriber, e);
            }
        }
    }

    @Override // org.yamcs.yarch.Stream
    public String getName() {
        return this.name;
    }

    @Override // org.yamcs.yarch.Stream
    public void setName(String str) {
        this.name = str;
    }

    @Override // org.yamcs.yarch.Stream
    public void addSubscriber(StreamSubscriber streamSubscriber) {
        this.subscribers.add(streamSubscriber);
        this.subscriberCount.incrementAndGet();
    }

    @Override // org.yamcs.yarch.Stream
    public void removeSubscriber(StreamSubscriber streamSubscriber) {
        this.subscribers.remove(streamSubscriber);
        this.subscriberCount.decrementAndGet();
    }

    @Override // org.yamcs.yarch.Stream
    public ColumnDefinition getColumnDefinition(String str) {
        return this.outputDefinition.getColumn(str);
    }

    @Override // org.yamcs.yarch.Stream
    public final void close() {
        if (this.state == 2) {
            return;
        }
        this.state = 2;
        this.ydb.removeStream(this.name);
        this.log.debug("Closed stream {} num emitted tuples: {}", this.name, Long.valueOf(getNumEmittedTuples()));
        doClose();
        Iterator<StreamSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            it.next().streamClosed(this);
        }
    }

    protected abstract void doClose();

    public String toString() {
        return this.name;
    }

    @Override // org.yamcs.yarch.Stream
    public int getState() {
        return this.state;
    }

    @Override // org.yamcs.yarch.Stream
    public long getNumEmittedTuples() {
        return this.emitedTuples.get();
    }

    @Override // org.yamcs.yarch.Stream
    public int getSubscriberCount() {
        return this.subscriberCount.get();
    }

    @Override // org.yamcs.yarch.Stream
    public Collection<StreamSubscriber> getSubscribers() {
        return Collections.unmodifiableCollection(this.subscribers);
    }

    @Override // org.yamcs.yarch.Stream
    public void exceptionHandler(Stream.ExceptionHandler exceptionHandler) {
        this.handler = exceptionHandler;
    }
}
