package io.deephaven.server.browserstreaming;

import com.google.rpc.Code;
import io.deephaven.base.RAPriQueue;
import io.deephaven.base.verify.Assert;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.util.GrpcServiceOverrideBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;

/* loaded from: input_file:io/deephaven/server/browserstreaming/BrowserStream.class */
public class BrowserStream<T> implements Closeable {
    // Class-wide logger; every line this class logs is prefixed with logIdentity.
    private static final Logger log = LoggerFactory.getLogger(BrowserStream.class);
    // Delivery policy fixed at construction; selects the IN_ORDER or MOST_RECENT path in onMessageReceived.
    private final Mode mode;
    // Session this stream registers itself with as an on-close callback (see the constructor).
    private final SessionState session;
    // Sink that receives delivered messages and the terminal cancel/error/complete events.
    private final Marshaller<T> marshaller;
    // IN_ORDER only: lazily created queue of messages that could not be delivered immediately,
    // ordered by stream sequence (see MessageInfoQueueAdapter.less).
    private RAPriQueue<Message<T>> pendingSeq;
    // MOST_RECENT only: stream data of the single message parked while another one is being processed.
    private StreamData queuedStreamData;
    // MOST_RECENT only: payload that belongs to queuedStreamData.
    private T queuedMessage;
    // Lowest sequence still acceptable; set to (sequence of the message being delivered + 1).
    private long nextSeq = 0;
    // True while some caller of onMessageReceived is delivering messages to the marshaller.
    private boolean processingMessage = false;
    // Sequence of the message that carried the half-close flag, or -1 if none has been received.
    private long halfClosedSeq = -1;
    // Log prefix that identifies this instance by its identity hash code.
    private final String logIdentity = "BrowserStream(" + Integer.toHexString(System.identityHashCode(this)) + "): ";

    /**
     * Creates {@link BrowserStream} instances for a given session and response observer.
     *
     * @param <ReqT> type of the request messages the created stream accepts
     * @param <RespT> type of the messages sent to the response observer
     */
    public interface Factory<ReqT, RespT> {
        /**
         * Builds a new stream.
         *
         * @param sessionState the session the stream belongs to
         * @param streamObserver the observer that receives responses
         * @return the new stream
         */
        BrowserStream<ReqT> create(SessionState sessionState, StreamObserver<RespT> streamObserver);
    }

    /**
     * Receiver of the events a {@link BrowserStream} produces: delivered messages plus the terminal
     * cancel, error and completion notifications.
     *
     * <p>NOTE(review): the decompiler reports that this type was originally declared {@code private};
     * the {@code public} modifier is a decompilation artifact.
     *
     * @param <T> type of the delivered messages
     */
    public interface Marshaller<T> {
        /** Called by {@link BrowserStream#onMessageReceived} for each message that is delivered. */
        void onMessageReceived(T t);

        /** Called by {@link BrowserStream#close()}. */
        void onCancel();

        /** Called by {@link BrowserStream#onError(RuntimeException)} if the stream was still registered. */
        void onError(Throwable th);

        /** Called when a half-close message is processed and the stream was still registered. */
        void onCompleted();
    }

    /**
     * Pairs a payload with the {@link StreamData} it arrived with so both can be parked in the
     * pending queue together.
     *
     * <p>NOTE(review): the decompiler reports that this type was originally declared {@code private}.
     *
     * @param <T> payload type
     */
    public static class Message<T> {
        private final StreamData streamData;
        private final T message;
        // Slot index maintained by MessageInfoQueueAdapter.setPos/getPos on behalf of the queue.
        private int pos;

        /**
         * @param message the payload
         * @param streamData the stream data received alongside the payload
         */
        public Message(T message, StreamData streamData) {
            this.streamData = streamData;
            this.message = message;
        }

        /** @return the payload given at construction */
        public T getMessage() {
            return message;
        }

        /** @return the stream data given at construction */
        public StreamData getStreamData() {
            return streamData;
        }
    }

    /* loaded from: input_file:io/deephaven/server/browserstreaming/BrowserStream$MessageInfoQueueAdapter.class */
    private static class MessageInfoQueueAdapter implements RAPriQueue.Adapter<Message<?>> {
        private static final MessageInfoQueueAdapter INSTANCE = new MessageInfoQueueAdapter();

        private MessageInfoQueueAdapter() {
        }

        private static <T extends Message<?>> RAPriQueue.Adapter<T> getInstance() {
            return INSTANCE;
        }

        public boolean less(Message<?> message, Message<?> message2) {
            return message.getStreamData().getSequence() < message2.getStreamData().getSequence();
        }

        public void setPos(Message<?> message, int i) {
            ((Message) message).pos = i;
        }

