Browse Source

[Fix] openapi version inconsistency when updating workflows, tasks and relationship (#13094)

Co-authored-by: xiangzihao <460888207@qq.com>
3.2.0-release
insist777 2 years ago committed by GitHub
parent
commit
ca5af013ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java
  3. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java
  5. 107
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  6. 135
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  7. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  8. 14
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  9. 108
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationV2Controller.java

@ -125,7 +125,8 @@ public class ProcessTaskRelationV2Controller extends BaseController {
@PathVariable("code") Long code, @PathVariable("code") Long code,
@RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { @RequestBody TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
List<ProcessTaskRelation> processTaskRelations = processTaskRelationService List<ProcessTaskRelation> processTaskRelations = processTaskRelationService
.updateUpstreamTaskDefinition(loginUser, code, taskRelationUpdateUpstreamRequest); .updateUpstreamTaskDefinitionWithSyncDag(loginUser, code, Boolean.TRUE,
taskRelationUpdateUpstreamRequest);
return Result.success(processTaskRelations); return Result.success(processTaskRelations);
} }
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflow/WorkflowUpdateRequest.java

@ -103,8 +103,6 @@ public class WorkflowUpdateRequest {
processDefinitionDeepCopy.setLocations(this.location); processDefinitionDeepCopy.setLocations(this.location);
} }
int version = processDefinitionDeepCopy.getVersion() + 1;
processDefinitionDeepCopy.setVersion(version);
processDefinitionDeepCopy.setUpdateTime(new Date()); processDefinitionDeepCopy.setUpdateTime(new Date());
return processDefinitionDeepCopy; return processDefinitionDeepCopy;
} }

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -381,6 +381,7 @@ public enum Status {
"批量创建工作流任务关系 {0} 错误"), "批量创建工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error", PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
"批量修改工作流任务关系错误"), "批量修改工作流任务关系错误"),
UPSTREAM_TASK_NOT_EXISTS(50071, "upstream task want to set dependence do not exists {0}", "指定的上游任务 {0} 不存在"),
WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation", WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation",
"工作流实例未结束,不能执行此操作"), "工作流实例未结束,不能执行此操作"),

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationService.java

