Browse Source

[Improvement][Master] Construct processInstance may NPE when master handling command (#12056)

* [Improvement][Master] Construct processInstance may NPE when master handling command

* use an enpty map

Co-authored-by: xuhaihui <xuhaihui@cmss.chinamobile.com>
3.2.0-release
xuhhui 2 years ago committed by GitHub
parent
commit
6466cc7c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

49
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -169,6 +169,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/**
@ -938,6 +939,9 @@ public class ProcessServiceImpl implements ProcessService {
throw new IllegalArgumentException("Cannot find the process definition for this workflowInstance");
}
Map<String, String> cmdParam = JSONUtils.toMap(command.getCommandParam());
if(cmdParam == null){
cmdParam = new HashMap<>();
}
int processInstanceId = command.getProcessInstanceId();
if (processInstanceId == 0) {
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
@ -947,36 +951,37 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
}
if (cmdParam != null) {
CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
CommandType commandTypeIfComplement = getCommandTypeIfComplement(processInstance, command);
// reset global params while repeat running is needed by cmdParam
if (commandTypeIfComplement == CommandType.REPEAT_RUNNING) {
setGlobalParamIfCommanded(processDefinition, cmdParam);
}
// time zone
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
// Recalculate global parameters after rerun.
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
// reset command parameter
if (processInstance.getCommandParam() != null) {
Map<String, String> processCmdParam = JSONUtils.toMap(processInstance.getCommandParam());
Map<String, String> finalCmdParam = cmdParam;
processCmdParam.forEach((key, value) -> {
if (!cmdParam.containsKey(key)) {
cmdParam.put(key, value);
if (!finalCmdParam.containsKey(key)) {
finalCmdParam.put(key, value);
}
});
}
// reset command parameter if sub process
if (cmdParam != null && cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
if (cmdParam.containsKey(Constants.CMD_PARAM_SUB_PROCESS)) {
processInstance.setCommandParam(command.getCommandParam());
}
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
@ -1009,7 +1014,7 @@ public class ProcessServiceImpl implements ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(Constants.COMMA, convertIntListToString(failedList)));
String.join(Constants.COMMA, convertIntListToString(failedList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;
@ -1027,7 +1032,7 @@ public class ProcessServiceImpl implements ProcessService {
initTaskInstance(this.findTaskInstanceById(taskId));
}
cmdParam.put(Constants.CMD_PARAM_RECOVERY_START_NODE_STRING,
String.join(",", convertIntListToString(stopNodeList)));
String.join(Constants.COMMA, convertIntListToString(stopNodeList)));
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
processInstance.setRunTimes(runTime + 1);
break;

Loading…
Cancel
Save