Browse Source

[Bug] [dolphinscheduler] fix duplicate key (#8111)

* fix #8043

* fix null when no change

Co-authored-by: caishunfeng <534328519@qq.com>
2.0.7-release
JinYong Li 2 years ago committed by GitHub
parent
commit
6bf8063fe8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 34
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 65
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  3. 47
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  4. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  6. 33
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

34
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -577,18 +577,28 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
int insertVersion; boolean isChange = false;
if (processDefinition.equals(processDefinitionDeepCopy)) { if (processDefinition.equals(processDefinitionDeepCopy) && saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
insertVersion = processDefinitionDeepCopy.getVersion(); List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion(processDefinition.getCode(), insertVersion); if (taskRelationList.size() == processTaskRelationLogList.size()) {
processDefinitionLog.setOperator(loginUser.getId()); Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
processDefinitionLog.setOperateTime(new Date()); taskRelationList.stream().collect(Collectors.toMap(ProcessTaskRelationLog::getPostTaskCode, processTaskRelationLog -> processTaskRelationLog));
int update = processDefinitionLogMapper.updateById(processDefinitionLog); for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (!processTaskRelationLog.equals(taskRelationLogMap.get(processTaskRelationLog.getPostTaskCode()))) {
isChange = true;
break;
}
}
} else { } else {
processDefinition.setUpdateTime(new Date()); isChange = true;
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
} }
if (insertVersion == 0) { } else {
isChange = true;
}
if (isChange) {
processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
@ -601,6 +611,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
} else {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
}
return result; return result;
} }

