From 602591aa8c0ded9d002389d4ed73211f602b860c Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 26 Jun 2019 15:42:30 +0800 Subject: [PATCH] qianfan task result add judge update --- .../worker/runner/TaskScheduleThread.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 60edad5606..75b1e01a60 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -25,15 +25,14 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; -import cn.escheduler.common.utils.CommonUtils; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.HadoopUtils; -import cn.escheduler.common.utils.TaskParametersUtils; +import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; +import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; @@ -144,6 +143,7 @@ public class TaskScheduleThread implements Callable { TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); + List projectRes = createProjectResFiles(taskNode); // copy hdfs file to local @@ -205,17 +205,25 @@ public class TaskScheduleThread implements Callable { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - Date scheduleTime = processInstance.getScheduleTime(); - if(scheduleTime == null){ - scheduleTime = processInstance.getStartTime(); - } - // process exec time : yyyyMMdd format - String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } + } } }