package ai.konduit.serving.vertx.protocols.grpc.verticle;

import ai.konduit.serving.pipeline.impl.data.ProtoData;
import ai.konduit.serving.pipeline.impl.data.protobuf.DataProtoMessage;
import ai.konduit.serving.vertx.protocols.grpc.api.InferenceGrpc;
import ai.konduit.serving.vertx.verticle.InferenceVerticle;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.grpc.VertxServer;
import io.vertx.grpc.VertxServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/konduit/serving/vertx/protocols/grpc/verticle/InferenceVerticleGrpc.class */
/**
 * Inference verticle that exposes the pipeline over gRPC.
 *
 * <p>On start it runs {@code initialize()} as a blocking task, resolves the port to listen on
 * (the {@code KONDUIT_SERVING_PORT} environment variable takes precedence over the configured
 * port), and starts a gRPC server whose {@code predict} call feeds the request through
 * {@code pipelineExecutor} and returns the result.
 */
public class InferenceVerticleGrpc extends InferenceVerticle {
    private static final Logger log = LoggerFactory.getLogger(InferenceVerticleGrpc.class);

    /** Name of the environment variable that, when set, overrides the configured port. */
    private static final String PORT_ENV_VARIABLE = "KONDUIT_SERVING_PORT";

    /** Largest port number accepted by the range check. */
    private static final int MAX_PORT = 65535;

    /**
     * Initializes the verticle and starts the gRPC inference server.
     *
     * <p>The given promise is completed once the server has started and the configuration has
     * been updated with the port reported by the server. It is failed (exactly once) when
     * initialization fails, the environment variable is not an integer, the port is outside
     * {@code 0..65535}, the server fails to start, or post-start bookkeeping throws.
     *
     * @param promise start promise to complete or fail with the outcome of the startup
     */
    @Override
    public void start(Promise<Void> promise) {
        this.vertx.executeBlocking(blockingPromise -> {
            try {
                initialize();
                blockingPromise.complete();
            } catch (Exception e) {
                // Fail only the blocking promise here. The result handler below propagates the
                // failure to the start promise; failing it in both places would attempt to
                // complete the same promise twice.
                blockingPromise.fail(e);
            }
        }, initResult -> {
            if (initResult.failed()) {
                if (initResult.cause() != null) {
                    promise.fail(initResult.cause());
                } else {
                    promise.fail("Failed to start. Unknown cause.");
                }
                return;
            }

            // Resolve the port: the environment variable wins over the configuration value.
            int requestedPort;
            String portEnvValue = System.getenv(PORT_ENV_VARIABLE);
            if (portEnvValue != null) {
                try {
                    requestedPort = Integer.parseInt(portEnvValue);
                } catch (NumberFormatException e) {
                    log.error("Environment variable \"{}={}\" isn't a valid port number.", PORT_ENV_VARIABLE, portEnvValue);
                    promise.fail(e);
                    return;
                }
            } else {
                requestedPort = this.inferenceConfiguration.port();
            }

            if (requestedPort < 0 || requestedPort > MAX_PORT) {
                promise.fail(new Exception("Valid port range is 0 <= port <= 65535. The given port was " + requestedPort));
                return;
            }

            // Bind to the resolved port (not the raw configuration value) so that the
            // environment variable override actually takes effect.
            VertxServer server = VertxServerBuilder
                    .forAddress(this.vertx, this.inferenceConfiguration.host(), requestedPort)
                    .addService(new InferenceGrpc.InferenceImplBase() {
                        @Override
                        public void predict(DataProtoMessage.DataScheme dataScheme, StreamObserver<DataProtoMessage.DataScheme> streamObserver) {
                            try {
                                // Round-trip through bytes: request proto -> ProtoData -> pipeline -> response proto.
                                byte[] outputBytes = InferenceVerticleGrpc.this.pipelineExecutor
                                        .exec(ProtoData.fromBytes(dataScheme.toByteArray()))
                                        .asBytes();
                                streamObserver.onNext(DataProtoMessage.DataScheme.parseFrom(outputBytes));
                                streamObserver.onCompleted();
                            } catch (Throwable th) {
                                InferenceVerticleGrpc.log.error("Failed to process the pipeline with the input data", th);
                                streamObserver.onError(th);
                            }
                        }
                    })
                    .build();

            server.start(startResult -> {
                if (!startResult.succeeded()) {
                    promise.fail(startResult.cause());
                    return;
                }

                // Record the port the server reports after start, which may differ from the
                // requested one (presumably when port 0 is requested - confirm).
                int actualPort = server.getPort();
                this.inferenceConfiguration.port(actualPort);
                try {
                    this.context.getDeployment().deploymentOptions().setConfig(new JsonObject(this.inferenceConfiguration.toJson()));
                    saveInspectionDataIfRequired(getPid());
                    log.info("Inference gRPC server is listening on host: '{}'", this.inferenceConfiguration.host());
                    log.info("Inference gRPC server started on port {} with {} pipeline steps", actualPort, this.pipeline.size());
                    promise.complete();
                } catch (Throwable th) {
                    promise.fail(th);
                }
            });
        });
    }
}
