From 602591aa8c0ded9d002389d4ed73211f602b860c Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 26 Jun 2019 15:42:30 +0800 Subject: [PATCH 1/2] qianfan task result add judge update --- .../worker/runner/TaskScheduleThread.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 60edad5606..75b1e01a60 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -25,15 +25,14 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; -import cn.escheduler.common.utils.CommonUtils; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.HadoopUtils; -import cn.escheduler.common.utils.TaskParametersUtils; +import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; +import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; @@ -144,6 +143,7 @@ public class TaskScheduleThread implements Callable { TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); + List projectRes = createProjectResFiles(taskNode); // copy hdfs file to local @@ -205,17 +205,25 @@ public class TaskScheduleThread implements Callable { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - Date scheduleTime = processInstance.getScheduleTime(); - if(scheduleTime == null){ - scheduleTime = processInstance.getStartTime(); - } - // process exec time : yyyyMMdd format - String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } + } } } From 8b88cd0b3d71ee0503f60245f46595eb3eff5220 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 26 Jun 2019 16:09:03 +0800 Subject: [PATCH 2/2] qianfan task result add judge update --- .../worker/runner/TaskScheduleThread.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 75b1e01a60..c21a3a2f1b 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -25,7 +25,12 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; +import cn.escheduler.common.task.mr.MapreduceParameters; +import cn.escheduler.common.task.procedure.ProcedureParameters; +import cn.escheduler.common.task.python.PythonParameters; import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.task.spark.SparkParameters; +import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; @@ -206,7 +211,7 @@ public class TaskScheduleThread implements Callable { if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); // replace placeholder Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), @@ -279,6 +284,39 @@ public class TaskScheduleThread implements Callable { } + /** + * get current task parameter class + * @return + */ + private Class getCurTaskParamsClass(){ + Class paramsClass = null; + TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); + switch (taskType){ + case SHELL: + paramsClass = ShellParameters.class; + break; + case SQL: + paramsClass = SqlParameters.class; + break; + case PROCEDURE: + paramsClass = ProcedureParameters.class; + break; + case MR: + paramsClass = MapreduceParameters.class; + break; + case SPARK: + paramsClass = SparkParameters.class; + break; + case PYTHON: + paramsClass = PythonParameters.class; + break; + default: + logger.error("not support this task type: {}", taskType); + throw new IllegalArgumentException("not support this task type"); + } + return paramsClass; + } + /** * kill task */