Browse Source

[Bug] [Master] Workflow keeps running when a task has no id (#14315)

* change completeTaskMap to completeTaskSet

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Co-authored-by: Wenjun Ruan <wenjun@apache.org>

* change method name

* fix unit tests (UT)

* fix spotless

---------

Co-authored-by: Wenjun Ruan <wenjun@apache.org>
3.2.1-prepare
旺阳 1 year ago committed by GitHub
parent
commit
465e7ae6ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java
  2. 142
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 32
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java

@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@ -55,7 +55,7 @@ public class TaskStateEventHandler implements StateEventHandler {
"Handle task instance state event, the current task instance state {} will be changed to {}",
task.getState().name(), taskStateEvent.getStatus().name());
Map<Long, Integer> completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap();
Set<Long> completeTaskSet = workflowExecuteRunnable.getCompleteTaskCodes();
if (task.getState().isFinished()
&& (taskStateEvent.getStatus() != null && taskStateEvent.getStatus().isRunning())) {
String errorMessage = String.format(
@ -67,8 +67,7 @@ public class TaskStateEventHandler implements StateEventHandler {
}
if (task.getState().isFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode())
&& completeTaskMap.get(task.getTaskCode()).equals(task.getId())) {
if (completeTaskSet.contains(task.getTaskCode())) {
log.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}

142
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -199,10 +199,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
private final Map<Long, Integer> errorTaskMap = new ConcurrentHashMap<>();
/**
* complete task map, taskCode as key, taskInstanceId as value
* complete task set
* in a DAG, only one taskInstance per taskCode is valid
*/
private final Map<Long, Integer> completeTaskMap = new ConcurrentHashMap<>();
private final Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
/**
* depend failed task set
@ -444,7 +444,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
if (taskInstance.getState().isSuccess()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
completeTaskSet.add(taskInstance.getTaskCode());
mergeTaskInstanceVarPool(taskInstance);
processInstanceDao.upsertProcessInstance(processInstance);
// save the cacheKey only if the task is defined as cache task and the task is success
@ -459,7 +459,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
completeTaskSet.add(taskInstance.getTaskCode());
// There are child nodes and the failure policy is: CONTINUE
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
Long.toString(taskInstance.getTaskCode()),
@ -472,8 +472,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
}
}
} else if (taskInstance.getState().isFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// todo: when the task instance is paused, it should not be in completeTaskSet
completeTaskSet.add(taskInstance.getTaskCode());
}
log.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
taskInstance.getTaskCode(),
@ -482,9 +482,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(), taskInstance.getHost());
} catch (Exception ex) {
log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskSet", ex);
// remove the task from the complete set, so that it can be finished the next time.
completeTaskMap.remove(taskInstance.getTaskCode());
completeTaskSet.remove(taskInstance.getTaskCode());
throw ex;
}
}
@ -873,7 +873,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
// do we need to clear?
taskExecuteRunnableMap.clear();
dependFailedTaskSet.clear();
completeTaskMap.clear();
completeTaskSet.clear();
errorTaskMap.clear();
if (!isNewProcessInstance()) {
@ -909,7 +909,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
if (task.isTaskComplete()) {
log.info("TaskInstance is already complete.");
completeTaskMap.put(task.getTaskCode(), task.getId());
completeTaskSet.add(task.getTaskCode());
continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()),
@ -980,9 +980,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
}
}
}
log.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}",
log.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskSet: {}, errorTaskMap: {}",
dependFailedTaskSet,
completeTaskMap,
completeTaskSet,
errorTaskMap);
}
@ -1236,7 +1236,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskCode : preTask) {
Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode));
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(Long.parseLong(preTaskCode));
if (!existTaskInstanceOptional.isPresent()) {
continue;
}
Integer taskId = existTaskInstanceOptional.get().getId();
if (taskId == null) {
continue;
}
@ -1303,20 +1308,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
*/
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
Long taskConde = entry.getKey();
Integer taskInstanceId = entry.getValue();
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
if (taskInstance == null) {
log.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}",
taskInstanceId,
taskConde);
// This case will happen when we submit to db failed, then the taskInstanceId is 0
continue;
completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId());
if (taskInstance == null) {
// This case will happen when we submit to db failed, then the taskInstanceId is 0
log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}",
taskCode);
} else {
completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance);
}
}
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
});
}
return completeTaskInstanceMap;
}
@ -1364,17 +1370,21 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
}
// the end node of the branch of the dag
if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) {
TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode)));
String taskInstanceVarPool = endTaskInstance.getVarPool();
if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
String processInstanceVarPool = processInstance.getVarPool();
if (StringUtils.isNotEmpty(processInstanceVarPool)) {
Set<Property> properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
properties.addAll(taskProperties);
processInstance.setVarPool(JSONUtils.toJsonString(properties));
} else {
processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
// Merge the varPool of the branch's end task instance into the process instance varPool.
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(NumberUtils.toLong(parentNodeCode));
if (existTaskInstanceOptional.isPresent()) {
    // The task instance id may be null (it is checked for null elsewhere in this class), and the
    // instance may be missing from taskInstanceMap (getCompleteTaskInstanceMap guards the same lookup),
    // so resolve the end task instance defensively instead of dereferencing it directly.
    Integer endTaskInstanceId = existTaskInstanceOptional.get().getId();
    TaskInstance endTaskInstance = endTaskInstanceId == null ? null : taskInstanceMap.get(endTaskInstanceId);
    if (endTaskInstance == null) {
        log.warn("Cannot find the end taskInstance from taskInstanceMap, taskCode: {}", parentNodeCode);
    } else {
        String taskInstanceVarPool = endTaskInstance.getVarPool();
        if (StringUtils.isNotEmpty(taskInstanceVarPool)) {
            Set<Property> taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class));
            String processInstanceVarPool = processInstance.getVarPool();
            if (StringUtils.isNotEmpty(processInstanceVarPool)) {
                // Union the existing process-level properties with the task's properties.
                Set<Property> properties =
                        new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class));
                properties.addAll(taskProperties);
                processInstance.setVarPool(JSONUtils.toJsonString(properties));
            } else {
                // No process-level varPool yet: the task's properties become the varPool.
                processInstance.setVarPool(JSONUtils.toJsonString(taskProperties));
            }
        }
    }
}
}
@ -1387,7 +1397,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
continue;
}
if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) {
if (task.getId() != null && completeTaskSet.contains(task.getTaskCode())) {
log.info("Task has already run success, taskName: {}", task.getName());
continue;
}
@ -1459,12 +1469,18 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
for (String depsNode : indirectDepCodeList) {
if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) {
// dependencies must be fully completed
Long despNodeTaskCode = Long.parseLong(depsNode);
if (!completeTaskMap.containsKey(despNodeTaskCode)) {
long despNodeTaskCode = Long.parseLong(depsNode);
if (!completeTaskSet.contains(despNodeTaskCode)) {
return DependResult.WAITING;
}
Integer depsTaskId = completeTaskMap.get(despNodeTaskCode);
TaskExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState();
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(despNodeTaskCode);
if (!existTaskInstanceOptional.isPresent()) {
return DependResult.NON_EXEC;
}
TaskExecutionStatus depTaskState =
taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
if (depTaskState.isKill()) {
return DependResult.NON_EXEC;
}
@ -1484,7 +1500,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
}
}
log.info("The dependTasks of task all success, currentTaskCode: {}, dependTaskCodes: {}",
taskCode, Arrays.toString(completeTaskMap.keySet().toArray()));
taskCode, Arrays.toString(completeTaskSet.toArray()));
return DependResult.SUCCESS;
}
@ -1535,8 +1551,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
.contains(nextNodeCode);
}
long taskCode = Long.parseLong(dependNodeCode);
Integer taskInstanceId = completeTaskMap.get(taskCode);
TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState();
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
if (!existTaskInstanceOptional.isPresent()) {
return false;
}
TaskExecutionStatus depTaskState = taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState();
return !depTaskState.isFailure();
}
@ -1548,12 +1568,17 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
*/
private List<TaskInstance> getCompleteTaskByState(TaskExecutionStatus state) {
List<TaskInstance> resultList = new ArrayList<>();
for (Integer taskInstanceId : completeTaskMap.values()) {
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
if (taskInstance != null && taskInstance.getState() == state) {
resultList.add(taskInstance);
completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId());
if (taskInstance != null && taskInstance.getState() == state) {
resultList.add(taskInstance);
}
}
}
});
return resultList;
}
@ -1905,7 +1930,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
"Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());
completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
submitPostNode(Long.toString(task.getTaskCode()));
continue;
@ -1931,7 +1956,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
processInstance.getId(),
task.getTaskCode());
}
completeTaskMap.put(task.getTaskCode(), task.getId());
completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task);
errorTaskMap.put(task.getTaskCode(), task.getId());
@ -2066,8 +2091,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
return false;
}
public Map<Long, Integer> getCompleteTaskMap() {
return completeTaskMap;
public Set<Long> getCompleteTaskCodes() {
return completeTaskSet;
}
public Map<Long, DefaultTaskExecuteRunnable> getTaskExecuteRunnableMap() {
@ -2117,7 +2142,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
* 1. find all task code from sub dag (only contains related task)
* 2. set the flag of tasks to Flag.NO
* 3. clear varPool data from re-execute task instance in process instance
* 4. remove related task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
* 4. remove related task instance from taskInstanceMap, completeTaskSet, validTaskMap, errorTaskMap
*
* @return task instance
*/
@ -2176,9 +2201,14 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
processInstanceDao.updateById(processInstance);
// remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
// remove task instance from taskInstanceMap, completeTaskSet, validTaskMap, errorTaskMap
// completeTaskSet must be pruned before taskInstanceMap: per the original note, this removal
// depends on taskInstanceMap, so the order of these removeIf calls must not change.
// DAG nodes are keyed by task code (as in the sibling removeIf calls), not by task instance id.
completeTaskSet.removeIf(taskCode -> {
    Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
    return existTaskInstanceOptional
            .filter(taskInstance -> dag.containsNode(Long.toString(taskInstance.getTaskCode())))
            .isPresent();
});
taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode())));
completeTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
validTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
}

