Browse Source

[Improvement][API] fix 6772 new api bug (#7098)

* add processDefinition releaseWorkflowAndSchedule

* add ProcessTaskRelationServiceImpl.moveTaskProcessRelation

* add dependency and subprocess judgement

* fix api bug

* fix codestyle
3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
b265297fc8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java
  2. 120
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 249
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  4. 37
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  5. 7
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 91
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  7. 27
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  8. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java
  9. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  10. 16
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  11. 28
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  12. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

@ -142,7 +142,7 @@ public class ProcessTaskRelationController extends BaseController {
}
/**
* delete process task relation
* delete process task relation (delete task from workflow)
*
* @param loginUser login user
* @param projectCode project code

120
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -233,7 +233,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId);
globalParams, locations, timeout, loginUser.getId(), tenantId);
processDefinition.setExecutionType(executionType);
return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
@ -300,8 +300,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), ProcessTaskRelation.class))
.collect(Collectors.toList());
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
@ -412,7 +412,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
@ -591,7 +591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
@ -685,11 +685,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
int delete = processDefinitionMapper.deleteById(processDefinition.getId());
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (delete == 0 || deleteRelation == 0) {
if (delete == 0) {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (deleteRelation == 0) {
logger.warn("The process definition has not relation, it will be delete successfully");
}
putMsg(result, Status.SUCCESS);
return result;
}
@ -1156,7 +1159,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())));
List<TaskDefinitionLog> taskDefinitionList = processService.genTaskDefineList(processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()));
Map<Long, TaskDefinitionLog> taskDefinitionMap = taskDefinitionList.stream()
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
.collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog));
if (limit > processInstanceList.size()) {
limit = processInstanceList.size();
@ -1171,8 +1174,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessInstance processInstance = processInstanceList.get(i);
Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime();
parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), processInstance.getProcessDefinitionCode(),
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
"", processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(),
DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime())));
}
List<TreeViewDto> parentTreeViewDtoList = new ArrayList<>();
@ -1363,7 +1366,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
@ -1532,13 +1535,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create empty process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @return process definition code
*/
@ -1584,7 +1587,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, "", timeout, loginUser.getId(), tenantId);
globalParams, "", timeout, loginUser.getId(), tenantId);
processDefinition.setExecutionType(executionType);
result = createEmptyDagDefine(loginUser, processDefinition);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
@ -1596,7 +1599,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// save dag schedule
Map<String, Object> scheduleResult = createDagSchedule(loginUser, project, processDefinition, scheduleJson);
Map<String, Object> scheduleResult = createDagSchedule(loginUser, processDefinition, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
@ -1617,10 +1620,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
private Map<String, Object> createDagSchedule(User loginUser,
Project project,
ProcessDefinition processDefinition,
String scheduleJson) {
private Map<String, Object> createDagSchedule(User loginUser, ProcessDefinition processDefinition, String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (scheduleObj == null) {
@ -1651,12 +1651,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode());
scheduleMapper.insert(scheduleObj);
/**
* updateProcessInstance receivers and cc by process definition id
*/
processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId());
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
@ -1665,15 +1659,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition basic info
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param tenantCode tenantCode
* @param scheduleJson scheduleJson
* @param executionType executionType
* @return update result code
*/
@ -1728,7 +1722,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams, "", timeout, tenantId);
processDefinition.setExecutionType(executionType);
result = updateDagDefineBasicInfo(loginUser, processDefinition, processDefinitionDeepCopy);
List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, Lists.newArrayList());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -1746,26 +1741,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
private Map<String, Object> updateDagDefineBasicInfo(User loginUser,
ProcessDefinition processDefinition,
ProcessDefinition processDefinitionDeepCopy) {
Map<String, Object> result = new HashMap<>();
int insertVersion;
if (processDefinition.equals(processDefinitionDeepCopy)) {
insertVersion = processDefinitionDeepCopy.getVersion();
} else {
processDefinition.setUpdateTime(new Date());
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
}
if (insertVersion == 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
return result;
}
private Map<String, Object> updateDagSchedule(User loginUser,
long projectCode,
long processDefinitionCode,
@ -1791,24 +1766,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
param.setTimezoneId(schedule.getTimezoneId());
return schedulerService.updateScheduleByProcessDefinitionCode(
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
environmentCode);
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
environmentCode);
}
/**
* release process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState releaseState
* @return update result code
*/
@ -1846,6 +1821,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
scheduleObj.setReleaseState(ReleaseState.ONLINE);
scheduleMapper.updateById(scheduleObj);
break;
case OFFLINE:

