package io.deephaven.server.object;

import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.plugin.type.ObjectCommunicationException;
import io.deephaven.plugin.type.ObjectType;
import io.deephaven.plugin.type.ObjectTypeLookup;
import io.deephaven.proto.backplane.grpc.ClientData;
import io.deephaven.proto.backplane.grpc.FetchObjectRequest;
import io.deephaven.proto.backplane.grpc.FetchObjectResponse;
import io.deephaven.proto.backplane.grpc.ObjectServiceGrpc;
import io.deephaven.proto.backplane.grpc.ServerData;
import io.deephaven.proto.backplane.grpc.StreamRequest;
import io.deephaven.proto.backplane.grpc.StreamResponse;
import io.deephaven.proto.backplane.grpc.TypedTicket;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.grpc.GrpcErrorHelper;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.function.ThrowingRunnable;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl.class */
public class ObjectServiceGrpcImpl extends ObjectServiceGrpc.ObjectServiceImplBase {
    private final SessionService sessionService;
    private final TicketRouter ticketRouter;
    private final ObjectTypeLookup objectTypeLookup;
    private final TypeLookup typeLookup;
    private final SessionService.ErrorTransformer errorTransformer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl$EnqueuedState.class */
    public enum EnqueuedState {
        WAITING,
        RUNNING,
        CLOSED
    }

    /* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl$PluginMessageSender.class */
    private final class PluginMessageSender implements ObjectType.MessageStream {
        private final StreamObserver<StreamResponse> responseObserver;
        private final SessionState sessionState;

        public PluginMessageSender(StreamObserver<StreamResponse> streamObserver, SessionState sessionState) {
            this.responseObserver = streamObserver;
            this.sessionState = sessionState;
        }

        public void onData(ByteBuffer byteBuffer, Object[] objArr) throws ObjectCommunicationException {
            ArrayList arrayList = new ArrayList(objArr.length);
            try {
                ServerData.Builder payload = ServerData.newBuilder().setPayload(ByteString.copyFrom(byteBuffer));
                for (Object obj : objArr) {
                    String orElse = ObjectServiceGrpcImpl.this.typeLookup.type(obj).orElse(null);
                    SessionState.ExportObject<?> newServerSideExport = this.sessionState.newServerSideExport(obj);
                    arrayList.add(newServerSideExport);
                    payload.addExportedReferences(ticketForExport(newServerSideExport, orElse));
                }
                StreamResponse build = StreamResponse.newBuilder().setData(payload).build();
                synchronized (this.responseObserver) {
                    this.responseObserver.onNext(build);
                }
            } catch (Throwable th) {
                ObjectServiceGrpcImpl.cleanup(arrayList, th);
                throw new ObjectCommunicationException(th);
            }
        }

        private TypedTicket ticketForExport(SessionState.ExportObject<?> exportObject, String str) {
            TypedTicket.Builder ticket = TypedTicket.newBuilder().setTicket(exportObject.getExportId());
            if (str != null) {
                ticket.setType(str);
            }
            return ticket.build();
        }