65
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired @Autowired
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessService processService;
/** /**
* create process task relation * create process task relation
* *
@ -135,6 +139,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
} }
} }
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
Date now = new Date(); Date now = new Date();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>(); List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
if (preTaskCode != 0L) { if (preTaskCode != 0L) {
@ -268,6 +273,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result; return result;
} }
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
Date now = new Date(); Date now = new Date();
ProcessTaskRelation processTaskRelation = upstreamList.get(0); ProcessTaskRelation processTaskRelation = upstreamList.get(0);
ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation); ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogMapper.queryRelationLogByRelation(processTaskRelation);
@ -290,6 +296,15 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result; return result;
} }
private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
processDefinition.setVersion(insertVersion);
}
/** /**
* delete process task relation * delete process task relation
* *
@ -331,7 +346,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
return result; return result;
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(projectCode); processTaskRelationLog.setProjectCode(projectCode);
processTaskRelationLog.setPostTaskCode(taskCode); processTaskRelationLog.setPostTaskCode(taskCode);
@ -379,7 +394,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.DATA_IS_NULL, "preTaskCodes"); putMsg(result, Status.DATA_IS_NULL, "preTaskCodes");
return result; return result;
} }
Status status = deleteUpstreamRelation(loginUser.getId(), projectCode, Status status = deleteUpstreamRelation(loginUser, projectCode,
Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode); Lists.newArrayList(preTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).distinct().toArray(Long[]::new), taskCode);
if (status != Status.SUCCESS) { if (status != Status.SUCCESS) {
putMsg(result, status); putMsg(result, status);
@ -417,11 +432,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); Set<Long> postTaskCodesSet = Lists.newArrayList(postTaskCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
int delete = 0; int delete = 0;
int deleteLog = 0; int deleteLog = 0;
Set<Long> processCodeSet = new HashSet<>();
for (long postTaskCode : postTaskCodesSet) { for (long postTaskCode : postTaskCodesSet) {
ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode); ProcessTaskRelationLog processTaskRelationLog = taskRelationLogMap.get(postTaskCode);
if (processTaskRelationLog != null) { if (processTaskRelationLog != null) {
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
processCodeSet.add(processTaskRelationLog.getProcessDefinitionCode());
}
}
for (long code : processCodeSet) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
} }
if ((delete & deleteLog) == 0) { if ((delete & deleteLog) == 0) {
@ -514,6 +541,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param postTaskCode post task code * @param postTaskCode post task code
* @return delete result code * @return delete result code
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) { public Map<String, Object> deleteEdge(User loginUser, long projectCode, long processDefinitionCode, long preTaskCode, long postTaskCode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
@ -522,6 +550,11 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result; return result;
} }
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode);
if (CollectionUtils.isEmpty(processTaskRelationList)) { if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
@ -546,11 +579,22 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
} }
return result; return result;
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setPreTaskVersion(0); processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPreTaskCode(0L); processTaskRelation.setPreTaskCode(0L);
Date now = new Date();
processTaskRelation.setUpdateTime(now);
int update = processTaskRelationMapper.updateById(processTaskRelation); int update = processTaskRelationMapper.updateById(processTaskRelation);
if (update == 0) { processTaskRelation.setId(0);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
int insert = processTaskRelationLogMapper.insert(processTaskRelationLog);
if ((update & insert) == 0) {
putMsg(result, Status.DELETE_EDGE_ERROR); putMsg(result, Status.DELETE_EDGE_ERROR);
throw new ServiceException(Status.DELETE_EDGE_ERROR);
} }
return result; return result;
} }
@ -592,7 +636,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
* @param taskCode pre task code * @param taskCode pre task code
* @return status * @return status
*/ */
private Status deleteUpstreamRelation(int userId, long projectCode, Long[] preTaskCodes, long taskCode) { private Status deleteUpstreamRelation(User loginUser, long projectCode, Long[] preTaskCodes, long taskCode) {
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes); List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryUpstreamByCodes(projectCode, taskCode, preTaskCodes);
if (CollectionUtils.isEmpty(upstreamList)) { if (CollectionUtils.isEmpty(upstreamList)) {
return Status.SUCCESS; return Status.SUCCESS;
@ -601,7 +645,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
Date now = new Date(); Date now = new Date();
for (ProcessTaskRelation processTaskRelation : upstreamList) { for (ProcessTaskRelation processTaskRelation : upstreamList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(userId); processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now); processTaskRelationLog.setUpdateTime(now);
upstreamLogList.add(processTaskRelationLog); upstreamLogList.add(processTaskRelationLog);
@ -617,14 +661,23 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) { for (Map<String, Long> codeCountMap : countListGroupByProcessDefinitionCode) {
long processDefinitionCode = codeCountMap.get("processDefinitionCode"); long processDefinitionCode = codeCountMap.get("processDefinitionCode");
long countValue = codeCountMap.get("countValue"); long countValue = codeCountMap.get("countValue");
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
return Status.PROCESS_DEFINE_NOT_EXIST;
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode); List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationListGroupByProcessDefinitionCode.get(processDefinitionCode);
if (countValue <= processTaskRelationLogList.size()) { if (countValue <= processTaskRelationLogList.size()) {
ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0); ProcessTaskRelationLog processTaskRelationLog = processTaskRelationLogList.remove(0);
if (processTaskRelationLog.getPreTaskCode() != 0) { if (processTaskRelationLog.getPreTaskCode() != 0) {
processTaskRelationLog.setPreTaskCode(0); processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0); processTaskRelationLog.setPreTaskVersion(0);
updates.add(processTaskRelationLog);
} }
processTaskRelationLog.setProcessDefinitionVersion(insertVersion);
updates.add(processTaskRelationLog);
} }
if (!processTaskRelationLogList.isEmpty()) { if (!processTaskRelationLogList.isEmpty()) {
deletes.addAll(processTaskRelationLogList); deletes.addAll(processTaskRelationLogList);

47
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -31,12 +31,14 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@ -94,6 +96,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired @Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@ -178,6 +183,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @param projectCode project code * @param projectCode project code
* @param taskCode task code * @param taskCode task code
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
@ -213,23 +219,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) { if (!taskRelationList.isEmpty()) {
int deleteRelation = 0; int deleteRelation = 0;
int deleteRelationLog = 0;
for (ProcessTaskRelation processTaskRelation : taskRelationList) { for (ProcessTaskRelation processTaskRelation : taskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
deleteRelation += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteRelationLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
} }
if ((deleteRelation & deleteRelationLog) == 0) { if (deleteRelation == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} }
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
updateProcessDefiniteVersion(loginUser, processDefinitionCode);
} }
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
} }
return result; return result;
} }
private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
return insertVersion;
}
/** /**
* update task definition * update task definition
* *
@ -290,27 +308,35 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
handleRelation(loginUser, taskCode, version, now);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
return result;
}
private void handleRelation(User loginUser, long taskCode, Integer version, Date now) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>(); List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
int delete = 0; int delete = 0;
int deleteLog = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if (processTaskRelationLog.getPreTaskCode() == taskCode) { if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version); processTaskRelationLog.setPreTaskVersion(version);
} }
if (processTaskRelationLog.getPostTaskCode() == taskCode) { if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version); processTaskRelationLog.setPostTaskVersion(version);
} }
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
processTaskRelationLog.setOperator(loginUser.getId()); processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now); processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now); processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog); processTaskRelationLogList.add(processTaskRelationLog);
} }
if ((delete & deleteLog) == 0) { if (delete == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR); throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else { } else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList); int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
@ -320,9 +346,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
} }
} }
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
return result;
} }
/** /**
@ -352,10 +375,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
taskDefinitionUpdate.setUserId(loginUser.getId()); taskDefinitionUpdate.setUserId(loginUser.getId());
Date now = new Date();
taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setUpdateTime(new Date());
taskDefinitionUpdate.setId(taskDefinition.getId()); taskDefinitionUpdate.setId(taskDefinition.getId());
int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (switchVersion > 0) { if (switchVersion > 0) {
handleRelation(loginUser, taskCode, version, now);
result.put(Constants.DATA_LIST, taskCode); result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -667,6 +667,7 @@ public class ProcessDefinitionServiceTest {
processDefinition.setTenantId(1); processDefinition.setTenantId(1);
processDefinition.setDescription(""); processDefinition.setDescription("");
processDefinition.setCode(46L); processDefinition.setCode(46L);
processDefinition.setVersion(1);
return processDefinition; return processDefinition;
} }

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.CollectionUtils;
@ -88,6 +89,9 @@ public class ProcessTaskRelationServiceTest {
@Mock @Mock
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Mock
private ProcessService processService;
/** /**
* get Mock Admin User * get Mock Admin User
* *
@ -456,6 +460,9 @@ public class ProcessTaskRelationServiceTest {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(processDefinition);
Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode); Map<String, Object> result1 = processTaskRelationService.deleteDownstreamRelation(loginUser, projectCode, "123", taskCode);
Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result1.get(Constants.STATUS));
} }
@ -541,6 +548,7 @@ public class ProcessTaskRelationServiceTest {
processTaskRelationLog.setProcessDefinitionVersion(1); processTaskRelationLog.setProcessDefinitionVersion(1);
Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1); Mockito.when(processTaskRelationLogMapper.deleteRelation(processTaskRelationLog)).thenReturn(1);
Mockito.when(processService.saveProcessDefine(loginUser, getProcessDefinition(), Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode); result = processTaskRelationService.deleteTaskProcessRelation(loginUser, projectCode, processDefinitionCode, taskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} }
@ -549,7 +557,7 @@ public class ProcessTaskRelationServiceTest {
public void testDeleteEdge() { public void testDeleteEdge() {
long projectCode = 1L; long projectCode = 1L;
long processDefinitionCode = 3L; long processDefinitionCode = 3L;
long preTaskCode = 4L; long preTaskCode = 0L;
long postTaskCode = 5L; long postTaskCode = 5L;
Project project = getProject(projectCode); Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
@ -563,14 +571,21 @@ public class ProcessTaskRelationServiceTest {
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode); processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(processDefinitionCode); processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelation.setPreTaskCode(preTaskCode); processTaskRelation.setPreTaskCode(preTaskCode);
processTaskRelation.setPostTaskCode(postTaskCode); processTaskRelation.setPostTaskCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>(); List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation); processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList); Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, preTaskCode, postTaskCode)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1); Mockito.when(processTaskRelationMapper.countByCode(projectCode, processDefinitionCode, 0L, postTaskCode)).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteById(processTaskRelation.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1); Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.insert(processTaskRelationLog)).thenReturn(1);
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(processDefinition);
Mockito.when(processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE)).thenReturn(1);
result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode); result = processTaskRelationService.deleteEdge(loginUser, projectCode, processDefinitionCode, preTaskCode, postTaskCode);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
} }

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2184,7 +2184,7 @@ public class ProcessService {
if (result > 0) { if (result > 0) {
result = switchProcessTaskRelationVersion(processDefinitionLog); result = switchProcessTaskRelationVersion(processDefinitionLog);
if (result <= 0) { if (result <= 0) {
return Constants.DEFINITION_FAILURE; return Constants.EXIT_CODE_FAILURE;
} }
} }
return result; return result;
@ -2196,7 +2196,36 @@ public class ProcessService {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode()); processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
} }
List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); List<ProcessTaskRelationLog> processTaskRelationLogList = processTaskRelationLogMapper.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
return processTaskRelationMapper.batchInsert(processTaskRelationLogList); int batchInsert = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
if (batchInsert == 0) {
return Constants.EXIT_CODE_FAILURE;
} else {
int result = 0;
for (ProcessTaskRelationLog taskRelationLog : processTaskRelationLogList) {
int switchResult = switchTaskDefinitionVersion(taskRelationLog.getPostTaskCode(), taskRelationLog.getPostTaskVersion());
if (switchResult != Constants.EXIT_CODE_FAILURE) {
result++;
}
}
return result;
}
}
public int switchTaskDefinitionVersion(long taskCode, int taskVersion) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
return Constants.EXIT_CODE_FAILURE;
}
if (taskDefinition.getVersion() == taskVersion) {
return Constants.EXIT_CODE_SUCCESS;
}
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskVersion);
if (taskDefinitionUpdate == null) {
return Constants.EXIT_CODE_FAILURE;
}
taskDefinitionUpdate.setUpdateTime(new Date());
taskDefinitionUpdate.setId(taskDefinition.getId());
return taskDefinitionMapper.updateById(taskDefinitionUpdate);
} }
/** /**

Loading…
Cancel
Save