Browse Source

Merge pull request #488 from qiaozhanwei/branch-1.0.2

qianfan task result add judge update
pull/2/head
乔占卫 6 years ago committed by GitHub
parent
commit
fe077894ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  2. 35
      escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java
  3. 8
      escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java
  4. 46
      escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java
  5. 20
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

5
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -252,6 +252,11 @@ public final class Constants {
*/
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
/**
* date format of yyyyMMdd
*/
public static final String YYYYMMDD = "yyyyMMdd";
/**
* date format of yyyyMMddHHmmss
*/

35
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* task record status
*
*/
public enum TaskRecordStatus {
/**
* status
* 0 sucess
* 1 failure
* 2 exception
*/
SUCCESS,FAILURE,EXCEPTION
}

8
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java

@ -30,5 +30,11 @@ public enum TaskType {
* 6 PYTHON
* 7 DEPENDENT
*/
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT;
public static boolean typeIsNormalTask(String typeName) {
TaskType taskType = TaskType.valueOf(typeName);
return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
}
}

46
escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java

@ -17,6 +17,8 @@
package cn.escheduler.dao;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.TaskRecordStatus;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.dao.model.TaskRecord;
import org.apache.commons.configuration.Configuration;
@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -43,7 +46,7 @@ public class TaskRecordDao {
/**
* 加载配置文件
* load conf file
*/
private static Configuration conf;
@ -56,6 +59,14 @@ public class TaskRecordDao {
}
}
/**
* get task record flag
* @return
*/
public static boolean getTaskRecordFlag(){
return conf.getBoolean(Constants.TASK_RECORD_FLAG);
}
/**
* create connection
* @return
@ -253,4 +264,37 @@ public class TaskRecordDao {
}
return recordList;
}
/**
* according to procname and procdate query task record
* @param procName
* @param procDate
* @return
*/
public static TaskRecordStatus getTaskRecordState(String procName,String procDate){
String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'"
,procName,procDate + "%");
List<TaskRecord> taskRecordList = getQueryResult(sql);
// contains no record and sql exception
if (CollectionUtils.isEmpty(taskRecordList)){
// exception
return TaskRecordStatus.EXCEPTION;
}else if (taskRecordList.size() > 1){
return TaskRecordStatus.EXCEPTION;
}else {
TaskRecord taskRecord = taskRecordList.get(0);
if (taskRecord == null){
return TaskRecordStatus.EXCEPTION;
}
Long targetRowCount = taskRecord.getTargetRowCount();
if (targetRowCount <= 0){
return TaskRecordStatus.FAILURE;
}else {
return TaskRecordStatus.SUCCESS;
}
}
}
}

20
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -19,15 +19,18 @@ package cn.escheduler.server.worker.runner;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.TaskRecordStatus;
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.TaskTimeoutParameter;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.HadoopUtils;
import cn.escheduler.common.utils.TaskParametersUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.LoggerUtils;
@ -199,6 +202,23 @@ public class TaskScheduleThread implements Callable<Boolean> {
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS;
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){
Date scheduleTime = processInstance.getScheduleTime();
if(scheduleTime == null){
scheduleTime = processInstance.getStartTime();
}
// process exec time : yyyyMMdd format
String scheduleDate = DateUtils.format(scheduleTime, Constants.YYYYMMDD);
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), scheduleDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL;
}else {

Loading…
Cancel
Save