@ -86,11 +86,13 @@ public interface ProcessTaskRelationService {
* *
* @param loginUser login user * @param loginUser login user
* @param taskCode relation upstream code * @param taskCode relation upstream code
* @param needSyncDag needSyncDag
* @param taskRelationUpdateUpstreamRequest relation downstream code * @param taskRelationUpdateUpstreamRequest relation downstream code
*/ */
List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser, List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
long taskCode, long taskCode,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest); Boolean needSyncDag,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest);
/** /**
* delete task upstream relation * delete task upstream relation

107
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_MOVE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VERSION_LIST;
@ -2775,12 +2776,110 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
processDefinitionUpdate.setTenantId(tenant.getId()); processDefinitionUpdate.setTenantId(tenant.getId());
} }
int update = processDefinitionMapper.updateById(processDefinitionUpdate); int insertVersion = this.saveProcessDefine(loginUser, processDefinitionUpdate);
if (update <= 0) { if (insertVersion == 0) {
logger.error("Update process definition error, projectCode:{}, processDefinitionName:{}.",
processDefinitionUpdate.getCode(),
processDefinitionUpdate.getName());
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
this.syncObj2Log(loginUser, processDefinition);
return processDefinition; int insertRelationVersion = this.saveTaskRelation(loginUser, processDefinitionUpdate, insertVersion);
if (insertRelationVersion != Constants.EXIT_CODE_SUCCESS) {
logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
processDefinitionUpdate.setVersion(insertVersion);
return processDefinitionUpdate;
}
public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
processDefinitionLog.setVersion(insertVersion);
processDefinition.setVersion(insertVersion);
processDefinitionLog.setOperator(loginUser.getId());
processDefinition.setUserId(loginUser.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
processDefinition.setUpdateTime(processDefinition.getUpdateTime());
processDefinitionLog.setId(null);
int result = processDefinitionMapper.updateById(processDefinition);
int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
processDefinitionLog.setId(processDefinition.getId());
return (insertLog & result) > 0 ? insertVersion : 0;
}
public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
int processDefinitionVersion) {
long projectCode = processDefinition.getProjectCode();
long processDefinitionCode = processDefinition.getCode();
List<ProcessTaskRelation> taskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelationLog> taskRelationList =
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
List<Long> taskCodeList =
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
if (taskRelationList.isEmpty()) {
return Constants.EXIT_CODE_SUCCESS;
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
if (taskDefinitionLogMap != null) {
TaskDefinitionLog preTaskDefinitionLog =
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
if (preTaskDefinitionLog != null) {
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
}
TaskDefinitionLog postTaskDefinitionLog =
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
if (postTaskDefinitionLog != null) {
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
}
}
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
}
if (!taskRelations.isEmpty()) {
Set<Integer> processTaskRelationSet =
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
Set<Integer> taskRelationSet =
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
boolean isSame = CollectionUtils.isEqualCollection(processTaskRelationSet,
taskRelationSet);
if (isSame) {
logger.info("process task relations is non-existent, projectCode:{}, processCode:{}.",
processDefinition.getProjectCode(), processDefinition.getCode());
return Constants.EXIT_CODE_SUCCESS;
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
}
List<ProcessTaskRelation> processTaskRelations =
taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
} }
protected Map<String, Object> updateDagSchedule(User loginUser, protected Map<String, Object> updateDagSchedule(User loginUser,

135
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
@ -31,12 +32,14 @@ import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
@ -93,6 +96,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired @Autowired
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@ -402,13 +408,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* *
* @param loginUser login user * @param loginUser login user
* @param taskCode relation upstream code * @param taskCode relation upstream code
* @param needSyncDag needSyncDag
* @param taskRelationUpdateUpstreamRequest relation downstream code * @param taskRelationUpdateUpstreamRequest relation downstream code
*/ */
@Override @Override
@Transactional @Transactional
public List<ProcessTaskRelation> updateUpstreamTaskDefinition(User loginUser, public List<ProcessTaskRelation> updateUpstreamTaskDefinitionWithSyncDag(User loginUser,
long taskCode, long taskCode,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) { Boolean needSyncDag,
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest) {
TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode); TaskDefinition downstreamTask = taskDefinitionMapper.queryByCode(taskCode);
if (downstreamTask == null) { if (downstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode); throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskCode);
@ -436,17 +444,24 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR,
taskRelationUpdateUpstreamRequest.toString()); taskRelationUpdateUpstreamRequest.toString());
} }
processDefinition.setUpdateTime(new Date());
int insertVersion = processDefinition.getVersion();
if (needSyncDag) {
insertVersion =
this.saveProcessDefine(loginUser, processDefinition);
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
// get new relation to create and out of date relation to delete // get new relation to create and out of date relation to delete
List<Long> taskCodeCreates = upstreamTaskCodes List<Long> taskCodeCreates = upstreamTaskCodes
.stream() .stream()
.filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch( .filter(upstreamTaskCode -> processTaskRelationExists.stream().noneMatch(
processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode)) processTaskRelationExist -> processTaskRelationExist.getPreTaskCode() == upstreamTaskCode))
.collect(Collectors.toList()); .collect(Collectors.toList());
List<Long> taskCodeDeletes = processTaskRelationExists List<Integer> taskCodeDeletes = processTaskRelationExists.stream()
.stream() .filter(ptr -> !upstreamTaskCodes.contains(ptr.getPreTaskCode()))
.map(ProcessTaskRelation::getPreTaskCode) .map(ProcessTaskRelation::getId)
.filter(preTaskCode -> !upstreamTaskCodes.contains(preTaskCode))
.collect(Collectors.toList()); .collect(Collectors.toList());
// delete relation not exists // delete relation not exists
@ -460,10 +475,20 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
// create relation not exists // create relation not exists
List<ProcessTaskRelation> processTaskRelations = new ArrayList<>(); List<ProcessTaskRelation> processTaskRelations = new ArrayList<>();
for (long createCode : taskCodeCreates) { for (long createCode : taskCodeCreates) {
TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode); long upstreamCode = 0L;
int version = 0;
if (createCode != 0L) {
// 0 for DAG root, should not, it may already exists and skip to create anymore
TaskDefinition upstreamTask = taskDefinitionMapper.queryByCode(createCode);
if (upstreamTask == null) {
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, createCode);
}
upstreamCode = upstreamTask.getCode();
version = upstreamTask.getVersion();
}
ProcessTaskRelation processTaskRelationCreate = ProcessTaskRelation processTaskRelationCreate =
new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(), new ProcessTaskRelation(null, processDefinition.getVersion(), downstreamTask.getProjectCode(),
processDefinition.getCode(), upstreamTask.getCode(), upstreamTask.getVersion(), processDefinition.getCode(), upstreamCode, version,
downstreamTask.getCode(), downstreamTask.getVersion(), null, null); downstreamTask.getCode(), downstreamTask.getVersion(), null, null);
processTaskRelations.add(processTaskRelationCreate); processTaskRelations.add(processTaskRelationCreate);
} }
@ -473,10 +498,98 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
} }
// batch sync to process task relation log // batch sync to process task relation log
this.batchPersist2ProcessTaskRelationLog(loginUser, processTaskRelations); int saveTaskRelationResult = saveTaskRelation(loginUser, processDefinition, insertVersion);
if (saveTaskRelationResult != Constants.EXIT_CODE_SUCCESS) {
logger.error("Save process task relations error, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
logger.info("Save process task relations complete, projectCode:{}, processCode:{}, processVersion:{}.",
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
processTaskRelations.get(0).setProcessDefinitionVersion(insertVersion);
return processTaskRelations; return processTaskRelations;
} }
public int saveTaskRelation(User loginUser, ProcessDefinition processDefinition,
int processDefinitionVersion) {
long projectCode = processDefinition.getProjectCode();
long processDefinitionCode = processDefinition.getCode();
List<ProcessTaskRelation> taskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelationLog> taskRelationList =
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
List<Long> taskCodeList =
taskRelations.stream().map(ProcessTaskRelation::getPostTaskCode).collect(Collectors.toList());
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(taskCodeList);
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitions.stream().map(TaskDefinitionLog::new).collect(Collectors.toList());
if (taskRelationList.isEmpty()) {
return Constants.EXIT_CODE_SUCCESS;
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode);
processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion);
if (taskDefinitionLogMap != null) {
TaskDefinitionLog preTaskDefinitionLog =
taskDefinitionLogMap.get(processTaskRelationLog.getPreTaskCode());
if (preTaskDefinitionLog != null) {
processTaskRelationLog.setPreTaskVersion(preTaskDefinitionLog.getVersion());
}
TaskDefinitionLog postTaskDefinitionLog =
taskDefinitionLogMap.get(processTaskRelationLog.getPostTaskCode());
if (postTaskDefinitionLog != null) {
processTaskRelationLog.setPostTaskVersion(postTaskDefinitionLog.getVersion());
}
}
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
}
if (CollectionUtils.isNotEmpty(taskRelations)) {
Set<Integer> processTaskRelationSet =
taskRelations.stream().map(ProcessTaskRelation::hashCode).collect(toSet());
Set<Integer> taskRelationSet =
taskRelationList.stream().map(ProcessTaskRelationLog::hashCode).collect(toSet());
boolean isSame = org.apache.commons.collections.CollectionUtils.isEqualCollection(processTaskRelationSet,
taskRelationSet);
if (isSame) {
return Constants.EXIT_CODE_SUCCESS;
}
processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode);
}
List<ProcessTaskRelation> processTaskRelations =
taskRelationList.stream().map(ProcessTaskRelation::new).collect(Collectors.toList());
int insert = processTaskRelationMapper.batchInsert(processTaskRelations);
int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList);
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
public int saveProcessDefine(User loginUser, ProcessDefinition processDefinition) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode());
int insertVersion = version == null || version == 0 ? Constants.VERSION_FIRST : version + 1;
processDefinitionLog.setVersion(insertVersion);
processDefinitionLog.setOperator(loginUser.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
processDefinitionLog.setId(null);
int insertLog = processDefinitionLogMapper.insert(processDefinitionLog);
processDefinitionLog.setId(processDefinition.getId());
int result = processDefinitionMapper.updateById(processDefinitionLog);
return (insertLog & result) > 0 ? insertVersion : 0;
}
private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition, private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
List<ProcessTaskRelation> processTaskRelationList) { List<ProcessTaskRelation> processTaskRelationList) {
List<ProcessTaskRelationLog> relationLogs = List<ProcessTaskRelationLog> relationLogs =

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -214,8 +214,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
String upstreamCodes) { String upstreamCodes) {
TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest(); TaskRelationUpdateUpstreamRequest taskRelationUpdateUpstreamRequest = new TaskRelationUpdateUpstreamRequest();
taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode); taskRelationUpdateUpstreamRequest.setWorkflowCode(workflowCode);
taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes); if (upstreamCodes != null) {
return processTaskRelationService.updateUpstreamTaskDefinition(user, taskCode, taskRelationUpdateUpstreamRequest.setUpstreams(upstreamCodes);
}
return processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(user, taskCode, Boolean.FALSE,
taskRelationUpdateUpstreamRequest); taskRelationUpdateUpstreamRequest);
} }
@ -498,9 +500,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
} }
private void updateDag(User loginUser, long processDefinitionCode, public void updateDag(User loginUser, long processDefinitionCode,
List<ProcessTaskRelation> processTaskRelationList, List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) { List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) { if (processDefinition == null) {
logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode); logger.error("Process definition does not exist, processDefinitionCode:{}.", processDefinitionCode);
@ -625,6 +627,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<ProcessTaskRelation> taskRelationList = List<ProcessTaskRelation> taskRelationList =
processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode); processTaskRelationMapper.queryUpstreamByCode(taskDefinitionUpdate.getProjectCode(), taskCode);
if (CollectionUtils.isNotEmpty(taskRelationList)) { if (CollectionUtils.isNotEmpty(taskRelationList)) {
logger.info( logger.info(
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
@ -634,10 +637,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
.queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode); .queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog)); updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
} }
this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(), this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
taskUpdateRequest.getUpstreamTasksCodes()); taskUpdateRequest.getUpstreamTasksCodes());
return taskDefinitionUpdate; return taskDefinitionUpdate;
} }

