package io.deephaven.server.arrow;

import com.google.protobuf.ByteString;
import com.google.protobuf.ByteStringAccess;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.Code;
import io.deephaven.auth.AuthContext;
import io.deephaven.auth.AuthenticationException;
import io.deephaven.auth.AuthenticationRequestHandler;
import io.deephaven.auth.BasicAuthMarshaller;
import io.deephaven.extensions.barrage.BarrageSnapshotOptions;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.WrappedAuthenticationRequest;
import io.deephaven.server.arrow.ArrowFlightUtil;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.session.SessionState;
import io.deephaven.server.session.TicketRouter;
import io.grpc.stub.StreamObserver;
import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.arrow.flight.impl.Flight;
import org.apache.arrow.flight.impl.FlightServiceGrpc;
import org.jetbrains.annotations.Nullable;

/**
 * gRPC entry points for the Arrow Flight service.
 *
 * <p>Each RPC runs its body through {@link GrpcUtil#rpcWrapper} and delegates the real work to the injected
 * {@link SessionService}, {@link TicketRouter}, {@link ArrowFlightUtil} helpers, or the registered
 * {@link AuthenticationRequestHandler}s.
 */
@Singleton
public class FlightServiceGrpcImpl extends FlightServiceGrpc.FlightServiceImplBase {
    static final BarrageSnapshotOptions DEFAULT_SNAPSHOT_DESER_OPTIONS = BarrageSnapshotOptions.builder().build();
    private static final Logger log = LoggerFactory.getLogger(FlightServiceGrpcImpl.class);

    /** May be {@code null}; it is only forwarded to the {@link ArrowFlightUtil} helpers. */
    private final ScheduledExecutorService executorService;
    private final SessionService sessionService;
    private final TicketRouter ticketRouter;
    private final ArrowFlightUtil.DoExchangeMarshaller.Factory doExchangeFactory;
    /** Authentication handlers, looked up by the authentication type string carried in the handshake. */
    private final Map<String, AuthenticationRequestHandler> authRequestHandlers;

    /**
     * Request observer for one {@code Handshake} call.
     *
     * <p>It sends at most one terminal signal to the client: either a token response followed by completion, or
     * an {@code UNAUTHENTICATED} error. {@link #isComplete} records that the terminal signal has been sent so that
     * later callbacks on the same call do not try to terminate the stream a second time.
     */
    private final class HandshakeObserver implements StreamObserver<Flight.HandshakeRequest> {
        private boolean isComplete = false;
        private final StreamObserver<Flight.HandshakeResponse> responseObserver;

        private HandshakeObserver(StreamObserver<Flight.HandshakeResponse> streamObserver) {
            this.responseObserver = streamObserver;
        }

        /**
         * Handles one handshake message.
         *
         * <p>If the session service already reports a session, its token is returned immediately. Otherwise the
         * payload is first offered to the handler registered under {@link BasicAuthMarshaller#AUTH_TYPE}; if that
         * produces no context, the payload is parsed as a {@link WrappedAuthenticationRequest} and offered to the
         * handler registered under the wrapped type. A successful login creates a new session and returns its
         * token; anything else is answered with {@code UNAUTHENTICATED}.
         *
         * @param handshakeRequest the client's handshake message
         */
        public void onNext(Flight.HandshakeRequest handshakeRequest) {
            if (isComplete) {
                // A terminal response was already sent on this call; responding again would violate the
                // StreamObserver contract.
                return;
            }

            SessionState existingSession = sessionService.getOptionalSession();
            if (existingSession != null) {
                respondWithAuthTokenBin(existingSession);
                return;
            }

            // Lets a handler push intermediate handshake responses back to the client.
            AuthenticationRequestHandler.HandshakeResponseListener responseListener = (version, responsePayload) -> {
                GrpcUtil.safelyExecute(() -> {
                    responseObserver.onNext(Flight.HandshakeResponse.newBuilder()
                            .setProtocolVersion(version)
                            .setPayload(ByteStringAccess.wrap(responsePayload))
                            .build());
                });
            };

            ByteString payload = handshakeRequest.getPayload();
            long protocolVersion = handshakeRequest.getProtocolVersion();

            Optional<AuthContext> authContext;
            try {
                authContext = login(BasicAuthMarshaller.AUTH_TYPE, protocolVersion, payload, responseListener);
                if (authContext.isEmpty()) {
                    WrappedAuthenticationRequest wrapped = WrappedAuthenticationRequest.parseFrom(payload);
                    authContext = login(wrapped.getType(), protocolVersion, wrapped.getPayload(), responseListener);
                }
            } catch (AuthenticationException | InvalidProtocolBufferException e) {
                log.error().append("Authentication failed: ").append(e).endl();
                authContext = Optional.empty();
            }

            if (authContext.isEmpty()) {
                // Record the terminal signal first so a subsequent onCompleted() does not emit a second error.
                isComplete = true;
                responseObserver.onError(
                        GrpcUtil.statusRuntimeException(Code.UNAUTHENTICATED, "authentication details invalid"));
                return;
            }
            respondWithAuthTokenBin(sessionService.newSession(authContext.get()));
        }