        public int getPos(Message<?> message) {
            return ((Message) message).pos;
        }
    }

    /** Delivery policy applied by {@link BrowserStream#onMessageReceived}. */
    public enum Mode {
        // Messages are delivered in sequence order: early arrivals are queued until their turn, and a
        // sequence below the next expected one is rejected with OUT_OF_RANGE.
        IN_ORDER,
        // Stale messages are dropped; while one message is being processed only the newest arrival is kept.
        MOST_RECENT
    }

    /**
     * Builds a {@link Factory} whose streams forward their events to the request observer obtained
     * from {@code bidiDelegate}.
     *
     * <p>For every stream created, {@code bidiDelegate.doInvoke} is called once with the response
     * observer; the observer it returns receives each delivered message ({@code onNext}), stream
     * errors ({@code onError}) and completion ({@code onCompleted}). On cancel, both the response
     * observer and the request observer are failed with a {@code CANCELLED} status.
     *
     * @param mode delivery policy for the created streams
     * @param bidiDelegate supplies the request observer for a given response observer
     * @param <ReqT> request message type
     * @param <RespT> response message type
     * @return a factory producing streams wired to {@code bidiDelegate}
     */
    public static <ReqT, RespT> Factory<ReqT, RespT> factory(Mode mode, GrpcServiceOverrideBuilder.BidiDelegate<ReqT, RespT> bidiDelegate) {
        return (sessionState, streamObserver) -> new BrowserStream<>(mode, sessionState, new Marshaller<ReqT>() {
            // Obtained eagerly, when the stream is created, from the delegate passed to factory().
            private final StreamObserver<ReqT> requestObserver = bidiDelegate.doInvoke(streamObserver);

            @Override
            public void onMessageReceived(ReqT reqt) {
                requestObserver.onNext(reqt);
            }

            @Override
            public void onCancel() {
                final StatusRuntimeException cancelled =
                        Exceptions.statusRuntimeException(Code.CANCELLED, "Stream canceled on the server");
                // Same exception goes to both sides: the response observer first, then the request observer.
                GrpcUtil.safelyError(streamObserver, cancelled);
                GrpcUtil.safelyError(requestObserver, cancelled);
            }

            @Override
            public void onError(Throwable th) {
                requestObserver.onError(th);
            }

            @Override
            public void onCompleted() {
                requestObserver.onCompleted();
            }
        });
    }

    /**
     * Stores the collaborators and registers this stream as an on-close callback of the session.
     *
     * @param mode delivery policy
     * @param sessionState session to register with
     * @param marshaller sink for messages and terminal events
     */
    private BrowserStream(Mode mode, SessionState sessionState, Marshaller<T> marshaller) {
        this.marshaller = marshaller;
        this.session = sessionState;
        this.mode = mode;
        // Registration comes last so every field is assigned before the session can see this object.
        sessionState.addOnCloseCallback(this);
    }

