Browse Source

Merge remote-tracking branch 'upstream/branch-1.0.2' into dev110_up

pull/2/head
lenboo 6 years ago
parent
commit
85caad3c72
  1. 5
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  2. 35
      escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java
  3. 8
      escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java
  4. 46
      escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java
  5. 72
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

5
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -278,6 +278,11 @@ public final class Constants {
*/
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
/**
* date format of yyyyMMdd
*/
public static final String YYYYMMDD = "yyyyMMdd";
/**
* date format of yyyyMMddHHmmss
*/

35
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* task record status
*
*/
public enum TaskRecordStatus {
/**
* status
* 0 sucess
* 1 failure
* 2 exception
*/
SUCCESS,FAILURE,EXCEPTION
}

8
escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java

@ -30,5 +30,11 @@ public enum TaskType {
* 6 PYTHON
* 7 DEPENDENT
*/
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT
SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT;
public static boolean typeIsNormalTask(String typeName) {
TaskType taskType = TaskType.valueOf(typeName);
return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
}
}

46
escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java

@ -17,6 +17,8 @@
package cn.escheduler.dao;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.TaskRecordStatus;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.dao.model.TaskRecord;
import org.apache.commons.configuration.Configuration;
@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -43,7 +46,7 @@ public class TaskRecordDao {
/**
* 加载配置文件
* load conf file
*/
private static Configuration conf;
@ -56,6 +59,14 @@ public class TaskRecordDao {
}
}
/**
* get task record flag
* @return
*/
public static boolean getTaskRecordFlag(){
return conf.getBoolean(Constants.TASK_RECORD_FLAG);
}
/**
* create connection
* @return
@ -253,4 +264,37 @@ public class TaskRecordDao {
}
return recordList;
}
/**
* according to procname and procdate query task record
* @param procName
* @param procDate
* @return
*/
public static TaskRecordStatus getTaskRecordState(String procName,String procDate){
String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'"
,procName,procDate + "%");
List<TaskRecord> taskRecordList = getQueryResult(sql);
// contains no record and sql exception
if (CollectionUtils.isEmpty(taskRecordList)){
// exception
return TaskRecordStatus.EXCEPTION;
}else if (taskRecordList.size() > 1){
return TaskRecordStatus.EXCEPTION;
}else {
TaskRecord taskRecord = taskRecordList.get(0);
if (taskRecord == null){
return TaskRecordStatus.EXCEPTION;
}
Long targetRowCount = taskRecord.getTargetRowCount();
if (targetRowCount <= 0){
return TaskRecordStatus.FAILURE;
}else {
return TaskRecordStatus.SUCCESS;
}
}
}
}

72
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -19,18 +19,25 @@ package cn.escheduler.server.worker.runner;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.TaskRecordStatus;
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.TaskTimeoutParameter;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.HadoopUtils;
import cn.escheduler.common.utils.TaskParametersUtils;
import cn.escheduler.common.task.mr.MapreduceParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
import cn.escheduler.common.task.python.PythonParameters;
import cn.escheduler.common.task.shell.ShellParameters;
import cn.escheduler.common.task.spark.SparkParameters;
import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.utils.*;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogger;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager;
@ -141,6 +148,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
List<String> projectRes = createProjectResFiles(taskNode);
// copy hdfs file to local
@ -199,6 +207,31 @@ public class TaskScheduleThread implements Callable<Boolean> {
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS;
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
}
}
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL;
}else {
@ -251,6 +284,39 @@ public class TaskScheduleThread implements Callable<Boolean> {
}
/**
* get current task parameter class
* @return
*/
private Class getCurTaskParamsClass(){
Class paramsClass = null;
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
switch (taskType){
case SHELL:
paramsClass = ShellParameters.class;
break;
case SQL:
paramsClass = SqlParameters.class;
break;
case PROCEDURE:
paramsClass = ProcedureParameters.class;
break;
case MR:
paramsClass = MapreduceParameters.class;
break;
case SPARK:
paramsClass = SparkParameters.class;
break;
case PYTHON:
paramsClass = PythonParameters.class;
break;
default:
logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type");
}
return paramsClass;
}
/**
* kill task
*/

Loading…
Cancel
Save