        public void onClose() {
            GrpcUtil.safelyComplete(this.responseObserver);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl$SendMessageObserver.class */
    public final class SendMessageObserver implements StreamObserver<StreamRequest> {
        private final SessionState session;
        private final StreamObserver<StreamResponse> responseObserver;
        private ObjectType.MessageStream messageStream;
        private boolean seenConnect = false;
        private final Queue<EnqueuedStreamOperation> operations = new ConcurrentLinkedQueue();
        private final AtomicReference<EnqueuedState> runState = new AtomicReference<>(EnqueuedState.WAITING);

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl$SendMessageObserver$EnqueuedStreamOperation.class */
        public class EnqueuedStreamOperation {
            private final StreamOperation wrapped;
            private final List<SessionState.ExportObject<?>> requirements;

            EnqueuedStreamOperation(Collection<? extends SessionState.ExportObject<?>> collection, StreamOperation streamOperation) {
                this.wrapped = streamOperation;
                this.requirements = List.copyOf(collection);
            }

            public void run() {
                SessionState.ExportBuilder nonExport = SendMessageObserver.this.session.nonExport();
                SendMessageObserver sendMessageObserver = SendMessageObserver.this;
                nonExport.onErrorHandler((v1) -> {
                    r1.onError(v1);
                }).require((List<? extends SessionState.ExportObject<?>>) this.requirements).submit(() -> {
                    if (SendMessageObserver.this.runState.get() == EnqueuedState.CLOSED) {
                        return;
                    }
                    try {
                        this.wrapped.run();
                        if (SendMessageObserver.this.runState.compareAndSet(EnqueuedState.RUNNING, EnqueuedState.WAITING)) {
                            SendMessageObserver.this.doWork();
                        }
                    } catch (ObjectCommunicationException e) {
                        throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Error performing MessageStream operation");
                    }
                });
            }
        }

        private SendMessageObserver(SessionState sessionState, StreamObserver<StreamResponse> streamObserver) {
            this.session = sessionState;
            this.responseObserver = streamObserver;
        }

        public void onNext(StreamRequest streamRequest) {
            GrpcErrorHelper.checkHasOneOf(streamRequest, "message");
            if (streamRequest.hasConnect()) {
                if (this.seenConnect) {
                    throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Already sent a connect request, cannot send another");
                }
                this.seenConnect = true;
                TypedTicket sourceId = streamRequest.getConnect().getSourceId();
                String type = sourceId.getType();
                if (type.isEmpty()) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "No type supplied");
                }
                if (sourceId.getTicket().getTicket().isEmpty()) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "No ticket supplied");
                }
                SessionState.ExportObject resolve = ObjectServiceGrpcImpl.this.ticketRouter.resolve(this.session, sourceId.getTicket(), "sourceId");
                runOrEnqueue(Collections.singleton(resolve), () -> {
                    Object obj = resolve.get();
                    this.messageStream = ObjectServiceGrpcImpl.this.getObjectTypeInstance(type, obj).clientConnection(obj, new PluginMessageSender(this.responseObserver, this.session));
                });
                return;
            }
            if (streamRequest.hasData()) {
                if (!this.seenConnect) {
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Data message sent before Connect message");
                }
                ClientData data = streamRequest.getData();
                LivenessScope livenessScope = new LivenessScope();
                SafeCloseable open = LivenessScopeStack.open(livenessScope, false);
                try {
                    List list = (List) data.getReferencesList().stream().map(typedTicket -> {
                        return ObjectServiceGrpcImpl.this.ticketRouter.resolve(this.session, typedTicket.getTicket(), "ticket");
                    }).collect(Collectors.toList());
                    if (open != null) {
                        open.close();
                    }
                    runOrEnqueue(list, () -> {
                        try {
                            Object[] array = list.stream().map((v0) -> {
                                return v0.get();
                            }).toArray();
                            livenessScope.release();
                            this.messageStream.onData(data.getPayload().asReadOnlyByteBuffer(), array);
                        } catch (Throwable th) {
                            livenessScope.release();
                            throw th;
                        }
                    });
                } catch (Throwable th) {
                    if (open != null) {
                        try {
                            open.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }

        private void runOrEnqueue(Collection<? extends SessionState.ExportObject<?>> collection, StreamOperation streamOperation) {
            this.operations.add(new EnqueuedStreamOperation(collection, streamOperation));
            doWork();
        }

        private void doWork() {
            EnqueuedStreamOperation peek = this.operations.peek();
            if (peek == null || !this.runState.compareAndSet(EnqueuedState.WAITING, EnqueuedState.RUNNING)) {
                return;
            }
            Assert.eq(peek, "next", this.operations.poll(), "actualNext");
            peek.run();
        }

        public void onError(Throwable th) {
            this.runState.set(EnqueuedState.CLOSED);
            GrpcUtil.safelyError(this.responseObserver, ObjectServiceGrpcImpl.this.errorTransformer.transform(th));
            if (this.messageStream != null) {
                closeMessageStream();
            }
            this.operations.clear();
        }

        private void closeMessageStream() {
            try {
                this.messageStream.onClose();
            } catch (Exception e) {
            }
        }

        public void onCompleted() {
            runOrEnqueue(Collections.emptyList(), () -> {
                this.runState.set(EnqueuedState.CLOSED);
                GrpcUtil.safelyComplete(this.responseObserver);
                if (this.messageStream != null) {
                    closeMessageStream();
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:io/deephaven/server/object/ObjectServiceGrpcImpl$StreamOperation.class */
    public interface StreamOperation extends ThrowingRunnable<ObjectCommunicationException> {
    }

    @Inject
    public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketRouter, ObjectTypeLookup objectTypeLookup, TypeLookup typeLookup, SessionService.ErrorTransformer errorTransformer) {
        this.sessionService = (SessionService) Objects.requireNonNull(sessionService);
        this.ticketRouter = (TicketRouter) Objects.requireNonNull(ticketRouter);
        this.objectTypeLookup = (ObjectTypeLookup) Objects.requireNonNull(objectTypeLookup);
        this.typeLookup = (TypeLookup) Objects.requireNonNull(typeLookup);
        this.errorTransformer = (SessionService.ErrorTransformer) Objects.requireNonNull(errorTransformer);
    }

    public void fetchObject(@NotNull FetchObjectRequest fetchObjectRequest, @NotNull StreamObserver<FetchObjectResponse> streamObserver) {
        SessionState currentSession = this.sessionService.getCurrentSession();
        String type = fetchObjectRequest.getSourceId().getType();
        if (type.isEmpty()) {
            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "No type supplied");
        }
        if (fetchObjectRequest.getSourceId().getTicket().getTicket().isEmpty()) {
            throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "No ticket supplied");
        }
        QueryPerformanceRecorder newQuery = QueryPerformanceRecorder.newQuery("ObjectService#fetchObject(object=" + this.ticketRouter.getLogNameFor(fetchObjectRequest.getSourceId().getTicket(), "sourceId") + ")", currentSession.getSessionId(), QueryPerformanceNugget.DEFAULT_FACTORY);
        SafeCloseable startQuery = newQuery.startQuery();
        try {
            SessionState.ExportObject<?> resolve = this.ticketRouter.resolve(currentSession, fetchObjectRequest.getSourceId().getTicket(), "sourceId");
            currentSession.nonExport().queryPerformanceRecorder(newQuery).require(resolve).onError((StreamObserver<?>) streamObserver).submit(() -> {
                Object obj = resolve.get();
                ObjectType objectTypeInstance = getObjectTypeInstance(type, obj);
                final AtomicReference atomicReference = new AtomicReference();
                final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                PluginMessageSender pluginMessageSender = new PluginMessageSender(new StreamObserver<StreamResponse>() { // from class: io.deephaven.server.object.ObjectServiceGrpcImpl.1
                    public void onNext(StreamResponse streamResponse) {
                        atomicReference.set(FetchObjectResponse.newBuilder().setType(type).setData(streamResponse.getData().getPayload()).addAllTypedExportIds(streamResponse.getData().getExportedReferencesList()).build());
                    }

                    public void onError(Throwable th) {
                        streamObserver.onError(th);
                    }

                    public void onCompleted() {
                        atomicBoolean.set(true);
                    }
                }, currentSession);
                objectTypeInstance.clientConnection(obj, pluginMessageSender);
                FetchObjectResponse fetchObjectResponse = (FetchObjectResponse) atomicReference.get();
                if (fetchObjectResponse == null) {
                    pluginMessageSender.onClose();
                    throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Plugin didn't send a response before returning from clientConnection()");
                }
                if (atomicBoolean.get()) {
                    GrpcUtil.safelyComplete(streamObserver, fetchObjectResponse);
                    return null;
                }
                pluginMessageSender.onClose();
                throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Plugin didn't close response, use MessageStream instead for this object");
            });
            if (startQuery != null) {
                startQuery.close();
            }
        } catch (Throwable th) {
            if (startQuery != null) {
                try {
                    startQuery.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public StreamObserver<StreamRequest> messageStream(StreamObserver<StreamResponse> streamObserver) {
        return new SendMessageObserver(this.sessionService.getCurrentSession(), streamObserver);
    }

    @NotNull
    private ObjectType getObjectTypeInstance(String str, Object obj) {
        Optional findObjectType = this.objectTypeLookup.findObjectType(obj);
        if (findObjectType.isEmpty()) {
            throw Exceptions.statusRuntimeException(Code.NOT_FOUND, String.format("No ObjectType found, expected type '%s'", str));
        }
        ObjectType objectType = (ObjectType) findObjectType.get();
        if (str.equals(objectType.name())) {
            return objectType;
        }
        throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, String.format("Unexpected ObjectType, expected type '%s', actual type '%s'", str, objectType.name()));
    }

    private static void cleanup(Collection<SessionState.ExportObject<?>> collection, Throwable th) {
        Iterator<SessionState.ExportObject<?>> it = collection.iterator();
        while (it.hasNext()) {
            try {
                it.next().release();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
        }
    }
}
