package org.yamcs.yarch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.yamcs.yarch.streamsql.StreamSqlException;

/* loaded from: input_file:org/yamcs/yarch/MergeStream.class */
public class MergeStream extends AbstractStream implements StreamSubscriber, Runnable {
    private Map<AbstractStream, LinkedBlockingQueue<Tuple>> tupleQueues;
    private PriorityQueue<TupleQueuePair> orderedQueue;
    Stream[] streams;
    private Tuple queueEndMark;
    private volatile boolean quitting;
    private final String mergeColumn;
    static AtomicInteger counter = new AtomicInteger();
    private static final Comparator<TupleQueuePair> REVERSE_COMPARATOR = new Comparator<TupleQueuePair>() { // from class: org.yamcs.yarch.MergeStream.1
        @Override // java.util.Comparator
        public int compare(TupleQueuePair tupleQueuePair, TupleQueuePair tupleQueuePair2) {
            return -tupleQueuePair.compareTo(tupleQueuePair2);
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/yamcs/yarch/MergeStream$TupleQueuePair.class */
    public class TupleQueuePair implements Comparable<TupleQueuePair> {
        LinkedBlockingQueue<Tuple> q;
        Tuple t;

        TupleQueuePair(LinkedBlockingQueue<Tuple> linkedBlockingQueue, Tuple tuple) {
            this.q = linkedBlockingQueue;
            this.t = tuple;
        }

        @Override // java.lang.Comparable
        public int compareTo(TupleQueuePair tupleQueuePair) {
            return DataType.compare(this.t.getColumn(MergeStream.this.mergeColumn), tupleQueuePair.t.getColumn(MergeStream.this.mergeColumn));
        }
    }

    public MergeStream(YarchDatabaseInstance yarchDatabaseInstance, AbstractStream[] abstractStreamArr, String str, boolean z) throws StreamSqlException {
        super(yarchDatabaseInstance, getStreamName(abstractStreamArr), abstractStreamArr[0].getDefinition());
        this.queueEndMark = new Tuple(new TupleDefinition(), new ArrayList());
        this.quitting = false;
        this.streams = abstractStreamArr;
        this.mergeColumn = str;
        if (z) {
            this.orderedQueue = new PriorityQueue<>();
        } else {
            this.orderedQueue = new PriorityQueue<>(REVERSE_COMPARATOR);
        }
        HashMap hashMap = new HashMap();
        for (AbstractStream abstractStream : abstractStreamArr) {
            hashMap.put(abstractStream, new LinkedBlockingQueue(50));
        }
        this.tupleQueues = Collections.unmodifiableMap(hashMap);
        for (AbstractStream abstractStream2 : abstractStreamArr) {
            abstractStream2.addSubscriber(this);
        }
    }

    private static String getStreamName(Stream[] streamArr) {
        StringBuilder sb = new StringBuilder();
        sb.append("merge").append(counter.getAndIncrement());
        return sb.toString();
    }

    @Override // org.yamcs.yarch.StreamSubscriber
    public void onTuple(Stream stream, Tuple tuple) {
        try {
            this.tupleQueues.get(stream).put(tuple);
        } catch (InterruptedException e) {
            this.log.info("got InterruptedException when writing data to the queue");
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.yamcs.yarch.StreamSubscriber
    public void streamClosed(Stream stream) {
        if (this.state == 2) {
            return;
        }
        this.log.debug("Got stream closed for {}", stream);
        try {
            this.tupleQueues.get(stream).put(this.queueEndMark);
        } catch (InterruptedException e) {
            this.log.info("got InterruptedException when writing the end mark to the queue");
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.yamcs.yarch.AbstractStream, org.yamcs.yarch.Stream
    public void start() {
        this.log.info("starting the merge stream");
        for (Stream stream : this.streams) {
            stream.start();
        }
        new Thread(this).start();
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            this.log.debug("waiting for at least one tuple in each queue");
            for (LinkedBlockingQueue<Tuple> linkedBlockingQueue : this.tupleQueues.values()) {
                Tuple take = linkedBlockingQueue.take();
                if (take != this.queueEndMark) {
                    this.orderedQueue.add(new TupleQueuePair(linkedBlockingQueue, take));
                }
            }
            this.log.debug("got one tuple from each stream, starting the business");
            while (this.orderedQueue.size() > 0) {
                TupleQueuePair poll = this.orderedQueue.poll();
                if (this.state == 2) {
                    break;
                }
                emitTuple(poll.t);
                Tuple take2 = poll.q.take();
                if (take2 != this.queueEndMark) {
                    if (!take2.hasColumn(this.mergeColumn)) {
                        this.log.warn("Ignoring tuple because it does not have column {}", this.mergeColumn);
                    }
                    this.orderedQueue.add(new TupleQueuePair(poll.q, take2));
                }
            }
            close();
        } catch (InterruptedException e) {
            this.log.info("Got interrupted exception, quitting");
        }
    }

    @Override // org.yamcs.yarch.AbstractStream
    protected void doClose() {
        this.quitting = true;
        for (Stream stream : this.streams) {
            stream.close();
        }
    }
}
