Browse Source

Merge remote-tracking branch 'remotes/upstream/dev-1.1.0' into dev-1.1.0

pull/2/head
qiaozhanwei 5 years ago
parent
commit
30dbbad637
  1. 18
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  2. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  3. 6
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  4. 144
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

18
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao {
// find pause tasks and init task's state
cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for(Integer taskId : suspendedNodeList){
// 把暂停状态初始化
initTaskInstance(this.findTaskInstanceById(taskId));
@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao {
* @param taskInstance
*/
private void initTaskInstance(TaskInstance taskInstance){
if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
}else{
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
if(!taskInstance.isSubProcess()){
if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
}
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
}
/**

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider {
public String queryDetailById(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q");
FROM(TABLE_NAME + " inst");
WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}");
WHERE("inst.id = #{processId}");
}
}.toString();
}

6
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
}
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);

144
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.utils.*;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogger;
@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable<Boolean> {
// set task params
taskProps.setTaskParams(taskNode.getParams());
// set tenant code , execute task linux user
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskProps.setScheduleTime(processInstance.getScheduleTime());
taskProps.setNodeName(taskInstance.getName());
taskProps.setTaskInstId(taskInstance.getId());
taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
// set queue
if (StringUtils.isEmpty(queue)){
taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
taskProps.setQueue(queue);
}
taskProps.setTaskStartTime(taskInstance.getStartTime());
taskProps.setDefinedParams(allParamMap);
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setDependence(taskInstance.getDependency());
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
// custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
// job init
task.init();
// job handle
task.handle();
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS;
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant == null){
processInstance.setTenantCode(tenant.getTenantCode());
logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
);
status = ExecutionStatus.FAILURE;
}else{
taskProps.setTenantCode(tenant.getTenantCode());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
// set queue
if (StringUtils.isEmpty(queue)){
taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
taskProps.setQueue(tenant.getQueueName());
}
taskProps.setTaskStartTime(taskInstance.getStartTime());
taskProps.setDefinedParams(allParamMap);
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setDependence(taskInstance.getDependency());
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
// custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
// job init
task.init();
// job handle
task.handle();
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS;
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
}
}
}
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL;
}else {
status = ExecutionStatus.FAILURE;
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL;
}else {
status = ExecutionStatus.FAILURE;
}
}
}catch (Exception e){
logger.error("task escheduler failure : " + e.getMessage(),e);

Loading…
Cancel
Save