Browse Source

[Fix-8367][Master] execute COMPLEMENT_DATA command always missing some ack event (#8407)

* fix bug_8367

* fix bug_8367

* fix bug_8367

* fix bug_8367
2.0.7-release
xiangzihao 3 years ago committed by GitHub
parent
commit
9bfff90c66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -480,28 +481,43 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.getScheduleTime(), processInstance.getScheduleTime(),
complementListDate.toString()); complementListDate.toString());
scheduleDate = complementListDate.get(index + 1); scheduleDate = complementListDate.get(index + 1);
}
//the next process complement //the next process complement
processInstance.setId(0); int create = this.createComplementDataCommand(scheduleDate);
if (create > 0) {
logger.info("create complement data command successfully. process id: {}", processInstance.getId());
}
return true;
} }
processInstance.setScheduleTime(scheduleDate);
private int createComplementDataCommand(Date scheduleDate) {
Command command = new Command();
command.setScheduleTime(scheduleDate);
command.setCommandType(CommandType.COMPLEMENT_DATA);
command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode());
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) { if (cmdParam.containsKey(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING)) {
cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING); cmdParam.remove(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING);
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
} }
cmdParam.replace(CMDPARAM_COMPLEMENT_DATA_START_DATE, DateUtils.format(scheduleDate, "yyyy-MM-dd HH:mm:ss"));
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); command.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setGlobalParams(ParameterUtils.curingGlobalParams( command.setTaskDependType(processInstance.getTaskDependType());
processDefinition.getGlobalParamMap(), command.setFailureStrategy(processInstance.getFailureStrategy());
processDefinition.getGlobalParamList(), command.setWarningType(processInstance.getWarningType());
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime())); command.setWarningGroupId(processInstance.getWarningGroupId());
processInstance.setStartTime(new Date()); command.setStartTime(new Date());
processInstance.setRestartTime(processInstance.getStartTime()); command.setExecutorId(processInstance.getExecutorId());
processInstance.setEndTime(null); command.setUpdateTime(new Date());
processService.saveProcessInstance(processInstance); command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
this.taskInstanceHashMap.clear(); command.setWorkerGroup(processInstance.getWorkerGroup());
startProcess(); command.setEnvironmentCode(processInstance.getEnvironmentCode());
return true; command.setDryRun(processInstance.getDryRun());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
return processService.createCommand(command);
} }
private boolean needComplementProcess() { private boolean needComplementProcess() {

Loading…
Cancel
Save