Browse Source

[FIX-5908][MasterServer] When executing an compensation task, the execution thread would have a NPE (#5909)

* fix the npe in MasterExec

* fix the compile error
2.0.7-release
kyoty 3 years ago committed by GitHub
parent
commit
2fa3b419a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -46,6 +46,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -525,9 +526,9 @@ public class MasterExecThread implements Runnable {
return taskInstance; return taskInstance;
} }
public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) { public void getPreVarPool(TaskInstance taskInstance, Set<String> preTask) {
Map<String,Property> allProperty = new HashMap<>(); Map<String, Property> allProperty = new HashMap<>();
Map<String,TaskInstance> allTaskInstance = new HashMap<>(); Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) { if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskName : preTask) { for (String preTaskName : preTask) {
TaskInstance preTaskInstance = completeTaskList.get(preTaskName); TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
@ -565,17 +566,17 @@ public class MasterExecThread implements Runnable {
TaskInstance otherTask = allTaskInstance.get(proName); TaskInstance otherTask = allTaskInstance.get(proName);
if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) { if (otherTask.getEndTime().getTime() > preTaskInstance.getEndTime().getTime()) {
allProperty.put(proName, thisProperty); allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance); allTaskInstance.put(proName, preTaskInstance);
} else { } else {
allProperty.put(proName, otherPro); allProperty.put(proName, otherPro);
} }
} else { } else {
allProperty.put(proName, thisProperty); allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance); allTaskInstance.put(proName, preTaskInstance);
} }
} else { } else {
allProperty.put(proName, thisProperty); allProperty.put(proName, thisProperty);
allTaskInstance.put(proName,preTaskInstance); allTaskInstance.put(proName, preTaskInstance);
} }
} }
@ -947,7 +948,7 @@ public class MasterExecThread implements Runnable {
if (!sendTimeWarning && checkProcessTimeOut(processInstance)) { if (!sendTimeWarning && checkProcessTimeOut(processInstance)) {
processAlertManager.sendProcessTimeoutAlert(processInstance, processAlertManager.sendProcessTimeoutAlert(processInstance,
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())); processInstance.getProcessDefinitionVersion()));
sendTimeWarning = true; sendTimeWarning = true;
} }
for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) { for (Map.Entry<MasterBaseTaskExecThread, Future<Boolean>> entry : activeTaskNode.entrySet()) {
@ -976,7 +977,9 @@ public class MasterExecThread implements Runnable {
task.getName(), task.getId(), task.getState()); task.getName(), task.getId(), task.getState());
// node success , post node submit // node success , post node submit
if (task.getState() == ExecutionStatus.SUCCESS) { if (task.getState() == ExecutionStatus.SUCCESS) {
ProcessDefinition relatedProcessDefinition = processInstance.getProcessDefinition();
processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId());
processInstance.setProcessDefinition(relatedProcessDefinition);
processInstance.setVarPool(task.getVarPool()); processInstance.setVarPool(task.getVarPool());
processService.updateProcessInstance(processInstance); processService.updateProcessInstance(processInstance);
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);

Loading…
Cancel
Save