/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.ProcessDao; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.server.utils.AlertManager; import com.alibaba.fastjson.JSONObject; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import static org.apache.dolphinscheduler.common.Constants.*; /** * master exec thread,split dag */ public class MasterExecThread implements Runnable { private static final Logger logger = LoggerFactory.getLogger(MasterExecThread.class); /** * process instance */ private ProcessInstance processInstance; /** * runing TaskNode */ private final Map> activeTaskNode = new ConcurrentHashMap>(); private final ExecutorService taskExecService; /** * submit failure nodes */ private Boolean taskFailedSubmit = false; private List recoverNodeIdList = new ArrayList<>(); private Map errorTaskList = new ConcurrentHashMap<>(); private Map completeTaskList = new ConcurrentHashMap<>(); private Map readyToSubmitTaskList = new ConcurrentHashMap<>(); private Map dependFailedTask = new ConcurrentHashMap<>(); private Map forbiddenTaskList = new ConcurrentHashMap<>(); private List recoverToleranceFaultTaskList = new ArrayList<>(); private AlertManager alertManager = new AlertManager(); private DAG dag; /** * process dao */ private ProcessDao processDao; /** * load configuration file */ private static Configuration conf; public MasterExecThread(ProcessInstance processInstance,ProcessDao processDao){ this.processDao = processDao; this.processInstance = processInstance; int masterTaskExecNum = conf.getInt(Constants.MASTER_EXEC_TASK_THREADS, Constants.defaultMasterTaskExecNum); this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum); } static { try { conf = new PropertiesConfiguration(Constants.MASTER_PROPERTIES_PATH); }catch (ConfigurationException e){ logger.error("load configuration failed : " + e.getMessage(),e); System.exit(1); } } @Override public void run() { // process instance is null if (processInstance == null){ logger.info("process instance is not exists"); return; } // check to see if it's done if (processInstance.getState().typeIsFinished()){ logger.info("process instance is done : {}",processInstance.getId()); return; } try { if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()){ // sub process complement data executeComplementProcess(); }else{ // execute flow executeProcess(); } }catch (Exception e){ logger.error("master exec thread exception: " + e.getMessage(), e); logger.error("process execute failed, process id:{}", processInstance.getId()); processInstance.setState(ExecutionStatus.FAILURE); processInstance.setEndTime(new Date()); processDao.updateProcessInstance(processInstance); }finally { taskExecService.shutdown(); // post handle postHandle(); } } private void executeProcess() throws Exception { prepareProcess(); runProcess(); endProcess(); } /** * execute complement process * @throws Exception */ private void executeComplementProcess() throws Exception { Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Date startDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE)); Date endDate = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); processDao.saveProcessInstance(processInstance); Date scheduleDate = processInstance.getScheduleTime(); if(scheduleDate == null){ scheduleDate = startDate; } while(Stopper.isRunning()){ // prepare dag and other info prepareProcess(); if(dag == null){ logger.error("process {} dag is null, please check out parameters", processInstance.getId()); processInstance.setState(ExecutionStatus.SUCCESS); processDao.updateProcessInstance(processInstance); return; } // execute process ,waiting for end runProcess(); // process instace failure ,no more complements if(!processInstance.getState().typeIsSuccess()){ logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); break; } // current process instance sucess ,next execute scheduleDate = DateUtils.getSomeDay(scheduleDate, 1); if(scheduleDate.after(endDate)){ // all success logger.info("process {} complement completely!", processInstance.getId()); break; } logger.info("process {} start to complement {} data", processInstance.getId(), DateUtils.dateToString(scheduleDate)); // execute next process instance complement data processInstance.setScheduleTime(scheduleDate); if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){ cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); processInstance.setCommandParam(JSONUtils.toJson(cmdParam)); } List taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId()); for(TaskInstance taskInstance : taskInstanceList){ taskInstance.setFlag(Flag.NO); processDao.updateTaskInstance(taskInstance); } processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processInstance.getProcessDefinition().getGlobalParamMap(), processInstance.getProcessDefinition().getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); processDao.saveProcessInstance(processInstance); } // flow end endProcess(); } /** * prepare process parameter * @throws Exception */ private void prepareProcess() throws Exception { // init task queue initTaskQueue(); // gen process dag buildFlowDag(); logger.info("prepare process :{} end", processInstance.getId()); } /** * process end handle */ private void endProcess() { processInstance.setEndTime(new Date()); processDao.updateProcessInstance(processInstance); if(processInstance.getState().typeIsWaittingThread()){ processDao.createRecoveryWaitingThreadCommand(null, processInstance); } List taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId()); alertManager.sendAlertProcessInstance(processInstance, taskInstances); } /** * generate process dag * @throws Exception */ private void buildFlowDag() throws Exception { recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam()); forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson()); // generate process to get DAG info List recoveryNameList = getRecoveryNodeNameList(); List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); ProcessDag processDag = generateFlowDag(processInstance.getProcessInstanceJson(), startNodeNameList, recoveryNameList, processInstance.getTaskDependType()); if(processDag == null){ //TODO... logger.error("processDag is null"); return; } // generate process dag dag = DagHelper.buildDagGraph(processDag); } private void initTaskQueue(){ taskFailedSubmit = false; activeTaskNode.clear(); dependFailedTask.clear(); completeTaskList.clear(); errorTaskList.clear(); List taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId()); for(TaskInstance task : taskInstanceList){ if(task.isTaskComplete()){ completeTaskList.put(task.getName(), task); } if(task.getState().typeIsFailure() && !task.taskCanRetry()){ errorTaskList.put(task.getName(), task); } } } /** * process post handle */ private void postHandle() { logger.info("develop mode is: {}", CommonUtils.isDevelopMode()); if (!CommonUtils.isDevelopMode()) { // get exec dir String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils .getProcessExecDir(processInstance.getProcessDefinition().getProjectId(), processInstance.getProcessDefinitionId(), processInstance.getId()); try { FileUtils.deleteDirectory(new File(execLocalPath)); } catch (IOException e) { logger.error("delete exec dir failed : " + e.getMessage(), e); } } } /** * submit task to execute * @param taskInstance */ private TaskInstance submitTaskExec(TaskInstance taskInstance) { MasterBaseTaskExecThread abstractExecThread = null; if(taskInstance.isSubProcess()){ abstractExecThread = new SubProcessTaskExecThread(taskInstance, processInstance); }else { abstractExecThread = new MasterTaskExecThread(taskInstance, processInstance); } Future future = taskExecService.submit(abstractExecThread); activeTaskNode.putIfAbsent(abstractExecThread, future); return abstractExecThread.getTaskInstance(); } /** * find task instance in db. * in case submit more than one same name task in the same time. * @param taskName * @return */ private TaskInstance findTaskIfExists(String taskName){ List taskInstanceList = processDao.findValidTaskListByProcessId(this.processInstance.getId()); for(TaskInstance taskInstance : taskInstanceList){ if(taskInstance.getName().equals(taskName)){ return taskInstance; } } return null; } /** * encapsulation task * @param processInstance * @param nodeName * @return */ private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName, TaskNode taskNode, String parentNodeName) { TaskInstance taskInstance = findTaskIfExists(nodeName); if(taskInstance == null){ taskInstance = new TaskInstance(); // task name taskInstance.setName(nodeName); // process instance define id taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); // task instance state taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); // process instance id taskInstance.setProcessInstanceId(processInstance.getId()); // task instance node json taskInstance.setTaskJson(JSONObject.toJSONString(taskNode)); // task instance type taskInstance.setTaskType(taskNode.getType()); // task instance whether alert taskInstance.setAlertFlag(Flag.NO); // task instance start time taskInstance.setStartTime(new Date()); // task instance flag taskInstance.setFlag(Flag.YES); // task instance retry times taskInstance.setRetryTimes(0); // max task instance retry times taskInstance.setMaxRetryTimes(taskNode.getMaxRetryTimes()); // retry task instance interval taskInstance.setRetryInterval(taskNode.getRetryInterval()); // task instance priority if(taskNode.getTaskInstancePriority() == null){ taskInstance.setTaskInstancePriority(Priority.MEDIUM); }else{ taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); } int workerGroupId = taskNode.getWorkerGroupId(); taskInstance.setWorkerGroupId(workerGroupId); } return taskInstance; } /** * get post task instance by node * * @param dag * @param parentNodeName * @return */ private List getPostTaskInstanceByNode(DAG dag, String parentNodeName){ List postTaskList = new ArrayList<>(); Collection startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList); if(startVertex == null){ return postTaskList; } for (String nodeName : startVertex){ // encapsulation task instance TaskInstance taskInstance = createTaskInstance(processInstance, nodeName , dag.getNode(nodeName),parentNodeName); postTaskList.add(taskInstance); } return postTaskList; } /** * * return start task node list * * @return */ private List getStartSubmitTaskList(){ List startTaskList = getPostTaskInstanceByNode(dag, null); HashMap successTaskMaps = new HashMap<>(); List resultList = new ArrayList<>(); while(Stopper.isRunning()){ for(TaskInstance task : startTaskList){ if(task.getState().typeIsSuccess()){ successTaskMaps.put(task.getName(), task); }else if(!completeTaskList.containsKey(task.getName()) && !errorTaskList.containsKey(task.getName())){ resultList.add(task); } } startTaskList.clear(); if(successTaskMaps.size() == 0){ break; } Set taskNameKeys = successTaskMaps.keySet(); for(String taskName : taskNameKeys){ startTaskList.addAll(getPostTaskInstanceByNode(dag, taskName)); } successTaskMaps.clear(); } return resultList; } /** * submit post node * @param parentNodeName */ private void submitPostNode(String parentNodeName){ List submitTaskList = null; if(parentNodeName == null){ submitTaskList = getStartSubmitTaskList(); }else{ submitTaskList = getPostTaskInstanceByNode(dag, parentNodeName); } // if previous node success , post node submit for(TaskInstance task : submitTaskList){ if(readyToSubmitTaskList.containsKey(task.getName())){ continue; } if(completeTaskList.containsKey(task.getName())){ logger.info("task {} has already run success", task.getName()); continue; } if(task.getState().typeIsPause() || task.getState().typeIsCancel()){ logger.info("task {} stopped, the state is {}", task.getName(), task.getState().toString()); }else{ addTaskToStandByList(task); } } } /** * determine whether the dependencies of the task node are complete * @return */ private DependResult isTaskDepsComplete(String taskName) { Collection startNodes = dag.getBeginNode(); // ff the vertex returns true directly if(startNodes.contains(taskName)){ return DependResult.SUCCESS; } TaskNode taskNode = dag.getNode(taskName); List depsNameList = taskNode.getDepList(); for(String depsNode : depsNameList ){ if(forbiddenTaskList.containsKey(depsNode)){ continue; } // dependencies must be fully completed if(!completeTaskList.containsKey(depsNode)){ return DependResult.WAITING; } ExecutionStatus taskState = completeTaskList.get(depsNode).getState(); if(taskState.typeIsFailure()){ return DependResult.FAILED; } if(taskState.typeIsPause() || taskState.typeIsCancel()){ return DependResult.WAITING; } } logger.info("taskName: {} completeDependTaskList: {}", taskName, Arrays.toString(completeTaskList.keySet().toArray())); return DependResult.SUCCESS; } /** * query task instance by complete state * @param state * @return */ private List getCompleteTaskByState(ExecutionStatus state){ List resultList = new ArrayList<>(); Set taskList = completeTaskList.keySet(); for(String taskName : taskList){ TaskInstance taskInstance = completeTaskList.get(taskName); if(taskInstance.getState() == state){ resultList.add(taskInstance); } } return resultList; } /** * where there are ongoing tasks * @param state * @return */ private ExecutionStatus runningState(ExecutionStatus state){ if(state == ExecutionStatus.READY_STOP || state == ExecutionStatus.READY_PAUSE || state == ExecutionStatus.WAITTING_THREAD){ // if the running task is not completed, the state remains unchanged return state; }else{ return ExecutionStatus.RUNNING_EXEUTION; } } /** * exists failure task , contains submit failure、dependency failure,execute failure(retry after) * * @return */ private Boolean hasFailedTask(){ if(this.taskFailedSubmit){ return true; } if(this.errorTaskList.size() > 0){ return true; } return this.dependFailedTask.size() > 0; } /** * process instance failure * * @return */ private Boolean processFailed(){ if(hasFailedTask()) { if(processInstance.getFailureStrategy() == FailureStrategy.END){ return true; } if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0; } } return false; } /** * whether task for waiting thread * @return */ private Boolean hasWaitingThreadTask(){ List waitingList = getCompleteTaskByState(ExecutionStatus.WAITTING_THREAD); return waitingList.size() > 0; } /** * prepare for pause * 1,failed retry task in the preparation queue , returns to failure directly * 2,exists pause task,complement not completed, pending submission of tasks, return to suspension * 3,success * @return */ private ExecutionStatus processReadyPause(){ if(hasRetryTaskInStandBy()){ return ExecutionStatus.FAILURE; } List pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE); if(pauseList.size() > 0 || !isComplementEnd() || readyToSubmitTaskList.size() > 0){ return ExecutionStatus.PAUSE; }else{ return ExecutionStatus.SUCCESS; } } /** * generate the latest process instance status by the tasks state * @return */ private ExecutionStatus getProcessInstanceState(){ ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId()); ExecutionStatus state = instance.getState(); if(activeTaskNode.size() > 0){ return runningState(state); } // process failure if(processFailed()){ return ExecutionStatus.FAILURE; } // waiting thread if(hasWaitingThreadTask()){ return ExecutionStatus.WAITTING_THREAD; } // pause if(state == ExecutionStatus.READY_PAUSE){ return processReadyPause(); } // stop if(state == ExecutionStatus.READY_STOP){ List stopList = getCompleteTaskByState(ExecutionStatus.STOP); List killList = getCompleteTaskByState(ExecutionStatus.KILL); if(stopList.size() > 0 || killList.size() > 0 || !isComplementEnd()){ return ExecutionStatus.STOP; }else{ return ExecutionStatus.SUCCESS; } } // success if(state == ExecutionStatus.RUNNING_EXEUTION){ if(readyToSubmitTaskList.size() > 0){ //tasks currently pending submission, no retries, indicating that depend is waiting to complete return ExecutionStatus.RUNNING_EXEUTION; }else{ // if the waiting queue is empty and the status is in progress, then success return ExecutionStatus.SUCCESS; } } return state; } /** * whether complement end * @return */ private Boolean isComplementEnd() { if(!processInstance.isComplementData()){ return true; } try { Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Date endTime = DateUtils.getScheduleDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE)); return processInstance.getScheduleTime().equals(endTime); } catch (Exception e) { logger.error("complement end failed : " + e.getMessage(),e); return false; } } /** * updateProcessInstance process instance state * after each batch of tasks is executed, the status of the process instance is updated */ private void updateProcessInstanceState() { ExecutionStatus state = getProcessInstanceState(); if(processInstance.getState() != state){ logger.info( "work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}", processInstance.getId(), processInstance.getName(), processInstance.getState().toString(), state.toString(), processInstance.getCommandType().toString()); processInstance.setState(state); ProcessInstance instance = processDao.findProcessInstanceById(processInstance.getId()); instance.setState(state); instance.setProcessDefinition(processInstance.getProcessDefinition()); processDao.updateProcessInstance(instance); processInstance = instance; } } /** * get task dependency result * @param taskInstance * @return */ private DependResult getDependResultForTask(TaskInstance taskInstance){ DependResult inner = isTaskDepsComplete(taskInstance.getName()); return inner; } /** * add task to standy list * @param taskInstance */ private void addTaskToStandByList(TaskInstance taskInstance){ logger.info("add task to stand by list: {}", taskInstance.getName()); readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance); } /** * remove task from stand by list * @param taskInstance */ private void removeTaskFromStandbyList(TaskInstance taskInstance){ logger.info("remove task from stand by list: {}", taskInstance.getName()); readyToSubmitTaskList.remove(taskInstance.getName()); } /** * has retry task in standby * @return */ private Boolean hasRetryTaskInStandBy(){ Set taskNameSet = this.readyToSubmitTaskList.keySet(); for(String taskName : taskNameSet){ TaskInstance task = this.readyToSubmitTaskList.get(taskName); if(task.getState().typeIsFailure()){ return true; } } return false; } /** * submit and watch the tasks, until the work flow stop */ private void runProcess(){ // submit start node submitPostNode(null); boolean sendTimeWarning = false; while(!processInstance.IsProcessInstanceStop()){ // send warning email if process time out. if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){ alertManager.sendProcessTimeoutAlert(processInstance, processDao.findProcessDefineById(processInstance.getProcessDefinitionId())); sendTimeWarning = true; } Set keys = activeTaskNode.keySet(); for (MasterBaseTaskExecThread taskExecThread : keys) { Future future = activeTaskNode.get(taskExecThread); TaskInstance task = taskExecThread.getTaskInstance(); if(!future.isDone()){ continue; } // node monitor thread complete activeTaskNode.remove(taskExecThread); if(task == null){ this.taskFailedSubmit = true; continue; } logger.info("task :{}, id:{} complete, state is {} ", task.getName(), task.getId(), task.getState().toString()); // node success , post node submit if(task.getState() == ExecutionStatus.SUCCESS){ completeTaskList.put(task.getName(), task); submitPostNode(task.getName()); continue; } // node fails, retry first, and then execute the failure process if(task.getState().typeIsFailure()){ if(task.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE){ this.recoverToleranceFaultTaskList.add(task); } if(task.taskCanRetry()){ addTaskToStandByList(task); }else{ // node failure, based on failure strategy errorTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task); if(processInstance.getFailureStrategy() == FailureStrategy.END){ kill(); } } continue; } // other status stop/pause completeTaskList.put(task.getName(), task); } // send alert if(this.recoverToleranceFaultTaskList.size() > 0){ alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList); this.recoverToleranceFaultTaskList.clear(); } // updateProcessInstance completed task status // failure priority is higher than pause // if a task fails, other suspended tasks need to be reset kill if(errorTaskList.size() > 0){ for(String taskName : completeTaskList.keySet()){ TaskInstance completeTask = completeTaskList.get(taskName); if(completeTask.getState()== ExecutionStatus.PAUSE){ completeTask.setState(ExecutionStatus.KILL); completeTaskList.put(taskName, completeTask); processDao.updateTaskInstance(completeTask); } } } if(canSubmitTaskToQueue()){ submitStandByTask(); } try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (InterruptedException e) { logger.error(e.getMessage(),e); } updateProcessInstanceState(); } logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState()); } /** * check process time out * @param processInstance * @return */ private boolean checkProcessTimeOut(ProcessInstance processInstance) { if(processInstance.getTimeout() == 0 ){ return false; } Date now = new Date(); long runningTime = DateUtils.diffMin(now, processInstance.getStartTime()); if(runningTime > processInstance.getTimeout()){ return true; } return false; } private boolean canSubmitTaskToQueue() { return OSUtils.checkResource(conf, true); } /** * close the ongoing tasks */ private void kill() { logger.info("kill called on process instance id: {}, num: {}", processInstance.getId(), activeTaskNode.size()); for (Map.Entry> entry : activeTaskNode.entrySet()) { MasterBaseTaskExecThread taskExecThread = entry.getKey(); Future future = entry.getValue(); if (!future.isDone()) { // record kill info logger.info("kill process instance, id: {}, task: {}", processInstance.getId(), taskExecThread.getTaskInstance().getId()); // kill node taskExecThread.kill(); } } } /** * whether the retry interval is timed out * @param taskInstance * @return */ private Boolean retryTaskIntervalOverTime(TaskInstance taskInstance){ if(taskInstance.getState() != ExecutionStatus.FAILURE){ return Boolean.TRUE; } if(taskInstance.getId() == 0 || taskInstance.getMaxRetryTimes() ==0 || taskInstance.getRetryInterval() == 0 ){ return Boolean.TRUE; } Date now = new Date(); long failedTimeInterval = DateUtils.differSec(now, taskInstance.getEndTime()); // task retry does not over time, return false if(taskInstance.getRetryInterval() * SEC_2_MINUTES_TIME_UNIT >= failedTimeInterval){ return Boolean.FALSE; } return Boolean.TRUE; } /** * handling the list of tasks to be submitted */ private void submitStandByTask(){ Set readySubmitTaskNames = readyToSubmitTaskList.keySet(); for(String readySubmitTaskName : readySubmitTaskNames){ TaskInstance task = readyToSubmitTaskList.get(readySubmitTaskName); DependResult dependResult = getDependResultForTask(task); if(DependResult.SUCCESS == dependResult){ if(retryTaskIntervalOverTime(task)){ submitTaskExec(task); removeTaskFromStandbyList(task); } }else if(DependResult.FAILED == dependResult){ // if the dependency fails, the current node is not submitted and the state changes to failure. dependFailedTask.put(readySubmitTaskName, task); removeTaskFromStandbyList(task); logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult); } } } private TaskInstance getRecoveryTaskInstance(String taskId){ if(!StringUtils.isNotEmpty(taskId)){ return null; } try { Integer intId = Integer.valueOf(taskId); TaskInstance task = processDao.findTaskInstanceById(intId); if(task == null){ logger.error("start node id cannot be found: {}", taskId); }else { return task; } }catch (Exception e){ logger.error("get recovery task instance failed : " + e.getMessage(),e); } return null; } /** * get start task instance list * @param cmdParam * @return */ private List getStartTaskInstanceList( String cmdParam){ List instanceList = new ArrayList<>(); Map paramMap = JSONUtils.toMap(cmdParam); if(paramMap != null && paramMap.containsKey(CMDPARAM_RECOVERY_START_NODE_STRING)){ String[] idList = paramMap.get(CMDPARAM_RECOVERY_START_NODE_STRING).split(Constants.COMMA); for(String nodeId : idList){ TaskInstance task = getRecoveryTaskInstance(nodeId); if(task != null){ instanceList.add(task); } } } return instanceList; } /** * parse "StartNodeNameList" from cmd param * @param cmdParam * @return */ private List parseStartNodeName(String cmdParam){ List startNodeNameList = new ArrayList<>(); Map paramMap = JSONUtils.toMap(cmdParam); if(paramMap == null){ return startNodeNameList; } if(paramMap.containsKey(CMDPARAM_START_NODE_NAMES)){ startNodeNameList = Arrays.asList(paramMap.get(CMDPARAM_START_NODE_NAMES).split(Constants.COMMA)); } return startNodeNameList; } /** * generate start node name list from parsing command param; * if "StartNodeIdList" exists in command param, return StartNodeIdList * @return */ private List getRecoveryNodeNameList(){ List recoveryNodeNameList = new ArrayList<>(); if(recoverNodeIdList.size() > 0) { for (TaskInstance task : recoverNodeIdList) { recoveryNodeNameList.add(task.getName()); } } return recoveryNodeNameList; } /** * generate flow dag * @param processDefinitionJson * @return * @throws Exception */ public ProcessDag generateFlowDag(String processDefinitionJson, List startNodeNameList, List recoveryNodeNameList, TaskDependType depNodeType)throws Exception{ return DagHelper.generateFlowDag(processDefinitionJson, startNodeNameList, recoveryNodeNameList, depNodeType); } }