package com._4paradigm.openmldb.taskmanager.server.impl;

import com._4paradigm.openmldb.common.zk.ZKClient;
import com._4paradigm.openmldb.common.zk.ZKConfig;
import com._4paradigm.openmldb.proto.Common;
import com._4paradigm.openmldb.proto.TaskManager;
import com._4paradigm.openmldb.taskmanager.JobInfoManager;
import com._4paradigm.openmldb.taskmanager.LogManager;
import com._4paradigm.openmldb.taskmanager.OpenmldbBatchjobManager;
import com._4paradigm.openmldb.taskmanager.config.ConfigException;
import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig;
import com._4paradigm.openmldb.taskmanager.dao.JobInfo;
import com._4paradigm.openmldb.taskmanager.server.JobResultSaver;
import com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface;
import com._4paradigm.openmldb.taskmanager.udf.ExternalFunctionManager;
import com._4paradigm.openmldb.taskmanager.util.VersionUtil;
import com._4paradigm.openmldb.taskmanager.utils.VersionCli;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import scala.Option;

/* loaded from: input_file:com/_4paradigm/openmldb/taskmanager/server/impl/TaskManagerImpl.class */
public class TaskManagerImpl implements TaskManagerInterface {
    private static final Log logger = LogFactory.getLog(TaskManagerImpl.class);
    private volatile JobResultSaver jobResultSaver = new JobResultSaver();

    public TaskManagerImpl() throws InterruptedException, ConfigException {
        TaskManagerConfig.parse();
        initExternalFunction();
    }

    private void initExternalFunction() throws InterruptedException {
        ZKClient zKClient = new ZKClient(ZKConfig.builder().cluster(TaskManagerConfig.getZkCluster()).namespace(TaskManagerConfig.getZkRootPath()).sessionTimeout(TaskManagerConfig.getZkSessionTimeout()).baseSleepTime(TaskManagerConfig.getZkBaseSleepTime()).connectionTimeout(TaskManagerConfig.getZkConnectionTimeout()).maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime()).maxRetries(TaskManagerConfig.getZkMaxRetries()).cert(TaskManagerConfig.getZkCert()).build());
        zKClient.connect();
        String str = TaskManagerConfig.getZkRootPath() + "/data/function";
        try {
            for (String str2 : zKClient.getChildren(str)) {
                try {
                    Common.ExternalFun parseFrom = Common.ExternalFun.parseFrom(zKClient.getNodeValue(str + "/" + str2).getBytes());
                    ExternalFunctionManager.addFunction(parseFrom.getName(), parseFrom.getFile().substring(parseFrom.getFile().lastIndexOf("/") + 1));
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("Fail to parse protobuf of function: " + str2);
                }
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            logger.error("Fail to init external function from ZooKeeper");
        }
    }