32
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -69,6 +69,8 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.springframework.context.ApplicationContext;
import com.google.common.collect.Sets;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkflowExecuteRunnableTest {
@ -207,15 +209,15 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode());
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
Field completeTaskSetField = masterExecThreadClass.getDeclaredField("completeTaskSet");
completeTaskSetField.setAccessible(true);
completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
taskInstanceMapField.setAccessible(true);
@ -225,10 +227,10 @@ public class WorkflowExecuteRunnableTest {
Assertions.assertNotNull(taskInstance.getVarPool());
taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]");
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
completeTaskSet.add(taskInstance2.getTaskCode());
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
completeTaskSetField.setAccessible(true);
completeTaskSetField.set(workflowExecuteThread, completeTaskSet);
taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
@ -279,15 +281,15 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<Long, Integer> completeTaskList = new ConcurrentHashMap<>();
completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId());
completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId());
Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode());
Class<WorkflowExecuteRunnable> masterExecThreadClass = WorkflowExecuteRunnable.class;
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap");
Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskSet");
completeTaskMapField.setAccessible(true);
completeTaskMapField.set(workflowExecuteThread, completeTaskList);
completeTaskMapField.set(workflowExecuteThread, completeTaskSet);
Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap");
taskInstanceMapField.setAccessible(true);
@ -318,7 +320,7 @@ public class WorkflowExecuteRunnableTest {
workflowExecuteThread.clearDataIfExecuteTask();
Assertions.assertEquals(1, taskInstanceMap.size());
Assertions.assertEquals(1, completeTaskList.size());
Assertions.assertEquals(1, completeTaskSet.size());
}

Loading…
Cancel
Save