Browse Source

[fix-12721] Fix cannot modify the upstream task in task definition page. (#12722)

3.2.0-release
jackfanwan 1 year ago committed by GitHub
parent
commit
8439b5dc69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 100
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  2. 82
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  3. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java
  4. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java

100
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -63,6 +63,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@ -82,6 +83,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@ -122,6 +124,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogDao processTaskRelationLogDao;
@Autowired
private ProcessTaskRelationService processTaskRelationService;
@ -852,10 +857,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} else {
queryUpStreamTaskCodeMap = new HashMap<>();
}
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) {
if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode());
// set upstream code list
updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()),
taskCode, projectCode, taskRelation.getProcessDefinitionCode(), loginUser);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> relationList = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
@ -879,8 +889,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && CollectionUtils.isNotEmpty(processTaskRelationList)) {
processTaskRelationList.add(processTaskRelationList.get(0));
}
updateDag(loginUser, taskRelation.getProcessDefinitionCode(), processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
logger.info(
"Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.",
@ -890,6 +898,92 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
private void updateUpstreamTask(Set<Long> allPreTaskCodeSet, long taskCode, long projectCode,
long processDefinitionCode, User loginUser) {
// query all process task relation
List<ProcessTaskRelation> hadProcessTaskRelationList = processTaskRelationMapper
.queryUpstreamByCode(projectCode, taskCode);
// remove pre
Set<Long> removePreTaskSet = new HashSet<>();
List<ProcessTaskRelation> removePreTaskList = new ArrayList<>();
// add pre
Set<Long> addPreTaskSet = new HashSet<>();
List<ProcessTaskRelation> addPreTaskList = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
// filter all process task relation
if (CollectionUtils.isNotEmpty(hadProcessTaskRelationList)) {
for (ProcessTaskRelation processTaskRelation : hadProcessTaskRelationList) {
if (processTaskRelation.getPreTaskCode() == 0) {
continue;
}
// had
if (allPreTaskCodeSet.contains(processTaskRelation.getPreTaskCode())) {
allPreTaskCodeSet.remove(processTaskRelation.getPreTaskCode());
} else {
// remove
removePreTaskSet.add(processTaskRelation.getPreTaskCode());
processTaskRelation.setPreTaskCode(0);
processTaskRelation.setPreTaskVersion(0);
removePreTaskList.add(processTaskRelation);
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, processTaskRelation));
}
}
}
// add
if (allPreTaskCodeSet.size() != 0) {
addPreTaskSet.addAll(allPreTaskCodeSet);
}
// get add task code map
allPreTaskCodeSet.add(Long.valueOf(taskCode));
List<TaskDefinition> taskDefinitionList = taskDefinitionMapper.queryByCodeList(allPreTaskCodeSet);
Map<Long, TaskDefinition> taskCodeMap = taskDefinitionList.stream().collect(Collectors
.toMap(TaskDefinition::getCode, Function.identity(), (a, b) -> a));
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
TaskDefinition taskDefinition = taskCodeMap.get(taskCode);
for (Long preTaskCode : addPreTaskSet) {
TaskDefinition preTaskRelation = taskCodeMap.get(preTaskCode);
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(
null, processDefinition.getVersion(), projectCode, processDefinition.getCode(),
preTaskRelation.getCode(), preTaskRelation.getVersion(),
taskDefinition.getCode(), taskDefinition.getVersion(), ConditionType.NONE, "{}");
addPreTaskList.add(processTaskRelation);
processTaskRelationLogList.add(createProcessTaskRelationLog(loginUser, processTaskRelation));
}
int insert = 0;
int remove = 0;
int log = 0;
// insert process task relation table data
if (CollectionUtils.isNotEmpty(addPreTaskList)) {
insert = processTaskRelationMapper.batchInsert(addPreTaskList);
}
if (CollectionUtils.isNotEmpty(removePreTaskList)) {
for (ProcessTaskRelation processTaskRelation : removePreTaskList) {
remove += processTaskRelationMapper.updateById(processTaskRelation);
}
}
if (CollectionUtils.isNotEmpty(processTaskRelationLogList)) {
log = processTaskRelationLogDao.batchInsert(processTaskRelationLogList);
}
if (insert + remove != log) {
throw new RuntimeException("updateUpstreamTask error");
}
}
private ProcessTaskRelationLog createProcessTaskRelationLog(User loginUser,
ProcessTaskRelation processTaskRelation) {
Date now = new Date();
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
/**
* switch task definition
*

82
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -56,12 +56,14 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -124,11 +126,15 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessDefinitionService processDefinitionService;
@Mock
private ProcessTaskRelationLogDao processTaskRelationLogDao;
private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
private static final long PROCESS_DEFINITION_CODE = 2L;
private static final long TASK_CODE = 3L;
private static final String UPSTREAM_CODE = "3,5";
private static final int VERSION = 1;
private static final int RESOURCE_RATE = -1;
protected User user;
@ -169,14 +175,7 @@ public class TaskDefinitionServiceImplTest {
@Test
public void updateTaskDefinition() {
String taskDefinitionJson =
"{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}";
String taskDefinitionJson = getTaskDefinitionJson();;
Project project = getProject();
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project);
@ -591,6 +590,52 @@ public class TaskDefinitionServiceImplTest {
Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
}
@Test
public void testUpdateTaskWithUpstream() {
String taskDefinitionJson = getTaskDefinitionJson();
TaskDefinition taskDefinition = getTaskDefinition();
taskDefinition.setFlag(Flag.NO);
TaskDefinition taskDefinitionSecond = getTaskDefinition();
taskDefinitionSecond.setCode(5);
user.setUserType(UserType.ADMIN_USER);
Mockito.when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject());
Mockito.when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true);
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCodeList(Mockito.anySet()))
.thenReturn(Arrays.asList(taskDefinition, taskDefinitionSecond));
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(PROJECT_CODE, TASK_CODE))
.thenReturn(getProcessTaskRelationListV2());
Mockito.when(processDefinitionMapper.queryByCode(PROCESS_DEFINITION_CODE)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(processTaskRelationMapper.updateById(Mockito.any())).thenReturn(1);
Mockito.when(processTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2);
// success
Map<String, Object> successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE,
taskDefinitionJson, UPSTREAM_CODE);
Assertions.assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS));
user.setUserType(UserType.GENERAL_USER);
}
private String getTaskDefinitionJson() {
return "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\","
+ "\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\","
+ "\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":\\\"echo ${datetime}\\\","
+ "\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\","
+ "\"flag\":0,\"taskPriority\":0,\"workerGroup\":\"default\",\"failRetryTimes\":0,"
+ "\"failRetryInterval\":0,\"timeoutFlag\":0,\"timeoutNotifyStrategy\":0,\"timeout\":0,"
+ "\"delayTime\":0,\"resourceIds\":\"\"}";
}
/**
* create admin user
*/
@ -663,6 +708,27 @@ public class TaskDefinitionServiceImplTest {
return processTaskRelationList;
}
private List<ProcessTaskRelation> getProcessTaskRelationListV2() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
fillProcessTaskRelation(processTaskRelation);
processTaskRelationList.add(processTaskRelation);
processTaskRelation = new ProcessTaskRelation();
fillProcessTaskRelation(processTaskRelation);
processTaskRelation.setPreTaskCode(4L);
processTaskRelationList.add(processTaskRelation);
return processTaskRelationList;
}
private void fillProcessTaskRelation(ProcessTaskRelation processTaskRelation) {
processTaskRelation.setProjectCode(PROJECT_CODE);
processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation.setPreTaskCode(TASK_CODE);
processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
}
private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessTaskRelationLogDao.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface ProcessTaskRelationLogDao {
@ -26,4 +28,12 @@ public interface ProcessTaskRelationLogDao {
List<ProcessTaskRelationLog> findByWorkflowDefinitionCode(long workflowDefinitionCode);
void deleteByWorkflowDefinitionCode(long workflowDefinitionCode);
/**
* batch insert process task relation
*
* @param taskRelationList taskRelationList
* @return int
*/
int batchInsert(@Param("taskRelationList") List<ProcessTaskRelationLog> taskRelationList);
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessTaskRelationLogDaoImpl.java

@ -41,4 +41,9 @@ public class ProcessTaskRelationLogDaoImpl implements ProcessTaskRelationLogDao
public void deleteByWorkflowDefinitionCode(long workflowDefinitionCode) {
processTaskRelationLogMapper.deleteByWorkflowDefinitionCode(workflowDefinitionCode);
}
@Override
public int batchInsert(List<ProcessTaskRelationLog> taskRelationList) {
return processTaskRelationLogMapper.batchInsert(taskRelationList);
}
}

Loading…
Cancel
Save