/*
 * Decompiled with CFR 0.152.
 */
package ai.konduit.serving.vertx.protocols.grpc.verticle;

import ai.konduit.serving.pipeline.api.data.Data;
import ai.konduit.serving.pipeline.impl.data.ProtoData;
import ai.konduit.serving.pipeline.impl.data.protobuf.DataProtoMessage;
import ai.konduit.serving.vertx.protocols.grpc.api.InferenceGrpc;
import ai.konduit.serving.vertx.verticle.InferenceVerticle;
import io.grpc.BindableService;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.grpc.VertxServer;
import io.vertx.grpc.VertxServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InferenceVerticleGrpc
extends InferenceVerticle {
    private static final Logger log = LoggerFactory.getLogger(InferenceVerticleGrpc.class);

    public void start(Promise<Void> startPromise) {
        this.vertx.executeBlocking(handler -> {
            try {
                this.initialize();
                handler.complete();
            }
            catch (Exception exception) {
                handler.fail((Throwable)exception);
                startPromise.fail((Throwable)exception);
            }
        }, resultHandler -> {
            if (resultHandler.failed()) {
                if (resultHandler.cause() != null) {
                    startPromise.fail(resultHandler.cause());
                } else {
                    startPromise.fail("Failed to start. Unknown cause.");
                }
            } else {
                int port;
                String portEnvValue = System.getenv("KONDUIT_SERVING_PORT");
                if (portEnvValue != null) {
                    try {
                        port = Integer.parseInt(portEnvValue);
                    }
                    catch (NumberFormatException exception) {
                        log.error("Environment variable \"{}={}\" isn't a valid port number.", (Object)"KONDUIT_SERVING_PORT", (Object)portEnvValue);
                        startPromise.fail((Throwable)exception);
                        return;
                    }
                } else {
                    port = this.inferenceConfiguration.port();
                }
                if (port < 0 || port > 65535) {
                    startPromise.fail((Throwable)new Exception("Valid port range is 0 <= port <= 65535. The given port was " + port));
                    return;
                }
                VertxServer rpcServer = VertxServerBuilder.forAddress((Vertx)this.vertx, (String)this.inferenceConfiguration.host(), (int)this.inferenceConfiguration.port()).addService((BindableService)new InferenceGrpc.InferenceImplBase(){

                    @Override
                    public void predict(DataProtoMessage.DataScheme request, StreamObserver<DataProtoMessage.DataScheme> responseObserver) {
                        try {
                            Data output = InferenceVerticleGrpc.this.pipelineExecutor.exec(ProtoData.fromBytes((byte[])request.toByteArray()));
                            responseObserver.onNext((Object)DataProtoMessage.DataScheme.parseFrom((byte[])output.asBytes()));
                            responseObserver.onCompleted();
                        }
                        catch (Throwable throwable) {
                            log.error("Failed to process the pipeline with the input data", throwable);
                            responseObserver.onError(throwable);
                        }
                    }
                }).build();
                rpcServer.start(handler -> {
                    if (handler.succeeded()) {
                        int actualPort = rpcServer.getPort();
                        this.inferenceConfiguration.port(actualPort);
                        try {
                            ((ContextInternal)this.context).getDeployment().deploymentOptions().setConfig(new JsonObject(this.inferenceConfiguration.toJson()));
                            long pid = this.getPid();
                            this.saveInspectionDataIfRequired(pid);
                            log.info("Inference gRPC server is listening on host: '{}'", (Object)this.inferenceConfiguration.host());
                            log.info("Inference gRPC server started on port {} with {} pipeline steps", (Object)actualPort, (Object)this.pipeline.size());
                            startPromise.complete();
                        }
                        catch (Throwable throwable) {
                            startPromise.fail(throwable);
                        }
                    } else {
                        startPromise.fail(handler.cause());
                    }
                });
            }
        });
    }
}

