diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 2aa59c7013..17ca727340 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao { // find pause tasks and init task's state cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); + List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), + ExecutionStatus.KILL); + suspendedNodeList.addAll(stopNodeList); for(Integer taskId : suspendedNodeList){ // 把暂停状态初始化 initTaskInstance(this.findTaskInstanceById(taskId)); @@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao { * @param taskInstance */ private void initTaskInstance(TaskInstance taskInstance){ - if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){ - taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); - }else{ - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateTaskInstance(taskInstance); + + if(!taskInstance.isSubProcess()){ + if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){ + taskInstance.setFlag(Flag.NO); + updateTaskInstance(taskInstance); + return; + } } + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + updateTaskInstance(taskInstance); } /** diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index 1dc3bbc99b..78165e3f9a 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider { public String queryDetailById(Map parameter) { return new SQL() { { - SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); + SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); - FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q"); + FROM(TABLE_NAME + " inst"); - WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}"); + WHERE("inst.id = #{processId}"); } }.toString(); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index e741966f06..1c328b6cba 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{ // get process define ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), - processDefine.getUserId()); - - if(tenant != null){ - processInstance.setTenantCode(tenant.getTenantCode()); - } taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processDefine); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index c21a3a2f1b..591712ba7d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; +import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.Tenant; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; @@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable { // set task params taskProps.setTaskParams(taskNode.getParams()); // set tenant code , execute task linux user - taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setNodeName(taskInstance.getName()); taskProps.setTaskInstId(taskInstance.getId()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); - // set queue - if (StringUtils.isEmpty(queue)){ - taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); - }else { - taskProps.setQueue(queue); - } - taskProps.setTaskStartTime(taskInstance.getStartTime()); - taskProps.setDefinedParams(allParamMap); - - // set task timeout - setTaskTimeout(taskProps, taskNode); - - taskProps.setDependence(taskInstance.getDependency()); - - taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); - - // job init - task.init(); - - // job handle - task.handle(); - - - logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); - - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ - status = ExecutionStatus.SUCCESS; - // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - - AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); - - // replace placeholder - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - params.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); - if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")){ - String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)){ - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + + ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + + Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), + processDefine.getUserId()); + + if(tenant == null){ + processInstance.setTenantCode(tenant.getTenantCode()); + logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}", + processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId() + ); + status = ExecutionStatus.FAILURE; + }else{ + taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); + // set queue + if (StringUtils.isEmpty(queue)){ + taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + }else { + taskProps.setQueue(tenant.getQueueName()); + } + taskProps.setTaskStartTime(taskInstance.getStartTime()); + taskProps.setDefinedParams(allParamMap); + + // set task timeout + setTaskTimeout(taskProps, taskNode); + + taskProps.setDependence(taskInstance.getDependency()); + + taskProps.setTaskAppId(String.format("%s_%s_%s", + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + // custom logger + TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); + + // job init + task.init(); + + // job handle + task.handle(); + logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); + + if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + status = ExecutionStatus.SUCCESS; + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ + + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } } } } - } - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ - status = ExecutionStatus.KILL; - }else { - status = ExecutionStatus.FAILURE; + }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ + status = ExecutionStatus.KILL; + }else { + status = ExecutionStatus.FAILURE; + } } }catch (Exception e){ logger.error("task escheduler failure : " + e.getMessage(),e);