diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index b12145973e..e5137ef8fe 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -252,6 +252,11 @@ public final class Constants { */ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; + /** + * date format of yyyyMMdd + */ + public static final String YYYYMMDD = "yyyyMMdd"; + /** * date format of yyyyMMddHHmmss */ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java new file mode 100644 index 0000000000..a0fb8f1dfc --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + + +/** + * task record status + * + */ +public enum TaskRecordStatus { + + /** + * status: + * 0 sucess + * 1 failure + * 2 exception + */ + SUCCESS,FAILURE,EXCEPTION + + +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java index 64c2e951eb..1d589167e3 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java @@ -30,5 +30,11 @@ public enum TaskType { * 6 PYTHON * 7 DEPENDENT */ - SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT + SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT; + + public static boolean typeIsNormalTask(String typeName) { + TaskType taskType = TaskType.valueOf(typeName); + return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java index 4378e06410..58d54589f6 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java @@ -17,6 +17,8 @@ package cn.escheduler.dao; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.TaskRecordStatus; +import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.dao.model.TaskRecord; import org.apache.commons.configuration.Configuration; @@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -43,7 +46,7 @@ public class TaskRecordDao { /** - * 加载配置文件 + * load conf file */ private static Configuration conf; @@ -56,6 +59,14 @@ public class TaskRecordDao { } } + + /** + * get task record flag + * @return + */ + public static boolean getTaskRecordFlag(){ + return conf.getBoolean(Constants.TASK_RECORD_FLAG); + } /** * create connection * @return @@ -253,4 +264,38 @@ public class TaskRecordDao { } return recordList; } + + /** + * according to procname and procdate query task record + * @param procName + * @param procDate + * @return + */ + public static TaskRecordStatus getTaskRecordState(String procName,String procDate){ + String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE='%s'" + ,procName,procDate); + List taskRecordList = getQueryResult(sql); + + // contains no record and sql exception + if (CollectionUtils.isEmpty(taskRecordList)){ + // exception + return TaskRecordStatus.EXCEPTION; + }else if (taskRecordList.size() > 1){ + return TaskRecordStatus.EXCEPTION; + }else { + TaskRecord taskRecord = taskRecordList.get(0); + if (taskRecord == null){ + return TaskRecordStatus.EXCEPTION; + } + Long targetRowCount = taskRecord.getTargetRowCount(); + if (targetRowCount <= 0){ + return TaskRecordStatus.FAILURE; + }else { + return TaskRecordStatus.SUCCESS; + } + + } + } + + } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index f381b9ef66..60edad5606 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -19,15 +19,18 @@ package cn.escheduler.server.worker.runner; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; +import cn.escheduler.common.enums.TaskRecordStatus; import cn.escheduler.common.enums.TaskType; import cn.escheduler.common.model.TaskNode; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.TaskTimeoutParameter; import cn.escheduler.common.utils.CommonUtils; +import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.common.utils.TaskParametersUtils; import cn.escheduler.dao.ProcessDao; +import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; @@ -199,6 +202,23 @@ public class TaskScheduleThread implements Callable { if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ status = ExecutionStatus.SUCCESS; + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ + Date scheduleTime = processInstance.getScheduleTime(); + if(scheduleTime == null){ + scheduleTime = processInstance.getStartTime(); + } + + // process exec time : yyyyMMdd format + String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD); + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } + } + }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ status = ExecutionStatus.KILL; }else {