    public TaskManager.JobInfo jobInfoToProto(JobInfo jobInfo) {
        TaskManager.JobInfo.Builder newBuilder = TaskManager.JobInfo.newBuilder();
        newBuilder.setId(jobInfo.getId());
        if (jobInfo.getJobType() != null) {
            newBuilder.setJobType(jobInfo.getJobType());
        }
        if (jobInfo.getState() != null) {
            newBuilder.setState(jobInfo.getState());
        }
        if (jobInfo.getStartTime() != null) {
            newBuilder.setStartTime(jobInfo.getStartTime().getTime());
        }
        if (jobInfo.getEndTime() != null) {
            newBuilder.setEndTime(jobInfo.getEndTime().getTime());
        }
        if (jobInfo.getParameter() != null) {
            newBuilder.setParameter(jobInfo.getParameter());
        }
        if (jobInfo.getCluster() != null) {
            newBuilder.setCluster(jobInfo.getCluster());
        }
        if (jobInfo.getApplicationId() != null) {
            newBuilder.setApplicationId(jobInfo.getApplicationId());
        }
        if (jobInfo.getError() != null) {
            newBuilder.setError(jobInfo.getError());
        }
        return newBuilder.build();
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobsResponse ShowJobs(TaskManager.ShowJobsRequest showJobsRequest) {
        try {
            List<JobInfo> jobs = JobInfoManager.getJobs(showJobsRequest.getUnfinished());
            TaskManager.ShowJobsResponse.Builder newBuilder = TaskManager.ShowJobsResponse.newBuilder();
            newBuilder.setCode(0);
            for (int i = 0; i < jobs.size(); i++) {
                newBuilder.addJobs(i, jobInfoToProto(jobs.get(i)));
            }
            return newBuilder.build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobsResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse ShowJob(TaskManager.ShowJobRequest showJobRequest) {
        try {
            TaskManager.ShowJobResponse.Builder newBuilder = TaskManager.ShowJobResponse.newBuilder();
            Option<JobInfo> job = JobInfoManager.getJob(showJobRequest.getId());
            if (job.isEmpty()) {
                newBuilder.setCode(-1).setMsg("Fail to get job with id: " + showJobRequest.getId());
            } else {
                newBuilder.setCode(0).setJob(jobInfoToProto((JobInfo) job.get()));
            }
            return newBuilder.build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.StopJobResponse StopJob(TaskManager.StopJobRequest stopJobRequest) {
        try {
            return TaskManager.StopJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(JobInfoManager.stopJob(stopJobRequest.getId()))).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.StopJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.DeleteJobResponse DeleteJob(TaskManager.DeleteJobRequest deleteJobRequest) {
        try {
            JobInfoManager.deleteJob(deleteJobRequest.getId());
            return TaskManager.DeleteJobResponse.newBuilder().setCode(0).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.DeleteJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse ShowBatchVersion(TaskManager.ShowBatchVersionRequest showBatchVersionRequest) {
        try {
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(OpenmldbBatchjobManager.showBatchVersion(showBatchVersionRequest.getSyncJob()))).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlRequest runBatchSqlRequest) {
        try {
            HashMap hashMap = new HashMap(runBatchSqlRequest.getConfMap());
            hashMap.put("spark.openmldb.savejobresult.http", String.format("http://%s:%d/openmldb.taskmanager.TaskManagerServer/SaveJobResult", TaskManagerConfig.getServerHost(), Integer.valueOf(TaskManagerConfig.getServerPort())));
            int genResultId = this.jobResultSaver.genResultId();
            hashMap.put("spark.openmldb.savejobresult.resultid", String.valueOf(genResultId));
            int id = OpenmldbBatchjobManager.runBatchSql(runBatchSqlRequest.getSql(), hashMap, runBatchSqlRequest.getDefaultDb()).getId();
            return ((JobInfo) JobInfoManager.getJob(id).get()).isSuccess() ? TaskManager.RunBatchSqlResponse.newBuilder().setCode(0).setOutput(this.jobResultSaver.readResult(genResultId, TaskManagerConfig.getBatchJobResultMaxWaitTime())).build() : TaskManager.RunBatchSqlResponse.newBuilder().setCode(-1).setMsg(String.format("The job %d fail and use 'SHOW JOBLOG %d' for more info", Integer.valueOf(id), Integer.valueOf(id))).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.RunBatchSqlResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    private JobInfo busyWaitJobInfo(int i, int i2) throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis() + ((i2 == 0 ? TaskManagerConfig.getChannelKeepAliveTime() : i2) * 1000);
        while (System.currentTimeMillis() < currentTimeMillis) {
            Option<JobInfo> job = JobInfoManager.getJob(i);
            if (job.isEmpty()) {
                throw new RuntimeException("job " + i + " not found in job_info table");
            }
            if (((JobInfo) job.get()).isFinished()) {
                return (JobInfo) job.get();
            }
            Thread.sleep(10000L);
        }
        throw new RuntimeException("wait for job " + i + " timeout");
    }

    private JobInfo waitJobInfoWrapper(int i) throws Exception {
        try {
            busyWaitJobInfo(i, 0);
            Thread.sleep(2000L);
            return busyWaitJobInfo(i, 2);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("wait for job failed, use show job to get the job status. Job " + i, e);
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse RunBatchAndShow(TaskManager.RunBatchAndShowRequest runBatchAndShowRequest) {
        try {
            JobInfo runBatchAndShow = OpenmldbBatchjobManager.runBatchAndShow(runBatchAndShowRequest.getSql(), runBatchAndShowRequest.getConfMap(), runBatchAndShowRequest.getDefaultDb());
            if (runBatchAndShowRequest.getSyncJob()) {
                runBatchAndShow = waitJobInfoWrapper(runBatchAndShow.getId());
            }
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(runBatchAndShow)).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse ImportOnlineData(TaskManager.ImportOnlineDataRequest importOnlineDataRequest) {
        try {
            JobInfo importOnlineData = OpenmldbBatchjobManager.importOnlineData(importOnlineDataRequest.getSql(), importOnlineDataRequest.getConfMap(), importOnlineDataRequest.getDefaultDb());
            if (importOnlineDataRequest.getSyncJob()) {
                importOnlineData = waitJobInfoWrapper(importOnlineData.getId());
            }
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(importOnlineData)).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse ImportOfflineData(TaskManager.ImportOfflineDataRequest importOfflineDataRequest) {
        try {
            JobInfo importOfflineData = OpenmldbBatchjobManager.importOfflineData(importOfflineDataRequest.getSql(), importOfflineDataRequest.getConfMap(), importOfflineDataRequest.getDefaultDb());
            if (importOfflineDataRequest.getSyncJob()) {
                importOfflineData = waitJobInfoWrapper(importOfflineData.getId());
            }
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(importOfflineData)).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse ExportOfflineData(TaskManager.ExportOfflineDataRequest exportOfflineDataRequest) {
        try {
            JobInfo exportOfflineData = OpenmldbBatchjobManager.exportOfflineData(exportOfflineDataRequest.getSql(), exportOfflineDataRequest.getConfMap(), exportOfflineDataRequest.getDefaultDb());
            if (exportOfflineDataRequest.getSyncJob()) {
                exportOfflineData = waitJobInfoWrapper(exportOfflineData.getId());
            }
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(exportOfflineData)).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.ShowJobResponse InsertOfflineData(TaskManager.InsertOfflineDataRequest insertOfflineDataRequest) {
        try {
            JobInfo insertOfflineData = OpenmldbBatchjobManager.insertOfflineData(insertOfflineDataRequest.getSql(), insertOfflineDataRequest.getConfMap(), insertOfflineDataRequest.getDefaultDb());
            if (insertOfflineDataRequest.getSyncJob()) {
                insertOfflineData = waitJobInfoWrapper(insertOfflineData.getId());
            }
            return TaskManager.ShowJobResponse.newBuilder().setCode(0).setJob(jobInfoToProto(insertOfflineData)).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.ShowJobResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.DropOfflineTableResponse DropOfflineTable(TaskManager.DropOfflineTableRequest dropOfflineTableRequest) {
        try {
            JobInfoManager.dropOfflineTable(dropOfflineTableRequest.getDb(), dropOfflineTableRequest.getTable());
            return TaskManager.DropOfflineTableResponse.newBuilder().setCode(0).build();
        } catch (Exception e) {
            e.printStackTrace();
            return TaskManager.DropOfflineTableResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.GetJobLogResponse GetJobLog(TaskManager.GetJobLogRequest getJobLogRequest) {
        String str = "";
        try {
            try {
                str = LogManager.getJobLog(getJobLogRequest.getId());
            } catch (Exception e) {
                logger.warn(String.format("Fail to to get job log of job %s", Integer.valueOf(getJobLogRequest.getId())));
            }
            String str2 = "";
            try {
                str2 = LogManager.getJobErrorLog(getJobLogRequest.getId());
            } catch (Exception e2) {
                logger.warn(String.format("Fail to to get job error log of job %s", Integer.valueOf(getJobLogRequest.getId())));
            }
            return TaskManager.GetJobLogResponse.newBuilder().setCode(0).setLog(String.format("Stdout:\n%s\n\nStderr:\n%s", str, str2)).build();
        } catch (Exception e3) {
            e3.printStackTrace();
            return TaskManager.GetJobLogResponse.newBuilder().setCode(-1).setMsg(e3.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.GetVersionResponse GetVersion(TaskManager.EmptyMessage emptyMessage) {
        String str = "unknown";
        String str2 = "unknown";
        try {
            str = VersionCli.getVersion();
        } catch (Exception e) {
            logger.warn("Fail to get TaskManager version, message: " + e.getMessage());
        }
        try {
            str2 = VersionUtil.getBatchVersion();
        } catch (Exception e2) {
            logger.warn("Fail to get batch engine version, message: " + e2.getMessage());
        }
        return TaskManager.GetVersionResponse.newBuilder().setTaskmanagerVersion(str).setBatchVersion(str2).build();
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.CreateFunctionResponse CreateFunction(TaskManager.CreateFunctionRequest createFunctionRequest) {
        Common.ExternalFun fun = createFunctionRequest.getFun();
        if (fun.getFile().isEmpty()) {
            return TaskManager.CreateFunctionResponse.newBuilder().setCode(-1).setMsg("ExternalFun does not have the file path").build();
        }
        try {
            ExternalFunctionManager.addFunction(fun.getName(), fun.getFile().substring(fun.getFile().lastIndexOf("/") + 1));
            return TaskManager.CreateFunctionResponse.newBuilder().setCode(0).setMsg("ok").build();
        } catch (Exception e) {
            return TaskManager.CreateFunctionResponse.newBuilder().setCode(-1).setMsg(e.getMessage()).build();
        }
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.DropFunctionResponse DropFunction(TaskManager.DropFunctionRequest dropFunctionRequest) {
        ExternalFunctionManager.dropFunction(dropFunctionRequest.getName());
        return TaskManager.DropFunctionResponse.newBuilder().setCode(0).setMsg("ok").build();
    }

    @Override // com._4paradigm.openmldb.taskmanager.server.TaskManagerInterface
    public TaskManager.SaveJobResultResponse SaveJobResult(TaskManager.SaveJobResultRequest saveJobResultRequest) {
        if (saveJobResultRequest.getResultId() != -1 || !saveJobResultRequest.getJsonData().equals("reset")) {
            if (this.jobResultSaver.saveFile(saveJobResultRequest.getResultId(), saveJobResultRequest.getJsonData())) {
                return TaskManager.SaveJobResultResponse.newBuilder().setCode(0).setMsg("ok").build();
            }
            logger.error("save job result failed(write to local file) for resultId: " + saveJobResultRequest.getResultId());
            return TaskManager.SaveJobResultResponse.newBuilder().setCode(-1).setMsg("save job result failed(write to local file)").build();
        }
        try {
            this.jobResultSaver.reset();
            return TaskManager.SaveJobResultResponse.newBuilder().setCode(0).setMsg("reset job result saver ok").build();
        } catch (IOException e) {
            e.printStackTrace();
            return TaskManager.SaveJobResultResponse.newBuilder().setCode(-1).setMsg("reset job result saver failed, " + e.getMessage()).build();
        }
    }
}
