diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 177fb8a8c2..8b2743d1c5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -252,6 +252,9 @@ public class MasterExecThread implements Runnable { } while(Stopper.isRunning()){ + + logger.info("process {} start to complement {} data", + processInstance.getId(), DateUtils.dateToString(scheduleDate)); // prepare dag and other info prepareProcess(); @@ -266,13 +269,13 @@ public class MasterExecThread implements Runnable { // execute process ,waiting for end runProcess(); + endProcess(); // process instance failure ,no more complements if(!processInstance.getState().typeIsSuccess()){ logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); break; } - // current process instance success ,next execute if(null == iterator){ // loop by day @@ -291,9 +294,7 @@ public class MasterExecThread implements Runnable { } scheduleDate = iterator.next(); } - - logger.info("process {} start to complement {} data", - processInstance.getId(), DateUtils.dateToString(scheduleDate)); + // flow end // execute next process instance complement data processInstance.setScheduleTime(scheduleDate); if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){ @@ -301,22 +302,15 @@ public class MasterExecThread implements Runnable { processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); } - List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for(TaskInstance taskInstance : taskInstanceList){ - taskInstance.setFlag(Flag.NO); - processService.updateTaskInstance(taskInstance); - } processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processInstance.getProcessDefinition().getGlobalParamMap(), processInstance.getProcessDefinition().getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); - + processInstance.setId(0); processService.saveProcessInstance(processInstance); } - // flow end - endProcess(); }