Browse Source

[Fix-6302] [API] Add a new task to an existing workflow, and the task is lost (#6307)

* fix processDefinition save/update/delete/move/import transaction

* fix ut

* fix processDefinnition release/processInstance update/taskDefinition transaction

* fix processDefinnition release/processInstance update/taskDefinition transaction

* fix 6302 update processDefinition,the task lost

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
0b53adeb07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  2. 37
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

14
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2188,23 +2188,23 @@ public class ProcessService {
} }
newTaskDefinitionLogs.add(taskDefinitionLog); newTaskDefinitionLogs.add(taskDefinitionLog);
} }
int insertResult = 0;
int updateResult = 0;
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) { for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode()); TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
if (task == null) { if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate); newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else { } else {
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
taskDefinitionToUpdate.setId(task.getId()); taskDefinitionToUpdate.setId(task.getId());
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate); updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
return update & insert;
} }
} }
if (!newTaskDefinitionLogs.isEmpty()) { if (!newTaskDefinitionLogs.isEmpty()) {
int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
return logInsert & insert;
} }
return Constants.EXIT_CODE_SUCCESS; return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
} }
/** /**

37
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
@ -51,6 +52,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
@ -72,6 +74,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
/** /**
* process service test * process service test
@ -100,6 +103,8 @@ public class ProcessServiceTest {
@Mock @Mock
private TaskDefinitionLogMapper taskDefinitionLogMapper; private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Mock @Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock @Mock
private ProcessDefinitionLogMapper processDefineLogMapper; private ProcessDefinitionLogMapper processDefineLogMapper;
@ -360,6 +365,38 @@ public class ProcessServiceTest {
Assert.assertEquals(0, processService.switchVersion(processDefinition, processDefinitionLog)); Assert.assertEquals(0, processService.switchVersion(processDefinition, processDefinitionLog));
} }
@Test
public void testSaveTaskDefine() {
User operator = new User();
operator.setId(-1);
operator.setUserType(UserType.GENERAL_USER);
long projectCode = 751485690568704L;
String taskJson = "[{\"code\":751500437479424,\"name\":\"aa\",\"version\":1,\"description\":\"\",\"delayTime\":0,"
+ "\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"sleep 1s\\necho 11\","
+ "\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"waitStartTimeout\":{}},"
+ "\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"yarn\",\"failRetryTimes\":0,\"failRetryInterval\":1,"
+ "\"timeoutFlag\":\"OPEN\",\"timeoutNotifyStrategy\":\"FAILED\",\"timeout\":1,\"environmentCode\":751496815697920},"
+ "{\"code\":751516889636864,\"name\":\"bb\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":{\"resourceList\":[],"
+ "\"localParams\":[],\"rawScript\":\"echo 22\",\"dependence\":{},\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},"
+ "\"waitStartTimeout\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":\"0\","
+ "\"failRetryInterval\":\"1\",\"timeoutFlag\":\"CLOSE\",\"timeoutNotifyStrategy\":\"\",\"timeout\":0,\"delayTime\":\"0\",\"environmentCode\":-1}]";
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskJson, TaskDefinitionLog.class);
TaskDefinitionLog taskDefinition = new TaskDefinitionLog();
taskDefinition.setCode(751500437479424L);
taskDefinition.setName("aa");
taskDefinition.setProjectCode(751485690568704L);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
taskDefinition.setUserId(-1);
taskDefinition.setVersion(1);
taskDefinition.setCreateTime(new Date());
taskDefinition.setUpdateTime(new Date());
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(), taskDefinition.getVersion())).thenReturn(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCode(taskDefinition.getCode())).thenReturn(taskDefinition);
int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs);
Assert.assertEquals(0, result);
}
@Test @Test
public void testGenDagGraph() { public void testGenDagGraph() {
ProcessDefinition processDefinition = new ProcessDefinition(); ProcessDefinition processDefinition = new ProcessDefinition();

Loading…
Cancel
Save