    /**
     * Accepts one incoming message and, if it is this message's turn, delivers it (and any queued
     * successors) to the marshaller.
     *
     * <p>Admission runs under this object's monitor:
     * <ul>
     * <li>a sequence greater than the half-close sequence is rejected with {@code ABORTED};</li>
     * <li>a second half-close is rejected with {@code INVALID_ARGUMENT};</li>
     * <li>{@link Mode#IN_ORDER}: a sequence below the next expected one is rejected with
     * {@code OUT_OF_RANGE}; a message that is early, or that arrives while another is being processed,
     * is queued;</li>
     * <li>{@link Mode#MOST_RECENT}: a stale message is dropped; a message that arrives while another is
     * being processed replaces the single parked message.</li>
     * </ul>
     *
     * <p>Delivery runs outside the monitor so that other callers can queue or drop their messages
     * instead of blocking; the {@code processingMessage} flag marks which caller owns delivery. A
     * half-close message is not passed to the marshaller; it completes the stream instead. A
     * {@link RuntimeException} raised during delivery is routed to {@link #onError(RuntimeException)}
     * and delivery stops with {@code processingMessage} left set.
     *
     * @param t the payload
     * @param streamData sequence and half-close information for the payload
     * @throws StatusRuntimeException if the message violates the sequencing rules above
     */
    public void onMessageReceived(T t, StreamData streamData) {
        synchronized (this) {
            final long recvSeq = streamData.getSequence();

            if (halfClosedSeq != -1 && recvSeq > halfClosedSeq) {
                throw Exceptions.statusRuntimeException(Code.ABORTED,
                        "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + recvSeq);
            }
            if (streamData.isHalfClose()) {
                if (halfClosedSeq != -1) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
                            "Already half closed: closed seq=" + halfClosedSeq + " recv seq=" + recvSeq);
                }
                halfClosedSeq = recvSeq;
            }

            if (mode == Mode.IN_ORDER) {
                if (recvSeq < nextSeq) {
                    throw Exceptions.statusRuntimeException(Code.OUT_OF_RANGE,
                            "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + recvSeq);
                }
                boolean enqueue = false;
                if (processingMessage) {
                    enqueue = true;
                    log.debug().append(logIdentity).append("queueing; next seq=").append(nextSeq).append(" recv seq=").append(streamData.getSequence()).endl();
                } else if (recvSeq != nextSeq) {
                    enqueue = true;
                    log.debug().append(logIdentity).append("queueing; waiting seq=").append(nextSeq).append(" recv seq=").append(streamData.getSequence()).endl();
                }
                if (enqueue) {
                    if (pendingSeq == null) {
                        pendingSeq = new RAPriQueue<>(1, MessageInfoQueueAdapter.getInstance(), Message.class);
                    }
                    pendingSeq.enter(new Message<>(t, streamData));
                    return;
                }
            } else {
                // Stale: already passed, a duplicate of the message in flight, or older than the parked one.
                final boolean stale = recvSeq < nextSeq
                        || (recvSeq == nextSeq && processingMessage)
                        || (queuedStreamData != null && recvSeq < queuedStreamData.getSequence());
                if (stale) {
                    log.debug().append(logIdentity).append("dropping; next seq=").append(nextSeq).append(" queued seq=").append(queuedStreamData != null ? queuedStreamData.getSequence() : -1).append(" recv seq=").append(streamData.getSequence()).endl();
                    return;
                }
                if (processingMessage) {
                    log.debug().append(logIdentity).append("queueing; processing seq=").append(nextSeq).append(" recv seq=").append(streamData.getSequence()).endl();
                    queuedStreamData = streamData;
                    queuedMessage = t;
                    return;
                }
            }

            // This caller now owns delivery until it clears processingMessage again.
            nextSeq = recvSeq + 1;
            processingMessage = true;
        }

        T current = t;
        StreamData currentData = streamData;
        while (true) {
            synchronized (this) {
                if (currentData.isHalfClose()) {
                    onComplete();
                    processingMessage = false;
                    return;
                }
            }
            try {
                // Deliberately outside the monitor: concurrent arrivals must be able to enter the
                // admission section above while the marshaller is busy.
                marshaller.onMessageReceived(current);

                synchronized (this) {
                    if (mode == Mode.IN_ORDER) {
                        final Message<T> head = pendingSeq == null ? null : pendingSeq.top();
                        if (head == null || head.getStreamData().getSequence() != nextSeq) {
                            // Cleared under the same lock as the check so no arrival can slip in between.
                            processingMessage = false;
                            return;
                        }
                        current = head.getMessage();
                        currentData = head.getStreamData();
                        Assert.eq(pendingSeq.removeTop().getMessage(), "pendingSeq.remoteTop()", current, "message");
                    } else {
                        if (queuedStreamData == null) {
                            processingMessage = false;
                            return;
                        }
                        current = queuedMessage;
                        currentData = queuedStreamData;
                        // Clear both halves of the parked message; leaving the payload behind would
                        // make the next pass see a stale payload with no stream data.
                        queuedMessage = null;
                        queuedStreamData = null;
                    }
                    log.debug().append(logIdentity).append("processing queued seq=").append(currentData.getSequence()).endl();
                    nextSeq = currentData.getSequence() + 1;
                }
            } catch (RuntimeException e) {
                onError(e);
                return;
            }
        }
    }

    /**
     * Reports a failure to the marshaller, at most once per stream lifetime: the error is logged and
     * forwarded only if this stream could still be removed from the session's on-close callbacks.
     *
     * @param runtimeException the failure to report
     */
    public void onError(RuntimeException runtimeException) {
        final boolean wasRegistered = session.removeOnCloseCallback(this);
        if (!wasRegistered) {
            return;
        }
        log.error().append(logIdentity).append("closing browser stream on unexpected exception: ").append(runtimeException).endl();
        marshaller.onError(runtimeException);
    }

    /**
     * Cancels the stream by notifying the marshaller via {@link Marshaller#onCancel()}. This object is
     * registered as an on-close callback of its session in the constructor.
     */
    @Override
    public void close() {
        marshaller.onCancel();
    }

    /**
     * Completes the stream: if this stream could still be removed from the session's on-close
     * callbacks, logs the completion and notifies the marshaller; otherwise does nothing.
     */
    private void onComplete() {
        final boolean wasRegistered = session.removeOnCloseCallback(this);
        if (!wasRegistered) {
            return;
        }
        log.debug().append(logIdentity).append("browser stream completed").endl();
        marshaller.onCompleted();
    }
}
