Browse Source

fix bug#1336 serial complement data can have multiple process instances (#3317)

Co-authored-by: lenboo <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 4 years ago committed by lenboo
parent
commit
1b98db9240
  1. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -249,6 +249,9 @@ public class MasterExecThread implements Runnable {
}
while(Stopper.isRunning()){
logger.info("process {} start to complement {} data",
processInstance.getId(), DateUtils.dateToString(scheduleDate));
// prepare dag and other info
prepareProcess();
@ -263,13 +266,13 @@ public class MasterExecThread implements Runnable {
// execute process ,waiting for end
runProcess();
endProcess();
// process instance failure ,no more complements
if(!processInstance.getState().typeIsSuccess()){
logger.info("process {} state {}, complement not completely!",
processInstance.getId(), processInstance.getState());
break;
}
// current process instance success ,next execute
if(null == iterator){
// loop by day
@ -288,9 +291,7 @@ public class MasterExecThread implements Runnable {
}
scheduleDate = iterator.next();
}
logger.info("process {} start to complement {} data",
processInstance.getId(), DateUtils.dateToString(scheduleDate));
// flow end
// execute next process instance complement data
processInstance.setScheduleTime(scheduleDate);
if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
@ -298,22 +299,15 @@ public class MasterExecThread implements Runnable {
processInstance.setCommandParam(JSONUtils.toJson(cmdParam));
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance taskInstance : taskInstanceList){
taskInstance.setFlag(Flag.NO);
processService.updateTaskInstance(taskInstance);
}
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processInstance.getProcessDefinition().getGlobalParamMap(),
processInstance.getProcessDefinition().getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processInstance.setId(0);
processService.saveProcessInstance(processInstance);
}
// flow end
endProcess();
}

Loading…
Cancel
Save