|
|
|
@ -66,6 +66,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
|
|
|
|
|
import org.apache.dolphinscheduler.service.task.TaskPluginManager; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections.CollectionUtils; |
|
|
|
|
import org.apache.commons.collections.MapUtils; |
|
|
|
|
import org.apache.commons.lang3.StringUtils; |
|
|
|
|
|
|
|
|
|
import java.lang.reflect.InvocationTargetException; |
|
|
|
@ -150,7 +151,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); |
|
|
|
|
if (taskDefinitionLogs.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isEmpty(taskDefinitionLogs)) { |
|
|
|
|
logger.warn("Parameter taskDefinitionJson is invalid."); |
|
|
|
|
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); |
|
|
|
|
return result; |
|
|
|
@ -336,7 +337,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
// upstreamTaskCodes - queryUpStreamTaskCodes
|
|
|
|
|
Set<Long> diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)) |
|
|
|
|
.collect(Collectors.toSet()); |
|
|
|
|
if (!diffCode.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(diffCode)) { |
|
|
|
|
String taskCodes = StringUtils.join(diffCode, Constants.COMMA); |
|
|
|
|
logger.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", |
|
|
|
|
taskCodes); |
|
|
|
@ -438,7 +439,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
// Whether task have downstream tasks
|
|
|
|
|
List<ProcessTaskRelation> processTaskRelationList = |
|
|
|
|
processTaskRelationMapper.queryDownstreamByTaskCode(taskDefinition.getCode()); |
|
|
|
|
if (!processTaskRelationList.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(processTaskRelationList)) { |
|
|
|
|
Set<Long> postTaskCodes = processTaskRelationList |
|
|
|
|
.stream() |
|
|
|
|
.map(ProcessTaskRelation::getPostTaskCode) |
|
|
|
@ -474,7 +475,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
// Delete task upstream tasks if exists
|
|
|
|
|
List<ProcessTaskRelation> taskRelationList = |
|
|
|
|
processTaskRelationMapper.queryUpstreamByCode(taskDefinition.getProjectCode(), taskCode); |
|
|
|
|
if (!taskRelationList.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(taskRelationList)) { |
|
|
|
|
logger.debug( |
|
|
|
|
"Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", |
|
|
|
|
taskCode); |
|
|
|
@ -540,7 +541,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
} |
|
|
|
|
List<ProcessTaskRelation> taskRelationList = |
|
|
|
|
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); |
|
|
|
|
if (!taskRelationList.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(taskRelationList)) { |
|
|
|
|
logger.info( |
|
|
|
|
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", |
|
|
|
|
taskCode); |
|
|
|
@ -614,7 +615,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
|
|
|
|
|
List<ProcessTaskRelation> taskRelationList = |
|
|
|
|
processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode); |
|
|
|
|
if (!taskRelationList.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(taskRelationList)) { |
|
|
|
|
logger.info( |
|
|
|
|
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", |
|
|
|
|
taskCode); |
|
|
|
@ -794,13 +795,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap; |
|
|
|
|
if (!upstreamTaskCodes.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) { |
|
|
|
|
List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); |
|
|
|
|
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream() |
|
|
|
|
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); |
|
|
|
|
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
|
|
|
|
|
upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet()); |
|
|
|
|
if (!upstreamTaskCodes.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) { |
|
|
|
|
String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA); |
|
|
|
|
logger.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", |
|
|
|
|
notExistTaskCodes); |
|
|
|
@ -810,7 +811,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
} else { |
|
|
|
|
queryUpStreamTaskCodeMap = new HashMap<>(); |
|
|
|
|
} |
|
|
|
|
if (!upstreamTaskRelations.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) { |
|
|
|
|
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); |
|
|
|
|
List<ProcessTaskRelation> processTaskRelations = |
|
|
|
|
processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); |
|
|
|
@ -834,7 +835,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion()); |
|
|
|
|
processTaskRelationList.add(taskRelation); |
|
|
|
|
} |
|
|
|
|
if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) { |
|
|
|
|
if (MapUtils.isEmpty(queryUpStreamTaskCodeMap) && CollectionUtils.isNotEmpty(processTaskRelationList)) { |
|
|
|
|
processTaskRelationList.add(processTaskRelationList.get(0)); |
|
|
|
|
} |
|
|
|
|
updateDag(loginUser, taskRelation.getProcessDefinitionCode(), processTaskRelations, |
|
|
|
@ -888,7 +889,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
if (switchVersion > 0) { |
|
|
|
|
List<ProcessTaskRelation> taskRelationList = |
|
|
|
|
processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); |
|
|
|
|
if (!taskRelationList.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(taskRelationList)) { |
|
|
|
|
logger.info( |
|
|
|
|
"Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", |
|
|
|
|
taskCode); |
|
|
|
@ -1022,7 +1023,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
|
|
|
|
|
taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, |
|
|
|
|
searchTaskName, taskType, taskExecuteType); |
|
|
|
|
List<TaskMainInfo> records = taskMainInfoIPage.getRecords(); |
|
|
|
|
if (!records.isEmpty()) { |
|
|
|
|
if (CollectionUtils.isNotEmpty(records)) { |
|
|
|
|
Map<Long, TaskMainInfo> taskMainInfoMap = new HashMap<>(); |
|
|
|
|
for (TaskMainInfo info : records) { |
|
|
|
|
taskMainInfoMap.compute(info.getTaskCode(), (k, v) -> { |
|
|
|
|