Browse Source

fix bug#1336 serial complement data can have multiple process instances

pull/3/MERGE
lenboo 4 years ago
parent
commit
39182568b2
  1. 18
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

18
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -252,6 +252,9 @@ public class MasterExecThread implements Runnable {
}
while(Stopper.isRunning()){
logger.info("process {} start to complement {} data",
processInstance.getId(), DateUtils.dateToString(scheduleDate));
// prepare dag and other info
prepareProcess();
@ -266,13 +269,13 @@ public class MasterExecThread implements Runnable {
// execute process ,waiting for end
runProcess();
endProcess();
// process instance failure ,no more complements
if(!processInstance.getState().typeIsSuccess()){
logger.info("process {} state {}, complement not completely!",
processInstance.getId(), processInstance.getState());
break;
}
// current process instance success ,next execute
if(null == iterator){
// loop by day
@ -291,9 +294,7 @@ public class MasterExecThread implements Runnable {
}
scheduleDate = iterator.next();
}
logger.info("process {} start to complement {} data",
processInstance.getId(), DateUtils.dateToString(scheduleDate));
// flow end
// execute next process instance complement data
processInstance.setScheduleTime(scheduleDate);
if(cmdParam.containsKey(Constants.CMDPARAM_RECOVERY_START_NODE_STRING)){
@ -301,22 +302,15 @@ public class MasterExecThread implements Runnable {
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance taskInstance : taskInstanceList){
taskInstance.setFlag(Flag.NO);
processService.updateTaskInstance(taskInstance);
}
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams(
processInstance.getProcessDefinition().getGlobalParamMap(),
processInstance.getProcessDefinition().getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime()));
processInstance.setId(0);
processService.saveProcessInstance(processInstance);
}
// flow end
endProcess();
}

Loading…
Cancel
Save