Browse Source

[Bug][Master] The task of dependent should use taskCode (#6730)

* fix #6682 The task of dependent should use taskCode

* fix #6682 The task of dependent should use taskCode
3.0.0/version-upgrade
OS 3 years ago committed by GitHub
parent
commit
24bd8a16e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  2. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  3. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java
  5. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java
  6. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  7. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  8. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  9. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  10. 44
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  11. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  12. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  13. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/contextMenu.vue

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.MAX_TASK_TIMEOUT;
@ -409,7 +409,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param loginUser login user
* @param instanceId instance id
* @param processDefinitionCode process definition code
* @param version
* @param processVersion
* @param commandType command type
* @return insert result code
*/
@ -533,7 +533,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
}
if (!StringUtils.isEmpty(startNodeList)) {
cmdParam.put(CMD_PARAM_START_NODE_NAMES, startNodeList);
cmdParam.put(CMD_PARAM_START_NODES, startNodeList);
}
if (warningType != null) {
command.setWarningType(warningType);

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -434,6 +434,8 @@ public final class Constants {
public static final String CMD_PARAM_START_NODE_NAMES = "StartNodeNameList";
public static final String CMD_PARAM_START_NODES = "StartNodeList";
public static final String CMD_PARAM_START_PARAMS = "StartParams";
public static final String CMD_PARAM_FATHER_PARAMS = "fatherParams";
@ -784,6 +786,8 @@ public final class Constants {
public static final String CONTENT = "content";
public static final String DEPENDENT_SPLIT = ":||";
public static final String DEPENDENT_ALL = "ALL";
public static final long DEPENDENT_ALL_TASK_CODE = 0;
/**

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java

@ -26,16 +26,16 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
public class DependentItem {
private long projectCode;
private long definitionCode;
private String depTasks;
private long depTaskCode;
private String cycle;
private String dateValue;
private DependResult dependResult;
private ExecutionStatus status;
public String getKey() {
return String.format("%d-%s-%s-%s",
return String.format("%d-%d-%s-%s",
getDefinitionCode(),
getDepTasks(),
getDepTaskCode(),
getCycle(),
getDateValue());
}
@ -56,12 +56,12 @@ public class DependentItem {
this.definitionCode = definitionCode;
}
public String getDepTasks() {
return depTasks;
public long getDepTaskCode() {
return depTaskCode;
}
public void setDepTasks(String depTasks) {
this.depTasks = depTasks;
public void setDepTaskCode(long depTaskCode) {
this.depTaskCode = depTaskCode;
}
public String getCycle() {

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/DagHelper.java

@ -106,7 +106,7 @@ public class DagHelper {
} else {
// specified start nodes or resume execution
for (String startNodeCode : startNodeList) {
TaskNode startNode = findNodeByName(taskNodeList, startNodeCode);
TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode);
List<TaskNode> childNodeList = new ArrayList<>();
if (startNode == null) {
logger.error("start node name [{}] is not in task node list [{}] ",

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/TaskInstanceTest.java

@ -75,7 +75,7 @@ public class TaskInstanceTest {
List<DependentTaskModel> dependTaskList = new ArrayList<>();
List<DependentItem> dependentItems = new ArrayList<>();
DependentItem dependentItem = new DependentItem();
dependentItem.setDepTasks("A");
dependentItem.setDepTaskCode(111L);
dependentItem.setDefinitionCode(222L);
dependentItem.setCycle("today");
dependentItems.add(dependentItem);

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -21,9 +21,8 @@ import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_D
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -810,8 +809,8 @@ public class WorkflowExecuteThread implements Runnable {
Map<String, Property> allProperty = new HashMap<>();
Map<String, TaskInstance> allTaskInstance = new HashMap<>();
if (CollectionUtils.isNotEmpty(preTask)) {
for (String preTaskName : preTask) {
TaskInstance preTaskInstance = completeTaskList.get(preTaskName);
for (String preTaskCode : preTask) {
TaskInstance preTaskInstance = completeTaskList.get(preTaskCode);
if (preTaskInstance == null) {
continue;
}
@ -1360,8 +1359,8 @@ public class WorkflowExecuteThread implements Runnable {
if (paramMap == null) {
return startNodeNameList;
}
if (paramMap.containsKey(CMD_PARAM_START_NODE_NAMES)) {
startNodeNameList = Arrays.asList(paramMap.get(CMD_PARAM_START_NODE_NAMES).split(Constants.COMMA));
if (paramMap.containsKey(CMD_PARAM_START_NODES)) {
startNodeNameList = Arrays.asList(paramMap.get(CMD_PARAM_START_NODES).split(Constants.COMMA));
}
return startNodeNameList;
}

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -63,7 +63,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
/**
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
@ -159,7 +159,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getName(), task.getState());
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
}
List<DependResult> modelResultList = new ArrayList<>();
@ -181,18 +181,18 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
private DependResult getDependResultForItem(DependentItem item) {
DependResult dependResult = DependResult.SUCCESS;
if (!completeTaskList.containsKey(item.getDepTasks())) {
logger.info("depend item: {} have not completed yet.", item.getDepTasks());
if (!completeTaskList.containsKey(item.getDepTaskCode())) {
logger.info("depend item: {} have not completed yet.", item.getDepTaskCode());
dependResult = DependResult.FAILED;
return dependResult;
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTaskCode());
if (executionStatus != item.getStatus()) {
logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTasks(), item.getStatus(), executionStatus);
logger.info("depend item : {} expect status: {}, actual status: {}", item.getDepTaskCode(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
Constants.DEPENDENT_SPLIT, item.getDepTaskCode(), dependResult);
return dependResult;
}

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -113,10 +113,10 @@ public class DependentExecute {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)) {
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTasks(), processInstance);
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance);
}
if (result != DependResult.SUCCESS) {
break;
@ -143,17 +143,17 @@ public class DependentExecute {
/**
* get depend task result
*
* @param taskName
* @param taskCode
* @param processInstance
* @return
*/
private DependResult getDependTaskResult(String taskName, ProcessInstance processInstance) {
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance) {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : taskInstanceList) {
if (task.getName().equals(taskName)) {
if (task.getTaskCode() == taskCode) {
taskInstance = task;
break;
}

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -140,7 +140,7 @@ public class ConditionsTaskTest {
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
DependentItem dependentItem = new DependentItem();
dependentItem.setDepTasks("1");
dependentItem.setDepTaskCode(11L);
dependentItem.setStatus(ExecutionStatus.SUCCESS);
DependentTaskModel dependentTaskModel = new DependentTaskModel();

44
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -60,6 +59,10 @@ public class DependentTaskTest {
public static final Long TASK_CODE = 1111L;
public static final Long DEPEND_TASK_CODE_A = 110L;
public static final Long DEPEND_TASK_CODE_B = 111L;
public static final Long DEPEND_TASK_CODE_C = 112L;
public static final Long DEPEND_TASK_CODE_D = 113L;
public static final int TASK_VERSION = 1;
private ProcessService processService;
@ -125,7 +128,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2L, "A", "today", "day")
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -152,8 +155,8 @@ public class DependentTaskTest {
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance)
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A, dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_B, dependentProcessInstance)
).collect(Collectors.toList()));
}
@ -172,8 +175,8 @@ public class DependentTaskTest {
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance)
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B, dependentProcessInstance)
).collect(Collectors.toList()));
}
@ -182,15 +185,15 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "B", "today", "day")
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day"),
getDependentItemFromTaskNode(3L, DEPEND_TASK_CODE_B, "today", "day")
).collect(Collectors.toList()));
DependentTaskModel dependentTaskModel2 = new DependentTaskModel();
dependentTaskModel2.setRelation(DependentRelation.OR);
dependentTaskModel2.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "C", "today", "day")
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day"),
getDependentItemFromTaskNode(3L, DEPEND_TASK_CODE_C, "today", "day")
).collect(Collectors.toList()));
/*
@ -225,13 +228,13 @@ public class DependentTaskTest {
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200)
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, DEPEND_TASK_CODE_A, processInstance200)
).collect(Collectors.toList()));
Mockito.when(processService
.findValidTaskListByProcessId(300))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300),
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300)
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B, processInstance300),
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, DEPEND_TASK_CODE_C, processInstance300)
).collect(Collectors.toList()));
//DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -247,7 +250,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day")
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL_TASK_CODE, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -300,7 +303,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2L, "A", "today", "day")
getDependentItemFromTaskNode(2L, DEPEND_TASK_CODE_A, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -327,7 +330,7 @@ public class DependentTaskTest {
.thenAnswer(i -> {
processInstance.setState(ExecutionStatus.READY_STOP);
return Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance)
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, DEPEND_TASK_CODE_A, dependentProcessInstance)
).collect(Collectors.toList());
})
.thenThrow(new IllegalStateException("have not been stopped as expected"));
@ -351,6 +354,7 @@ public class DependentTaskTest {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-10");
taskNode.setName("D");
taskNode.setCode(DEPEND_TASK_CODE_D);
taskNode.setType(TaskType.DEPENDENT.getDesc());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
@ -380,10 +384,10 @@ public class DependentTaskTest {
/**
* DependentItem defines the condition for the dependent
*/
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) {
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, long taskCode, String date, String cycle) {
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionCode(processDefinitionCode);
dependentItem.setDepTasks(taskName);
dependentItem.setDepTaskCode(taskCode);
dependentItem.setDateValue(date);
dependentItem.setCycle(cycle);
// so far, the following fields have no effect
@ -401,12 +405,12 @@ public class DependentTaskTest {
private TaskInstance getTaskInstanceForValidTaskList(
int taskInstanceId, ExecutionStatus state,
String taskName, ProcessInstance processInstance
long taskCode, ProcessInstance processInstance
) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType(TaskType.DEPENDENT.getDesc());
taskInstance.setId(taskInstanceId);
taskInstance.setName(taskName);
taskInstance.setTaskCode(taskCode);
taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setState(state);
return taskInstance;

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVERY_START_NODE_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODE_NAMES;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_NODES;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -155,7 +155,7 @@ public class WorkflowExecuteThreadTest {
public void testParseStartNodeName() throws ParseException {
try {
Map<String, String> cmdParam = new HashMap<>();
cmdParam.put(CMD_PARAM_START_NODE_NAMES, "t1,t2,t3");
cmdParam.put(CMD_PARAM_START_NODES, "1,2,3");
Mockito.when(processInstance.getCommandParam()).thenReturn(JSONUtils.toJsonString(cmdParam));
Class<WorkflowExecuteThread> masterExecThreadClass = WorkflowExecuteThread.class;
Method method = masterExecThreadClass.getDeclaredMethod("parseStartNodeName", String.class);

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -794,8 +794,8 @@ public class ProcessService {
private Boolean checkCmdParam(Command command, Map<String, String> cmdParam) {
if (command.getTaskDependType() == TaskDependType.TASK_ONLY || command.getTaskDependType() == TaskDependType.TASK_PRE) {
if (cmdParam == null
|| !cmdParam.containsKey(Constants.CMD_PARAM_START_NODE_NAMES)
|| cmdParam.get(Constants.CMD_PARAM_START_NODE_NAMES).isEmpty()) {
|| !cmdParam.containsKey(Constants.CMD_PARAM_START_NODES)
|| cmdParam.get(Constants.CMD_PARAM_START_NODES).isEmpty()) {
logger.error("command node depend type is {}, but start nodes is null ", command.getTaskDependType());
return false;
}

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/contextMenu.vue

@ -104,7 +104,7 @@
this.currentTask = { ...this.currentTask, ...task }
},
onStart () {
this.dagChart.startRunning(this.currentTask.name)
this.dagChart.startRunning(this.currentTask.code)
},
onEdit () {
this.dagChart.openFormModel(this.currentTask.code, this.currentTask.type)

Loading…
Cancel
Save