From 5c1b3f0ed580bb6330232627b8015e5c69ceeff5 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Fri, 21 Jun 2019 10:58:02 +0800 Subject: [PATCH 1/4] qianfan task result add judge --- .../java/cn/escheduler/common/Constants.java | 5 ++ .../common/enums/TaskRecordStatus.java | 35 ++++++++++++++ .../cn/escheduler/common/enums/TaskType.java | 8 +++- .../java/cn/escheduler/dao/TaskRecordDao.java | 47 ++++++++++++++++++- .../worker/runner/TaskScheduleThread.java | 20 ++++++++ 5 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index b12145973e..e5137ef8fe 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -252,6 +252,11 @@ public final class Constants { */ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; + /** + * date format of yyyyMMdd + */ + public static final String YYYYMMDD = "yyyyMMdd"; + /** * date format of yyyyMMddHHmmss */ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java new file mode 100644 index 0000000000..a0fb8f1dfc --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + + +/** + * task record status + * + */ +public enum TaskRecordStatus { + + /** + * status: + * 0 sucess + * 1 failure + * 2 exception + */ + SUCCESS,FAILURE,EXCEPTION + + +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java index 64c2e951eb..1d589167e3 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java @@ -30,5 +30,11 @@ public enum TaskType { * 6 PYTHON * 7 DEPENDENT */ - SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT + SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT; + + public static boolean typeIsNormalTask(String typeName) { + TaskType taskType = TaskType.valueOf(typeName); + return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java index 4378e06410..58d54589f6 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java @@ -17,6 +17,8 @@ package cn.escheduler.dao; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.TaskRecordStatus; +import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.dao.model.TaskRecord; import org.apache.commons.configuration.Configuration; @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,7 +46,7 @@ public class TaskRecordDao { /** - * 加载配置文件 + * load conf file */ private static Configuration conf; @@ -56,6 +59,14 @@ public class TaskRecordDao { } } + + /** + * get task record flag + * @return + */ + public static boolean getTaskRecordFlag(){ + return conf.getBoolean(Constants.TASK_RECORD_FLAG); + } /** * create connection * @return @@ -253,4 +264,38 @@ public class TaskRecordDao { } return recordList; } + + /** + * according to procname and procdate query task record + * @param procName + * @param procDate + * @return + */ + public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ + String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE='%s'" + ,procName,procDate); + List taskRecordList = getQueryResult(sql); + + // contains no record and sql exception + if (CollectionUtils.isEmpty(taskRecordList)){ + // exception + return TaskRecordStatus.EXCEPTION; + }else if (taskRecordList.size() > 1){ + return TaskRecordStatus.EXCEPTION; + }else { + TaskRecord taskRecord = taskRecordList.get(0); + if (taskRecord == null){ + return TaskRecordStatus.EXCEPTION; + } + Long targetRowCount = taskRecord.getTargetRowCount(); + if (targetRowCount <= 0){ + return TaskRecordStatus.FAILURE; + }else { + return TaskRecordStatus.SUCCESS; + } + + } + } + + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index f381b9ef66..60edad5606 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -19,15 +19,18 @@ package cn.escheduler.server.worker.runner; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; +import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; +import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.dao.ProcessDao; +import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; @@ -199,6 +202,23 @@ public class TaskScheduleThread implements Callable { if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS; + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ + Date scheduleTime = processInstance.getScheduleTime(); + if(scheduleTime == null){ + scheduleTime = processInstance.getStartTime(); + } + + // process exec time : yyyyMMdd format + String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } + } + }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; }else { From b27a080baae1abcbd3574c8264b9f27ef961cadb Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Fri, 21 Jun 2019 11:32:14 +0800 Subject: [PATCH 2/4] qianfan task result add judge update --- .../src/main/java/cn/escheduler/dao/TaskRecordDao.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java index 58d54589f6..a4e317a1dd 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java @@ -272,8 +272,8 @@ public class TaskRecordDao { * @return */ public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ - String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE='%s'" - ,procName,procDate); + String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'" + ,procName,procDate + "%"); List taskRecordList = getQueryResult(sql); // contains no record and sql exception @@ -297,5 +297,4 @@ public class TaskRecordDao { } } - } From 602591aa8c0ded9d002389d4ed73211f602b860c Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 26 Jun 2019 15:42:30 +0800 Subject: [PATCH 3/4] qianfan task result add judge update --- .../worker/runner/TaskScheduleThread.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 60edad5606..75b1e01a60 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -25,15 +25,14 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; -import cn.escheduler.common.utils.CommonUtils; -import cn.escheduler.common.utils.DateUtils; -import cn.escheduler.common.utils.HadoopUtils; -import cn.escheduler.common.utils.TaskParametersUtils; +import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; +import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; @@ -144,6 +143,7 @@ public class TaskScheduleThread implements Callable { TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); + List projectRes = createProjectResFiles(taskNode); // copy hdfs file to local @@ -205,17 +205,25 @@ public class TaskScheduleThread implements Callable { // task recor flat : if true , start up qianfan if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - Date scheduleTime = processInstance.getScheduleTime(); - if(scheduleTime == null){ - scheduleTime = processInstance.getStartTime(); - } - // process exec time : yyyyMMdd format - String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } + } } } From 8b88cd0b3d71ee0503f60245f46595eb3eff5220 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 26 Jun 2019 16:09:03 +0800 Subject: [PATCH 4/4] qianfan task result add judge update --- .../worker/runner/TaskScheduleThread.java | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 75b1e01a60..c21a3a2f1b 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -25,7 +25,12 @@ import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; +import cn.escheduler.common.task.mr.MapreduceParameters; +import cn.escheduler.common.task.procedure.ProcedureParameters; +import cn.escheduler.common.task.python.PythonParameters; import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.task.spark.SparkParameters; +import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; @@ -206,7 +211,7 @@ public class TaskScheduleThread implements Callable { if (TaskRecordDao.getTaskRecordFlag() && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - AbstractParameters params = JSONUtils.parseObject(taskProps.getTaskParams(), AbstractParameters.class); + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); // replace placeholder Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), @@ -279,6 +284,39 @@ public class TaskScheduleThread implements Callable { } + /** + * get current task parameter class + * @return + */ + private Class getCurTaskParamsClass(){ + Class paramsClass = null; + TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); + switch (taskType){ + case SHELL: + paramsClass = ShellParameters.class; + break; + case SQL: + paramsClass = SqlParameters.class; + break; + case PROCEDURE: + paramsClass = ProcedureParameters.class; + break; + case MR: + paramsClass = MapreduceParameters.class; + break; + case SPARK: + paramsClass = SparkParameters.class; + break; + case PYTHON: + paramsClass = PythonParameters.class; + break; + default: + logger.error("not support this task type: {}", taskType); + throw new IllegalArgumentException("not support this task type"); + } + return paramsClass; + } + /** * kill task */