14
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -1068,11 +1068,23 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
((ServiceException) exception).getCode()); ((ServiceException) exception).getCode());
// success // success
Mockito.when(processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinition.getCode()))
.thenReturn(processDefinition.getVersion());
Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1); Mockito.when(processDefinitionMapper.updateById(isA(ProcessDefinition.class))).thenReturn(1);
ProcessDefinition processDefinitionUpdate = ProcessDefinition processDefinitionUpdate =
processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode, processDefinitionService.updateSingleProcessDefinition(user, processDefinitionCode,
workflowUpdateRequest); workflowUpdateRequest);
Assertions.assertEquals(processDefinition, processDefinitionUpdate); Assertions.assertNotNull(processDefinitionUpdate);
// check version
Assertions.assertEquals(processDefinition.getVersion() + 1, processDefinitionUpdate.getVersion());
}
@Test
public void testCheckVersion() {
WorkflowFilterRequest workflowFilterRequest = new WorkflowFilterRequest();
workflowFilterRequest.setWorkflowName(name);
} }
@Test @Test

108
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest; import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
@ -40,18 +41,23 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.process.ProcessServiceImpl;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import java.text.MessageFormat; import java.text.MessageFormat;
@ -91,9 +97,18 @@ public class TaskDefinitionServiceImplTest {
@Mock @Mock
private ProjectServiceImpl projectService; private ProjectServiceImpl projectService;
@InjectMocks
private ProcessServiceImpl processServiceImpl;
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private ProcessDefinitionLogMapper processDefineLogMapper;
@Mock
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Mock @Mock
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@ -429,11 +444,15 @@ public class TaskDefinitionServiceImplTest {
// success // success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), Mockito.when(
isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class), Mockito.when(processDefinitionService.updateSingleProcessDefinition(isA(User.class), isA(Long.class),
isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition()); isA(WorkflowUpdateRequest.class))).thenReturn(getProcessDefinition());
Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest)); Assertions.assertDoesNotThrow(() -> taskDefinitionService.createTaskDefinitionV2(user, taskCreateRequest));
} }
@Test @Test
@ -500,10 +519,54 @@ public class TaskDefinitionServiceImplTest {
// success // success
Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.insert(isA(TaskDefinitionLog.class))).thenReturn(1);
// we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService // we do not test updateUpstreamTaskDefinition, because it should be tested in processTaskRelationService
Mockito.when(processTaskRelationService.updateUpstreamTaskDefinition(isA(User.class), isA(Long.class), Mockito.when(
isA(TaskRelationUpdateUpstreamRequest.class))).thenReturn(getProcessTaskRelationList()); processTaskRelationService.updateUpstreamTaskDefinitionWithSyncDag(isA(User.class), isA(Long.class),
isA(Boolean.class),
isA(TaskRelationUpdateUpstreamRequest.class)))
.thenReturn(getProcessTaskRelationList());
Assertions.assertDoesNotThrow( Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest)); () -> taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest));
TaskDefinition taskDefinition =
taskDefinitionService.updateTaskDefinitionV2(user, TASK_CODE, taskUpdateRequest);
Assertions.assertEquals(getTaskDefinition().getVersion() + 1, taskDefinition.getVersion());
}
@Test
public void testUpdateDag() {
User loginUser = getLoginUser();
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setId(null);
List<ProcessTaskRelation> processTaskRelationList = getProcessTaskRelationList();
TaskDefinitionLog taskDefinitionLog = getTaskDefinitionLog();
ArrayList<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
taskDefinitionLogs.add(taskDefinitionLog);
Integer version = 1;
Mockito.when(processDefinitionMapper.queryByCode(isA(long.class))).thenReturn(processDefinition);
// saveProcessDefine
Mockito.when(processDefineLogMapper.queryMaxVersionForDefinition(isA(long.class))).thenReturn(version);
Mockito.when(processDefineLogMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(processDefinitionMapper.insert(isA(ProcessDefinitionLog.class))).thenReturn(1);
int insertVersion =
processServiceImpl.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE))
.thenReturn(insertVersion);
Assertions.assertEquals(insertVersion, version + 1);
// saveTaskRelation
List<ProcessTaskRelationLog> processTaskRelationLogList = getProcessTaskRelationLogList();
Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()),
eq(processDefinition.getCode()))).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, processTaskRelationLogList, taskDefinitionLogs,
Boolean.TRUE);
Assertions.assertEquals(Constants.EXIT_CODE_SUCCESS, insertResult);
Assertions.assertDoesNotThrow(
() -> taskDefinitionService.updateDag(loginUser, processDefinition.getCode(), processTaskRelationList,
taskDefinitionLogs));
} }
@Test @Test
@ -528,6 +591,17 @@ public class TaskDefinitionServiceImplTest {
Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE)); Assertions.assertDoesNotThrow(() -> taskDefinitionService.getTaskDefinition(user, TASK_CODE));
} }
/**
* create admin user
*/
private User getLoginUser() {
User loginUser = new User();
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setUserName("admin");
loginUser.setId(1);
return loginUser;
}
/** /**
* get mock Project * get mock Project
* *
@ -563,6 +637,19 @@ public class TaskDefinitionServiceImplTest {
return taskDefinition; return taskDefinition;
} }
private TaskDefinitionLog getTaskDefinitionLog() {
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
taskDefinitionLog.setProjectCode(PROJECT_CODE);
taskDefinitionLog.setCode(TASK_CODE);
taskDefinitionLog.setVersion(VERSION);
taskDefinitionLog.setTaskType("SHELL");
taskDefinitionLog.setTaskParams(TASK_PARAMETER);
taskDefinitionLog.setFlag(Flag.YES);
taskDefinitionLog.setCpuQuota(RESOURCE_RATE);
taskDefinitionLog.setMemoryMax(RESOURCE_RATE);
return taskDefinitionLog;
}
private List<ProcessTaskRelation> getProcessTaskRelationList() { private List<ProcessTaskRelation> getProcessTaskRelationList() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>(); List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
@ -576,6 +663,19 @@ public class TaskDefinitionServiceImplTest {
return processTaskRelationList; return processTaskRelationList;
} }
private List<ProcessTaskRelationLog> getProcessTaskRelationLogList() {
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(PROJECT_CODE);
processTaskRelationLog.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelationLog.setPreTaskCode(TASK_CODE);
processTaskRelationLog.setPostTaskCode(TASK_CODE + 1L);
processTaskRelationLogList.add(processTaskRelationLog);
return processTaskRelationLogList;
}
private List<ProcessTaskRelation> getProcessTaskRelationList2() { private List<ProcessTaskRelation> getProcessTaskRelationList2() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>(); List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();

Loading…
Cancel
Save