        /**
         * Offers the payload to the handler registered for {@code authType}.
         *
         * @param authType key into the registered authentication handlers
         * @param protocolVersion protocol version supplied by the client
         * @param payload raw authentication payload
         * @param listener callback the handler may use to send handshake responses
         * @return the handler's result, or an empty optional when no handler is registered for {@code authType}
         * @throws AuthenticationException propagated from the handler
         */
        private Optional<AuthContext> login(String authType, long protocolVersion, ByteString payload,
                AuthenticationRequestHandler.HandshakeResponseListener listener) throws AuthenticationException {
            AuthenticationRequestHandler handler = authRequestHandlers.get(authType);
            if (handler == null) {
                return Optional.empty();
            }
            return handler.login(protocolVersion, payload.asReadOnlyByteBuffer(), listener);
        }

        /** Sends the session's expiration token as the handshake payload and completes the response stream. */
        private void respondWithAuthTokenBin(SessionState sessionState) {
            isComplete = true;
            responseObserver.onNext(Flight.HandshakeResponse.newBuilder()
                    .setPayload(sessionState.getExpiration().getTokenAsByteString())
                    .build());
            responseObserver.onCompleted();
        }

        public void onError(Throwable th) {
            // Deliberately ignored: nothing is sent back to the client for a client-side stream error.
        }

        /** Rejects the call if the client closed its side before any terminal response was sent. */
        public void onCompleted() {
            if (isComplete) {
                return;
            }
            isComplete = true;
            responseObserver.onError(
                    GrpcUtil.statusRuntimeException(Code.UNAUTHENTICATED, "no authentication details provided"));
        }
    }

    /**
     * @param scheduledExecutorService executor forwarded to the do-get / do-put helpers; may be {@code null}
     * @param sessionService source of the session associated with the current call
     * @param ticketRouter resolves tickets and flight descriptors
     * @param factory creates the observer used for {@code DoExchange}
     * @param map authentication handlers keyed by authentication type
     */
    @Inject
    public FlightServiceGrpcImpl(@Nullable ScheduledExecutorService scheduledExecutorService, SessionService sessionService, TicketRouter ticketRouter, ArrowFlightUtil.DoExchangeMarshaller.Factory factory, Map<String, AuthenticationRequestHandler> map) {
        this.executorService = scheduledExecutorService;
        this.sessionService = sessionService;
        this.ticketRouter = ticketRouter;
        this.doExchangeFactory = factory;
        this.authRequestHandlers = map;
    }