249
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -90,11 +91,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
/**
* create process task relation
*
* @param loginUser login user
* @param projectCode project code
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode processDefinitionCode
* @param preTaskCode preTaskCode
* @param postTaskCode postTaskCode
* @param preTaskCode preTaskCode
* @param postTaskCode postTaskCode
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@ -162,6 +163,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if ((insert & insertLog) > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
return result;
}
@ -173,6 +177,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(userId);
@ -183,13 +189,14 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
/**
* move task to other processDefinition
*
* @param loginUser login user info
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param loginUser login user info
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param targetProcessDefinitionCode target process definition code
* @param taskCode the current task code (the post task code)
* @param taskCode the current task code (the post task code)
* @return move result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@ -263,12 +270,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
Date now = new Date();
ProcessTaskRelation processTaskRelation = upstreamList.get(0);
ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setUpdateTime(now);
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
int update = processTaskRelationMapper.updateById(processTaskRelation);
if (update == 0) {
int updateLog = processTaskRelationLogMapper.updateById(processTaskRelationLog);
if (update == 0 || updateLog == 0) {
putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_TASK_RELATION_ERROR);
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
@ -282,6 +299,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param taskCode the post task code
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@ -294,38 +312,48 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (null == taskDefinition) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
if (CollectionUtils.isNotEmpty(downstreamList)) {
Set<Long> postTaskCodes = downstreamList
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
.stream()
.map(ProcessTaskRelation::getPostTaskCode)
.collect(Collectors.toSet());
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
return result;
}
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setPreTaskCode(taskCode);
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
int deleteRelation = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
if (0 == deleteRelation) {
int deleteRelationLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if (0 == deleteRelation || 0 == deleteRelationLog) {
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (null == taskDefinition) {
putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
return result;
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
|| TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
int deleteTaskDefinition = taskDefinitionMapper.deleteByCode(taskCode);
if (0 == deleteTaskDefinition) {
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR);
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
}
putMsg(result, Status.SUCCESS);
return result;
}
@ -338,6 +366,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param taskCode the post task code
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteUpstreamRelation(User loginUser, long projectCode, String preTaskCodes, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@ -347,11 +376,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(preTaskCodes)) {
putMsg(result,Status.DATA_IS_NULL,"preTaskCodes");
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result;
}
Set<Long> preTaskCodesSet = Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Status status = deleteUpstreamRelation(projectCode, preTaskCodesSet.toArray(new Long[0]), taskCode);
Status status = deleteUpstreamRelation(loginUser.getId(), projectCode,
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
if (status != Status.SUCCESS) {
putMsg(result, status);
}
@ -367,6 +396,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param taskCode the pre task code
* @return delete result code
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> deleteDownstreamRelation(User loginUser, long projectCode, String postTaskCodes, long taskCode) {
Project project = projectMapper.queryByCode(projectCode);
@ -376,26 +406,28 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
if (StringUtils.isEmpty(postTaskCodes)) {
putMsg(result,Status.DATA_IS_NULL,"postTaskCodes");
putMsg(result, Status.DATA_IS_NULL, "postTaskCodes");
return result;
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode);
Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
processTaskRelationList.stream()
.map(ProcessTaskRelationLog::new)
.collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<Long> deleteFailedCodeList = new ArrayList<>();
postTaskCodesSet.stream().forEach(
postTaskCode -> {
try {
Status status = deleteUpstreamRelation(projectCode, new Long[]{taskCode}, postTaskCode);
if (Status.SUCCESS != status) {
deleteFailedCodeList.add(postTaskCode);
}
} catch (Exception e) {
deleteFailedCodeList.add(postTaskCode);
}
}
);
if (!deleteFailedCodeList.isEmpty()) {
putMsg(result, Status.DELETE_TASK_PROCESS_RELATION_ERROR, String.join(",", deleteFailedCodeList.stream().map(o -> o + "").collect(Collectors.toList())));
int delete = 0;
int deleteLog = 0;
for (long postTaskCode : postTaskCodesSet) {
ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
if (processTaskRelationLog != null) {
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
}
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
putMsg(result, Status.SUCCESS);
}
return result;
}
@ -420,15 +452,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
Set<TaskDefinition> taskDefinitions = processTaskRelationList
.stream()
.map(processTaskRelation -> {
TaskDefinition taskDefinition = buildTaskDefinition();
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
return taskDefinition;
})
.collect(Collectors.toSet());
.stream()
.map(processTaskRelation -> {
TaskDefinition taskDefinition = buildTaskDefinition();
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
taskDefinition.setCode(processTaskRelation.getPreTaskCode());
taskDefinition.setVersion(processTaskRelation.getPreTaskVersion());
return taskDefinition;
})
.collect(Collectors.toSet());
taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
}
result.put(Constants.DATA_LIST, taskDefinitionLogList);
@ -456,15 +488,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(processTaskRelationList)) {
Set<TaskDefinition> taskDefinitions = processTaskRelationList
.stream()
.map(processTaskRelation -> {
TaskDefinition taskDefinition = buildTaskDefinition();
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
return taskDefinition;
})
.collect(Collectors.toSet());
.stream()
.map(processTaskRelation -> {
TaskDefinition taskDefinition = buildTaskDefinition();
taskDefinition.setProjectCode(processTaskRelation.getProjectCode());
taskDefinition.setCode(processTaskRelation.getPostTaskCode());
taskDefinition.setVersion(processTaskRelation.getPostTaskVersion());
return taskDefinition;
})
.collect(Collectors.toSet());
taskDefinitionLogList = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitions);
}
result.put(Constants.DATA_LIST, taskDefinitionLogList);
@ -478,8 +510,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @param preTaskCode pre task code
* @param postTaskCode post task code
* @return delete result code
*/
@Override
@ -501,7 +533,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
ProcessTaskRelation processTaskRelation = processTaskRelationList.get(0);
int upstreamCount = processTaskRelationMapper.countByCode(projectCode, processTaskRelation.getProcessDefinitionCode(),
0L, processTaskRelation.getPostTaskCode());
0L, processTaskRelation.getPostTaskCode());
if (upstreamCount == 0) {
putMsg(result, Status.DATA_IS_NULL, "upstreamCount");
@ -541,8 +573,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
TaskDefinition that = (TaskDefinition) o;
return getCode() == that.getCode()
&& getVersion() == that.getVersion()
&& getProjectCode() == that.getProjectCode();
&& getVersion() == that.getVersion()
&& getProjectCode() == that.getProjectCode();
}
@Override
@ -560,52 +592,61 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param taskCode pre task code
* @return status
*/
private Status deleteUpstreamRelation(long projectCode, Long[] preTaskCodes, long taskCode) {
private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) {
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
if (CollectionUtils.isEmpty(upstreamList)) {
return Status.SUCCESS;
}
Map<Long, List<ProcessTaskRelation>> processTaskRelationListGroupByProcessDefinitionCode = upstreamList.stream()
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
List<ProcessTaskRelationLog> upstreamLogList = new ArrayList<>();
Date now = new Date();
for (ProcessTaskRelation processTaskRelation : upstreamList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(userId);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
upstreamLogList.add(processTaskRelationLog);
}
Map<Long, List<ProcessTaskRelationLog>> processTaskRelationListGroupByProcessDefinitionCode = upstreamLogList.stream()
.collect(Collectors.groupingBy(ProcessTaskRelationLog::getProcessDefinitionCode));
// count upstream relation group by process definition code
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
List<ProcessTaskRelation> deletes = new ArrayList<>();
List<ProcessTaskRelation> updates = new ArrayList<>();
countListGroupByProcessDefinitionCode.stream().forEach(
processDefinitionCodeUpstreamCountMap ->
processDefinitionCodeUpstreamCountMap.entrySet().stream().forEach(
o -> {
Long processDefinitionCode = o.getKey();
Integer count = o.getValue();
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
if (count <= processTaskRelationList.size()) {
ProcessTaskRelation processTaskRelation = processTaskRelationList.remove(0);
if (processTaskRelation.getPreTaskCode() != 0) {
processTaskRelation.setPreTaskCode(0);
processTaskRelation.setPreTaskVersion(0);
updates.add(processTaskRelation);
}
}
if (!processTaskRelationList.isEmpty()) {
deletes.addAll(processTaskRelationList);
}
}
)
);
int update = 0;
if (!updates.isEmpty()) {
update = processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(updates);
List<Map<String, Long>> countListGroupByProcessDefinitionCode = processTaskRelationMapper
.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, processTaskRelationListGroupByProcessDefinitionCode.keySet().toArray(new Long[0]), taskCode);
List<ProcessTaskRelationLog> deletes = new ArrayList<>();
List<ProcessTaskRelationLog> updates = new ArrayList<>();
for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
long processDefinitionCode = codeCountMap.get("processDefinitionCode");
long countValue = codeCountMap.get("countValue");
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
if (countValue <= processTaskRelationLogList.size()) {
ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
if (processTaskRelationLog.getPreTaskCode() != 0) {
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
updates.add(processTaskRelationLog);
}
}
if (!processTaskRelationLogList.isEmpty()) {
deletes.addAll(processTaskRelationLogList);
}
}
deletes.addAll(updates);
int delete = 0;
if (!deletes.isEmpty()) {
delete = processTaskRelationMapper.deleteBatchIds(deletes.stream().map(ProcessTaskRelation::getId).collect(Collectors.toList()));
int deleteLog = 0;
for (ProcessTaskRelationLog processTaskRelationLog : deletes) {
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
}
if (update < 0 || delete < 0) {
return Status.DELETE_TASK_PROCESS_RELATION_ERROR;
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
if (!updates.isEmpty()) {
int insert = processTaskRelationMapper.batchInsert(updates);
int insertLog = processTaskRelationLogMapper.batchInsert(updates);
if ((insert & insertLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
return Status.SUCCESS;
}

37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -89,6 +91,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private ProcessService processService;
@ -255,7 +260,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setId(taskDefinition.getId());
taskDefinitionToUpdate.setProjectCode(projectCode);
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
taskDefinitionToUpdate.setVersion(version + 1);
taskDefinitionToUpdate.setVersion(++version);
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);
@ -268,6 +273,36 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
int delete = 0;
int deleteLog = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
}
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog);
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
if ((insertRelation & insertRelationLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
return result;

7
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -429,6 +429,13 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processService.findRelationByCode(projectCode, 46L)).thenReturn(processTaskRelationList);
Map<String, Object> onlineRes = processDefinitionService.releaseProcessDefinition(
loginUser, projectCode, 46, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));

91
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -192,13 +192,13 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationUpstream1.setPostTaskCode(taskCode);
processTaskRelationUpstream1.setPreTaskVersion(1);
processTaskRelationUpstream1.setPreTaskCode(123);
processTaskRelationUpstream0.setProcessDefinitionCode(124);
processTaskRelationUpstream1.setProcessDefinitionCode(124);
processTaskRelationUpstream1.setProjectCode(projectCode);
ProcessTaskRelation processTaskRelationUpstream2 = new ProcessTaskRelation();
processTaskRelationUpstream2.setPostTaskCode(taskCode);
processTaskRelationUpstream2.setPreTaskVersion(2);
processTaskRelationUpstream1.setPreTaskCode(123);
processTaskRelationUpstream0.setProcessDefinitionCode(125);
processTaskRelationUpstream2.setPreTaskCode(123);
processTaskRelationUpstream2.setProcessDefinitionCode(125);
processTaskRelationUpstream2.setProjectCode(projectCode);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelationUpstream0);
@ -238,6 +238,7 @@ public class ProcessTaskRelationServiceTest {
processDefinition.setTenantId(1);
processDefinition.setDescription("");
processDefinition.setCode(1L);
processDefinition.setVersion(1);
return processDefinition;
}
@ -312,8 +313,17 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(1);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation)).thenReturn(processTaskRelationLog);
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.updateById(processTaskRelationLog)).thenReturn(1);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -435,27 +445,17 @@ public class ProcessTaskRelationServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new ArrayList<>();
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(123L, 2);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(124L, 1);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
{
put(125L, 3);
}
});
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3);
Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3);
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(1L);
processTaskRelation.setPreTaskCode(taskCode);
processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryDownstreamByCode(projectCode, taskCode)).thenReturn(processTaskRelationList);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@ -473,27 +473,35 @@ public class ProcessTaskRelationServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskUpstreamRelationList(projectCode, taskCode);
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(processTaskRelationList);
List<Map<Long, Integer>> countListGroupByProcessDefinitionCode = new ArrayList<>();
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
Mockito.when(processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, new Long[]{123L})).thenReturn(Lists.newArrayList());
List<Map<String, Long>> countListGroupByProcessDefinitionCode = new ArrayList<>();
countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
put(123L, 2);
put("processDefinitionCode", 123L);
put("countValue", 2L);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
put(124L, 1);
put("processDefinitionCode", 124L);
put("countValue", 1L);
}
});
countListGroupByProcessDefinitionCode.add(new HashMap<Long, Integer>() {
countListGroupByProcessDefinitionCode.add(new HashMap<String, Long>() {
{
put(125L, 3);
put("processDefinitionCode", 125L);
put("countValue", 3L);
}
});
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(2);
Mockito.when(processTaskRelationMapper.countUpstreamByCodeGroupByProcessDefinitionCode(projectCode, new Long[]{123L, 124L, 125L}, 2)).thenReturn(countListGroupByProcessDefinitionCode);
Mockito.when(processTaskRelationMapper.batchUpdateProcessTaskRelationPreTask(new ArrayList())).thenReturn(3);
Mockito.when(processTaskRelationMapper.deleteBatchIds(new ArrayList())).thenReturn(3);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Map<String, Object> result1 = processTaskRelationService.deleteUpstreamRelation(loginUser, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
}
@ -501,8 +509,8 @@ public class ProcessTaskRelationServiceTest {
@Test
public void testDeleteTaskProcessRelation() {
long projectCode = 1L;
long taskCode = 2L;
long processDefinitionCode = 3L;
long taskCode = 1L;
long processDefinitionCode = 1L;
long preTaskCode = 4L;
long postTaskCode = 5L;
Project project = getProject(projectCode);
@ -520,10 +528,19 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationLog.setPreTaskCode(taskCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTaskType(TaskType.CONDITIONS.getDesc());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
Mockito.when(taskDefinitionMapper.deleteByCode(taskCode)).thenReturn(1);
processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(1);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}

27
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -42,7 +43,6 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
@ -50,7 +50,6 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@Ignore
@RunWith(MockitoJUnitRunner.class)
public class TaskDefinitionServiceImplTest {
@ -74,7 +73,6 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
;
@Test
public void createTaskDefinition() {
@ -166,6 +164,7 @@ public class TaskDefinitionServiceImplTest {
@Test
public void deleteTaskDefinitionByCode() {
long projectCode = 1L;
long taskCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@ -177,13 +176,14 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryByTaskCode(Mockito.anyLong()))
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
Mockito.when(processTaskRelationMapper.queryDownstreamByTaskCode(taskCode))
.thenReturn(new ArrayList<>());
Mockito.when(taskDefinitionMapper.deleteByCode(Mockito.anyLong()))
Mockito.when(taskDefinitionMapper.deleteByCode(taskCode))
.thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.deleteTaskDefinitionByCode(loginUser, projectCode, Mockito.anyLong());
.deleteTaskDefinitionByCode(loginUser, projectCode, taskCode);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@ -241,6 +241,15 @@ public class TaskDefinitionServiceImplTest {
return project;
}
private TaskDefinition getTaskDefinition() {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(1L);
taskDefinition.setCode(1L);
taskDefinition.setVersion(1);
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
return taskDefinition;
}
@Test
public void checkJson() {
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
@ -305,12 +314,8 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> onlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS));
// process definition online, resource does not exist
Map<String, Object> onlineResResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.ONLINE);
Assert.assertEquals(Status.SUCCESS, onlineResResult.get(Constants.STATUS));
// release error code
Map<String, Object> failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.ibatis.annotations.Param;
@ -65,4 +66,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper<ProcessTaskRela
* @return int
*/
int deleteRelation(@Param("processTaskRelationLog") ProcessTaskRelationLog processTaskRelationLog);
/**
* query process task relation log
*
* @param processTaskRelation processTaskRelation
* @return process task relation log
*/
ProcessTaskRelationLog queryRelationLogByRelation(@Param("processTaskRelation") ProcessTaskRelation processTaskRelation);
}

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -102,16 +102,6 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
List<ProcessTaskRelation> queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode);
/**
* query task relation by codes
*
* @param projectCode projectCode
* @param taskCode taskCode
* @param postTaskCodes postTaskCodes list
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryDownstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("postTaskCodes") Long[] postTaskCodes);
/**
* query task relation by codes
*
@ -130,8 +120,9 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processDefinitionCodes processDefinitionCodes
* @return upstream count list group by process definition code
*/
List<Map<Long, Integer>> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCodes") Long[] processDefinitionCodes, @Param("taskCode") long taskCode);
List<Map<String, Long>> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode,
@Param("processDefinitionCodes") Long[] processDefinitionCodes,
@Param("taskCode") long taskCode);
/**
* batch update process task relation pre task
@ -176,4 +167,5 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
@Param("processDefinitionCode") long processDefinitionCode,
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);
}

16
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -51,9 +51,23 @@
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code = #{processTaskRelationLog.processDefinitionCode}
and process_definition_version = #{processTaskRelationLog.processDefinitionVersion}
<if test="processTaskRelationLog.preTaskCode !=0 and processTaskRelationLog.preTaskVersion != 0">
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
</if>
and post_task_code = #{processTaskRelationLog.postTaskCode}
and post_task_version = #{processTaskRelationLog.post_task_version}
and post_task_version = #{processTaskRelationLog.postTaskVersion}
</delete>
<select id="queryRelationLogByRelation" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog">
select
<include refid="baseSql"/>
from t_ds_process_task_relation_log
WHERE project_code = #{processTaskRelation.projectCode}
and process_definition_code = #{processTaskRelation.processDefinitionCode}
and process_definition_version = #{processTaskRelation.processDefinitionVersion}
and pre_task_code = #{processTaskRelation.preTaskCode}
and pre_task_version = #{processTaskRelation.preTaskVersion}
and post_task_code = #{processTaskRelation.postTaskCode}
and post_task_version = #{processTaskRelation.postTaskVersion}
</select>
</mapper>

28
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -92,20 +92,6 @@
and post_task_code = #{taskCode}
</select>
<select id="queryDownstreamByCodes" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and pre_task_code = #{taskCode}
<if test="postTaskCodes != null and postTaskCodes.length != 0">
and post_task_code in
<foreach collection="postTaskCodes" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</if>
</select>
<select id="queryUpstreamByCodes" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
@ -120,9 +106,9 @@
</if>
</select>
<select id="countUpstreamByCodeGroupByProcessDefinitionCode" resultType="java.lang.Integer">
select process_definition_code,
count(0)
<select id="countUpstreamByCodeGroupByProcessDefinitionCode" resultType="java.util.HashMap">
select process_definition_code as processDefinitionCode,
count(0) as countValue
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and post_task_code = #{taskCode}
@ -172,10 +158,12 @@
WHERE project_code = #{processTaskRelationLog.projectCode}
and process_definition_code = #{processTaskRelationLog.processDefinitionCode}
and process_definition_version = #{processTaskRelationLog.processDefinitionVersion}
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
<if test="processTaskRelationLog.preTaskCode !=0 and processTaskRelationLog.preTaskVersion != 0">
and pre_task_code = #{processTaskRelationLog.preTaskCode}
and pre_task_version = #{processTaskRelationLog.preTaskVersion}
</if>
and post_task_code = #{processTaskRelationLog.postTaskCode}
and post_task_version = #{processTaskRelationLog.post_task_version}
and post_task_version = #{processTaskRelationLog.postTaskVersion}
</delete>
<select id="countByCode" resultType="int">

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2310,6 +2310,9 @@ public class ProcessService {
*/
public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion,
List<ProcessTaskRelationLog> taskRelationList, List<TaskDefinitionLog> taskDefinitionLogs) {
if (taskRelationList.isEmpty()) {
return Constants.EXIT_CODE_SUCCESS;
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()

Loading…
Cancel
Save