From 1b98db9240a4dfb2aa2e1b165bdf436488551487 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Tue, 28 Jul 2020 11:12:51 +0800 Subject: [PATCH] fix bug#1336 serial complement data can have multiple process instances (#3317) Co-authored-by: lenboo --- .../server/master/runner/MasterExecThread.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java index 39a5e1e032..b801ab3bad 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java @@ -249,6 +249,9 @@ public class MasterExecThread implements Runnable { } while(Stopper.isRunning()){ + + logger.info("process {} start to complement {} data", + processInstance.getId(), DateUtils.dateToString(scheduleDate)); // prepare dag and other info prepareProcess(); @@ -263,13 +266,13 @@ public class MasterExecThread implements Runnable { // execute process ,waiting for end runProcess(); + endProcess(); // process instance failure ,no more complements if(!processInstance.getState().typeIsSuccess()){ logger.info("process {} state {}, complement not completely!", processInstance.getId(), processInstance.getState()); break; } - // current process instance success ,next execute if(null == iterator){ // loop by day @@ -288,9 +291,7 @@ public class MasterExecThread implements Runnable { } scheduleDate = iterator.next(); } - - logger.info("process {} start to complement {} data", - processInstance.getId(), DateUtils.dateToString(scheduleDate)); + // flow end // execute next process instance complement data processInstance.setScheduleTime(scheduleDate); if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){ @@ -298,22 +299,15 @@ public class MasterExecThread implements Runnable { processInstance.setCommandParam(JSONUtils.toJson(cmdParam)); } - List taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for(TaskInstance taskInstance : taskInstanceList){ - taskInstance.setFlag(Flag.NO); - processService.updateTaskInstance(taskInstance); - } processInstance.setState(ExecutionStatus.RUNNING_EXEUTION); processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( processInstance.getProcessDefinition().getGlobalParamMap(), processInstance.getProcessDefinition().getGlobalParamList(), CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); - + processInstance.setId(0); processService.saveProcessInstance(processInstance); } - // flow end - endProcess(); }