    /**
     * Opens a handshake stream.
     *
     * @param streamObserver observer that receives handshake responses
     * @return the observer that consumes the client's handshake requests
     */
    public StreamObserver<Flight.HandshakeRequest> handshake(StreamObserver<Flight.HandshakeResponse> streamObserver) {
        return GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            return new HandshakeObserver(streamObserver);
        });
    }

    /**
     * Streams every flight info the ticket router visits for the (optional) current session, then completes.
     *
     * @param criteria not consulted by this implementation
     * @param streamObserver observer that receives the flight infos
     */
    public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightInfo> streamObserver) {
        GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            ticketRouter.visitFlightInfo(sessionService.getOptionalSession(), streamObserver::onNext);
            streamObserver.onCompleted();
        });
    }

    /**
     * Responds with the flight info for the given descriptor.
     *
     * @param flightDescriptor descriptor to resolve
     * @param streamObserver observer that receives the single flight info or an error
     */
    public void getFlightInfo(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.FlightInfo> streamObserver) {
        GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            respondWithFlightInfo(flightDescriptor, streamObserver, Function.identity());
        });
    }

    /**
     * Responds with the schema taken from the flight info for the given descriptor.
     *
     * @param flightDescriptor descriptor to resolve
     * @param streamObserver observer that receives the single schema result or an error
     */
    public void getSchema(Flight.FlightDescriptor flightDescriptor, StreamObserver<Flight.SchemaResult> streamObserver) {
        GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            respondWithFlightInfo(flightDescriptor, streamObserver,
                    info -> Flight.SchemaResult.newBuilder().setSchema(info.getSchema()).build());
        });
    }

    /**
     * Resolves the flight info export for {@code flightDescriptor} and sends one response derived from it.
     *
     * <p>With a session, the response is produced by a non-export job that requires the flight info export, with
     * failures routed to {@code streamObserver}. Without a session, the export is inspected directly: it is
     * answered only if a reference can be retained and it is already in state {@code EXPORTED}; in every other
     * case the call is failed with {@code FAILED_PRECONDITION} so the client is never left without a terminal
     * signal.
     *
     * @param flightDescriptor descriptor to resolve
     * @param streamObserver observer that receives the response or error
     * @param toResponse converts the resolved flight info into the response message
     */
    private <T> void respondWithFlightInfo(Flight.FlightDescriptor flightDescriptor,
            StreamObserver<T> streamObserver, Function<Flight.FlightInfo, T> toResponse) {
        SessionState session = sessionService.getOptionalSession();
        SessionState.ExportObject<?> export = ticketRouter.flightInfoFor(session, flightDescriptor, "request");

        if (session != null) {
            session.nonExport().require(export).onError(streamObserver).submit(() -> {
                streamObserver.onNext(toResponse.apply((Flight.FlightInfo) export.get()));
                streamObserver.onCompleted();
            });
            return;
        }

        if (!export.tryRetainReference()) {
            failFlightInfoNotFound(streamObserver);
            return;
        }
        try {
            if (export.getState() == ExportNotification.State.EXPORTED) {
                streamObserver.onNext(toResponse.apply((Flight.FlightInfo) export.get()));
                streamObserver.onCompleted();
                return;
            }
        } finally {
            export.dropReference();
        }
        // The export exists but is not in the EXPORTED state; fail the call instead of leaving it open.
        failFlightInfoNotFound(streamObserver);
    }

    /** Fails the call with {@code FAILED_PRECONDITION}. */
    private static void failFlightInfoNotFound(StreamObserver<?> streamObserver) {
        streamObserver.onError(
                GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Could not find flight info"));
    }

    /**
     * Serves a {@code DoGet} by delegating to {@link ArrowFlightUtil#DoGetCustom} with the current session.
     *
     * @param ticket ticket identifying what to fetch
     * @param streamObserver observer that receives the response stream
     */
    public void doGetCustom(Flight.Ticket ticket, StreamObserver<InputStream> streamObserver) {
        GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            ArrowFlightUtil.DoGetCustom(executorService, sessionService.getCurrentSession(), ticketRouter, ticket, streamObserver);
        });
    }

    /**
     * Opens a {@code DoPut} stream bound to the current session.
     *
     * @param streamObserver observer that receives put results
     * @return the observer that consumes the client's uploaded messages
     */
    public StreamObserver<InputStream> doPutCustom(StreamObserver<Flight.PutResult> streamObserver) {
        return GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            return new ArrowFlightUtil.DoPutObserver(executorService, sessionService.getCurrentSession(), ticketRouter, streamObserver);
        });
    }

    /**
     * Opens a {@code DoExchange} stream bound to the current session, using the injected exchange factory.
     *
     * @param streamObserver observer that receives the server's messages
     * @return the observer that consumes the client's messages
     */
    public StreamObserver<InputStream> doExchangeCustom(StreamObserver<InputStream> streamObserver) {
        return GrpcUtil.rpcWrapper(log, streamObserver, () -> {
            return doExchangeFactory.openExchange(sessionService.getCurrentSession(), streamObserver);
        });
    }
}
