package io.deephaven.server.arrow;

import com.github.f4b6a3.uuid.UuidCreator;
import com.github.f4b6a3.uuid.exception.InvalidUuidException;
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteStringAccess;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import io.deephaven.auth.AuthContext;
import io.deephaven.auth.AuthenticationException;
import io.deephaven.auth.AuthenticationRequestHandler;
import io.deephaven.auth.BasicAuthMarshaller;
import io.deephaven.engine.table.impl.perf.QueryPerformanceNugget;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.table.impl.util.EngineMetrics;
import io.deephaven.extensions.barrage.BarrageStreamGenerator;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.server.arrow.ArrowFlightUtil;
import io.deephaven.server.session.ActionRouter;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.deephaven.util.SafeCloseable;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.arrow.flight.ProtocolExposer;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Singleton
/* loaded from: input_file:io/deephaven/server/arrow/FlightServiceGrpcImpl.class */
public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBase {
    /** Logger for this service; also used by the nested handshake observer. */
    private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class);
    // Injected scheduler. The constructor accepts null for it, and no method visible in this class reads it.
    private final ScheduledExecutorService executorService;
    // Passed to ArrowFlightUtil.DoGetCustom when serving doGetCustom.
    private final BarrageStreamGenerator.Factory streamGeneratorFactory;
    // Source of the session attached to the current call, of sessions looked up by token, and of new sessions.
    private final SessionService sessionService;
    // Handed to the DoPutObserver created in doPutCustom.
    private final SessionService.ErrorTransformer errorTransformer;
    // Resolves flight descriptors and tickets (listFlights, getFlightInfo, getSchema, doGetCustom, doPutCustom).
    private final TicketRouter ticketRouter;
    // Handles doAction and listActions.
    private final ActionRouter actionRouter;
    // Creates the request observer returned from doExchangeCustom.
    private final ArrowFlightUtil.DoExchangeMarshaller.Factory doExchangeFactory;
    // Authentication handlers keyed by the authentication type string supplied in a handshake.
    private final Map<String, AuthenticationRequestHandler> authRequestHandlers;

    /* loaded from: input_file:io/deephaven/server/arrow/FlightServiceGrpcImpl$HandshakeObserver.class */
    private final class HandshakeObserver implements StreamObserver<Flight.HandshakeRequest> {
        private boolean isComplete = false;
        private final StreamObserver<Flight.HandshakeResponse> responseObserver;

        private HandshakeObserver(StreamObserver<Flight.HandshakeResponse> streamObserver) {
            this.responseObserver = streamObserver;
        }

        public void onNext(Flight.HandshakeRequest handshakeRequest) {
            Optional<AuthContext> empty;
            SessionService.TokenExpiration expiration;
            AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener = (j, byteBuffer) -> {
                GrpcUtil.safelyOnNextAndComplete(this.responseObserver, Flight.HandshakeResponse.newBuilder().setProtocolVersion(j).setPayload(ByteStringAccess.wrap(byteBuffer)).build());
            };
            ByteString payload = handshakeRequest.getPayload();
            long protocolVersion = handshakeRequest.getProtocolVersion();
            try {
                empty = login(BasicAuthMarshaller.AUTH_TYPE, protocolVersion, payload, handshakeResponseListener);
                if (empty.isEmpty()) {
                    WrappedAuthenticationRequest parseFrom = WrappedAuthenticationRequest.parseFrom(payload);
                    if (parseFrom.getType().equals("Bearer ".trim())) {
                        try {
                            SessionState sessionForToken = FlightServiceGrpcImpl.this.sessionService.getSessionForToken(UuidCreator.fromString(parseFrom.getPayload().toString(StandardCharsets.US_ASCII)));
                            if (sessionForToken == null || (expiration = sessionForToken.getExpiration()) == null) {
                                return;
                            }
                            respondWithAuthTokenBin(expiration);
                            return;
                        } catch (IllegalArgumentException | InvalidUuidException e) {
                        }
                    }
                    empty = login(parseFrom.getType(), protocolVersion, parseFrom.getPayload(), handshakeResponseListener);
                }
            } catch (AuthenticationException | InvalidProtocolBufferException e2) {
                FlightServiceGrpcImpl.log.error().append("Authentication failed: ").append(e2).endl();
                empty = Optional.empty();
            }
            if (empty.isEmpty()) {
                this.responseObserver.onError(Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "Authentication details invalid"));
            } else {
                respondWithAuthTokenBin(FlightServiceGrpcImpl.this.sessionService.newSession(empty.get()).getExpiration());
            }
        }

        private Optional<AuthContext> login(String str, long j, ByteString byteString, AuthenticationRequestHandler.HandshakeResponseListener handshakeResponseListener) throws AuthenticationException {
            AuthenticationRequestHandler authenticationRequestHandler = FlightServiceGrpcImpl.this.authRequestHandlers.get(str);
            if (authenticationRequestHandler != null) {
                return authenticationRequestHandler.login(j, byteString.asReadOnlyByteBuffer(), handshakeResponseListener);
            }
            FlightServiceGrpcImpl.log.info().append("No AuthenticationRequestHandler registered for type ").append(str).endl();
            return Optional.empty();
        }

        private void respondWithAuthTokenBin(SessionService.TokenExpiration tokenExpiration) {
            this.isComplete = true;
            this.responseObserver.onNext(Flight.HandshakeResponse.newBuilder().setPayload(tokenExpiration.getTokenAsByteString()).build());
            this.responseObserver.onCompleted();
        }

        public void onError(Throwable th) {
        }

        public void onCompleted() {
            if (this.isComplete) {
                return;
            }
            this.responseObserver.onError(Exceptions.statusRuntimeException(Code.UNAUTHENTICATED, "no authentication details provided"));
        }
    }

    /**
     * Creates the service from its injected collaborators; every argument is stored as-is.
     *
     * @param scheduledExecutorService scheduler to retain; may be {@code null}
     * @param factory stream generator factory used when serving {@code doGetCustom}
     * @param sessionService provider of the current, token-addressed, and newly created sessions
     * @param errorTransformer error transformer handed to put observers
     * @param ticketRouter router used to resolve flight descriptors and tickets
     * @param actionRouter router used for {@code doAction} and {@code listActions}
     * @param factory2 factory that opens {@code doExchangeCustom} streams
     * @param map authentication handlers keyed by authentication type
     */
    @Inject
    public FlightServiceGrpcImpl(@Nullable ScheduledExecutorService scheduledExecutorService, BarrageStreamGenerator.Factory factory, SessionService sessionService, SessionService.ErrorTransformer errorTransformer, TicketRouter ticketRouter, ActionRouter actionRouter, ArrowFlightUtil.DoExchangeMarshaller.Factory factory2, Map<String, AuthenticationRequestHandler> map) {
        // Session and authentication collaborators.
        this.sessionService = sessionService;
        this.authRequestHandlers = map;
        this.errorTransformer = errorTransformer;

        // Routing collaborators.
        this.ticketRouter = ticketRouter;
        this.actionRouter = actionRouter;

        // Streaming collaborators.
        this.streamGeneratorFactory = factory;
        this.doExchangeFactory = factory2;
        this.executorService = scheduledExecutorService;
    }

    /**
     * Opens the request side of a handshake call.
     *
     * <p>
     * When the session service reports no session for the current call, a {@link HandshakeObserver} performs the
     * authentication. When a session is already present, incoming requests and errors are ignored and the response
     * stream is simply completed once the client finishes sending.
     *
     * @param streamObserver the observer on which handshake responses are sent
     * @return the observer that receives the client's handshake requests
     */
    public StreamObserver<Flight.HandshakeRequest> handshake(@NotNull final StreamObserver<Flight.HandshakeResponse> streamObserver) {
        if (this.sessionService.getOptionalSession() == null) {
            return new HandshakeObserver(streamObserver);
        }

        return new StreamObserver<Flight.HandshakeRequest>() {
            public void onNext(Flight.HandshakeRequest handshakeRequest) {
                // a session already exists; nothing to authenticate
            }

            public void onError(Throwable th) {
                // ignored
            }

            public void onCompleted() {
                GrpcUtil.safelyComplete(streamObserver);
            }
        };
    }

    /**
     * Forwards a Flight action to the action router on behalf of the current call's session, if any.
     *
     * @param action the action in its protocol form; converted with {@code ProtocolExposer.fromProtocol}
     * @param streamObserver the observer for results; must be a {@link ServerCallStreamObserver}, as it is cast
     *        to one before being wrapped in the adapter
     */
    public void doAction(Flight.Action action, StreamObserver<Flight.Result> streamObserver) {
        final SessionState session = this.sessionService.getOptionalSession();
        this.actionRouter.doAction(
                session,
                ProtocolExposer.fromProtocol(action),
                new ServerCallStreamObserverAdapter((ServerCallStreamObserver) streamObserver, ProtocolExposer::toProtocol));
    }

    /**
     * Streams every flight info the ticket router reports for the current call's session (if any), then completes
     * the response.
     *
     * <p>
     * Criteria expressions are not supported: a non-empty expression fails the call with
     * {@code INVALID_ARGUMENT} and nothing is listed.
     *
     * @param criteria the listing criteria; its expression must be empty
     * @param streamObserver the observer that receives each flight info
     */
    public void listFlights(@NotNull Flight.Criteria criteria, @NotNull StreamObserver<Flight.FlightInfo> streamObserver) {
        if (!criteria.getExpression().isEmpty()) {
            streamObserver.onError(Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Criteria expressions are not supported"));
            return;
        }
        // A bound method reference null-checks its receiver when it is created, so no explicit check is needed.
        this.ticketRouter.visitFlightInfo(this.sessionService.getOptionalSession(), streamObserver::onNext);
        streamObserver.onCompleted();
    }

    /**
     * Streams every action type the action router reports for the current call's session (if any), then completes
     * the response.
     *
     * @param empty the (content-free) request message; not read
     * @param streamObserver the observer that receives each action type after it has been converted with
     *        {@code ProtocolExposer.toProtocol}
     */
    public void listActions(Flight.Empty empty, StreamObserver<Flight.ActionType> streamObserver) {
        // A bound method reference null-checks its receiver when it is created, so no explicit check is needed.
        this.actionRouter.listActions(
                this.sessionService.getOptionalSession(),
                adapt(streamObserver::onNext, ProtocolExposer::toProtocol));
        streamObserver.onCompleted();
    }

    /**
     * Resolves the flight info for a descriptor and sends it to the caller.
     *
     * <p>
     * With a session, the reply is produced by a non-export job that depends on the resolved export; that job
     * delivers either the flight info or an error to {@code streamObserver}. Without a session, the export is
     * inspected immediately: the flight info is sent if the export can be retained and is in the
     * {@code EXPORTED} state, otherwise the call fails with {@code FAILED_PRECONDITION}.
     *
     * @param flightDescriptor the descriptor to resolve
     * @param streamObserver the observer that receives the flight info or the error
     */
    public void getFlightInfo(@NotNull Flight.FlightDescriptor flightDescriptor, @NotNull StreamObserver<Flight.FlightInfo> streamObserver) {
        final SessionState session = this.sessionService.getOptionalSession();
        final QueryPerformanceRecorder recorder = QueryPerformanceRecorder.newQuery(
                "FlightService#getFlightInfo(request=" + String.valueOf(flightDescriptor) + ")",
                session == null ? null : session.getSessionId(),
                QueryPerformanceNugget.DEFAULT_FACTORY);

        // try-with-resources closes the handle on every exit path and tolerates a null resource.
        try (SafeCloseable ignored = recorder.startQuery()) {
            final SessionState.ExportObject<Flight.FlightInfo> export =
                    this.ticketRouter.flightInfoFor(session, flightDescriptor, "request");

            if (session != null) {
                session.<Flight.FlightInfo>nonExport()
                        .queryPerformanceRecorder(recorder)
                        .require(export)
                        .onError((StreamObserver<?>) streamObserver)
                        .onSuccess(flightInfo -> {
                            GrpcUtil.safelyOnNextAndComplete(streamObserver, flightInfo);
                        })
                        .submit(export::get);
                return;
            }

            // No session: answer synchronously from the export's current state.
            boolean replied = false;
            if (export.tryRetainReference()) {
                try {
                    if (export.getState() == ExportNotification.State.EXPORTED) {
                        GrpcUtil.safelyOnNextAndComplete(streamObserver, export.get());
                        replied = true;
                    }
                } finally {
                    export.dropReference();
                }
            }

            StatusRuntimeException failure = null;
            if (!replied) {
                // Always terminate the call: a retained export that is not EXPORTED must not leave the client
                // waiting for a response that will never be sent.
                failure = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
                GrpcUtil.safelyError(streamObserver, failure);
            }

            if (recorder.endQuery() || failure != null) {
                EngineMetrics.getInstance().logQueryProcessingResults(recorder, failure);
            }
        }
    }

    /**
     * Resolves the flight info for a descriptor and sends its schema to the caller.
     *
     * <p>
     * With a session, the reply is produced by a non-export job that depends on the resolved export; that job
     * delivers either the schema result or an error to {@code streamObserver}. Without a session, the export is
     * inspected immediately: the schema is sent if the export can be retained and is in the {@code EXPORTED}
     * state, otherwise the call fails with {@code FAILED_PRECONDITION}.
     *
     * @param flightDescriptor the descriptor to resolve
     * @param streamObserver the observer that receives the schema result or the error
     */
    public void getSchema(@NotNull Flight.FlightDescriptor flightDescriptor, @NotNull StreamObserver<Flight.SchemaResult> streamObserver) {
        final SessionState session = this.sessionService.getOptionalSession();
        final QueryPerformanceRecorder recorder = QueryPerformanceRecorder.newQuery(
                "FlightService#getSchema(request=" + String.valueOf(flightDescriptor) + ")",
                session == null ? null : session.getSessionId(),
                QueryPerformanceNugget.DEFAULT_FACTORY);

        // try-with-resources closes the handle on every exit path and tolerates a null resource.
        try (SafeCloseable ignored = recorder.startQuery()) {
            final SessionState.ExportObject<Flight.FlightInfo> export =
                    this.ticketRouter.flightInfoFor(session, flightDescriptor, "request");

            if (session != null) {
                session.<Flight.SchemaResult>nonExport()
                        .queryPerformanceRecorder(recorder)
                        .require(export)
                        .onError((StreamObserver<?>) streamObserver)
                        .onSuccess(schemaResult -> {
                            GrpcUtil.safelyOnNextAndComplete(streamObserver, schemaResult);
                        })
                        .submit(() -> {
                            return Flight.SchemaResult.newBuilder().setSchema(export.get().getSchema()).build();
                        });
                return;
            }

            // No session: answer synchronously from the export's current state.
            boolean replied = false;
            if (export.tryRetainReference()) {
                try {
                    if (export.getState() == ExportNotification.State.EXPORTED) {
                        GrpcUtil.safelyOnNextAndComplete(streamObserver,
                                Flight.SchemaResult.newBuilder().setSchema(export.get().getSchema()).build());
                        replied = true;
                    }
                } finally {
                    export.dropReference();
                }
            }

            StatusRuntimeException failure = null;
            if (!replied) {
                // Always terminate the call: a retained export that is not EXPORTED must not leave the client
                // waiting for a response that will never be sent.
                failure = Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info");
                GrpcUtil.safelyError(streamObserver, failure);
            }

            if (recorder.endQuery() || failure != null) {
                EngineMetrics.getInstance().logQueryProcessingResults(recorder, failure);
            }
        }
    }

    /**
     * Serves a get request for a ticket by delegating to {@code ArrowFlightUtil.DoGetCustom} with the current
     * session.
     *
     * @param ticket the ticket identifying what to stream
     * @param streamObserver the observer that receives the serialized stream
     */
    public void doGetCustom(Flight.Ticket ticket, StreamObserver<InputStream> streamObserver) {
        final SessionState session = this.sessionService.getCurrentSession();
        ArrowFlightUtil.DoGetCustom(
                this.streamGeneratorFactory,
                session,
                this.ticketRouter,
                ticket,
                streamObserver);
    }

    /**
     * Opens a put stream bound to the current session.
     *
     * @param streamObserver the observer that receives put results
     * @return a new {@code ArrowFlightUtil.DoPutObserver} that consumes the client's messages
     */
    public StreamObserver<InputStream> doPutCustom(StreamObserver<Flight.PutResult> streamObserver) {
        final SessionState session = this.sessionService.getCurrentSession();
        return new ArrowFlightUtil.DoPutObserver(
                session,
                this.ticketRouter,
                this.errorTransformer,
                streamObserver);
    }

    /**
     * Opens an exchange stream bound to the current session.
     *
     * @param streamObserver the observer that receives the server's outgoing messages
     * @return the observer produced by the exchange factory for the client's incoming messages
     */
    public StreamObserver<InputStream> doExchangeCustom(StreamObserver<InputStream> streamObserver) {
        final SessionState session = this.sessionService.getCurrentSession();
        return this.doExchangeFactory.openExchange(session, streamObserver);
    }

    /**
     * Builds a consumer of {@code T} that converts each value with {@code function} and passes the converted
     * value to {@code consumer}.
     *
     * @param consumer the sink for converted values
     * @param function the conversion applied to every incoming value
     * @param <T> the type accepted by the returned consumer
     * @param <R> the type accepted by {@code consumer}
     * @return the composed consumer
     */
    private static <T, R> Consumer<T> adapt(Consumer<R> consumer, Function<? super T, ? extends R> function) {
        return input -> {
            final R converted = function.apply(input);
            consumer.accept(converted);
        };
    }
}
