Browse Source

fix task api update bug (#8270)

2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
d5d71842e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 100
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  2. 93
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

100
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -115,75 +115,58 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (!processTaskRelations.isEmpty()) {
Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream()
Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
return result;
}
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L));
// delete no upstream
int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog);
int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if ((delete & deleteLog) == 0) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
if (!preTaskCodeMap.isEmpty()) {
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
return result;
}
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
// delete no upstream
processTaskRelations.remove(preTaskCodeMap.get(0L));
}
}
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
Date now = new Date();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition);
if (preTaskCode != 0L) {
// upstream is or not exist
List<ProcessTaskRelation> upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode);
TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode);
if (upstreamProcessTaskRelations.isEmpty()) {
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLogs.add(processTaskRelationLog);
List<ProcessTaskRelation> upstreamTaskRelationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList());
// upstream is or not exist
if (upstreamTaskRelationList.isEmpty()) {
ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition);
preProcessTaskRelation.setPreTaskCode(0L);
preProcessTaskRelation.setPreTaskVersion(0);
processTaskRelations.add(preProcessTaskRelation);
}
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
processTaskRelationLogs.add(processTaskRelationLog);
} else {
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLogs.add(processTaskRelationLog);
}
int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs);
int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if ((insert & insertLog) > 0) {
putMsg(result, Status.SUCCESS);
processTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
} else {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
processTaskRelation.setPreTaskCode(0L);
processTaskRelation.setPreTaskVersion(0);
}
processTaskRelations.add(processTaskRelation);
updateRelation(loginUser, result, processDefinition, processTaskRelations);
return result;
}
private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setPostTaskCode(taskDefinition.getCode());
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLog.setOperator(userId);
processTaskRelationLog.setOperateTime(now);
return processTaskRelationLog;
private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) {
Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(processDefinition.getProjectCode());
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelation.setPostTaskCode(taskDefinition.getCode());
processTaskRelation.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelation.setConditionType(ConditionType.NONE);
processTaskRelation.setConditionParams("{}");
processTaskRelation.setCreateTime(now);
processTaskRelation.setUpdateTime(now);
return processTaskRelation;
}
private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
@ -245,6 +228,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
return result;
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
@ -261,7 +245,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
List<ProcessTaskRelation> processTaskRelationList) {
updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
@ -331,6 +314,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
}
processTaskRelationList.removeAll(processTaskRelationWaitRemove);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}
@ -374,6 +358,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}
@ -511,6 +496,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelation.setPreTaskCode(0L);
processTaskRelationList.add(processTaskRelation);
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result;
}

93
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -28,9 +28,9 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -67,6 +66,7 @@ import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;
/**
* task definition service impl
@ -93,9 +93,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@ -218,17 +215,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (delete > 0) {
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
int deleteRelation = 0;
for (ProcessTaskRelation processTaskRelation : taskRelationList) {
deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
}
if (deleteRelation == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
updateProcessDefiniteVersion(loginUser, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList());
} else {
putMsg(result, Status.SUCCESS);
}
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@ -236,7 +229,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) {
private void updateDag(User loginUser, Map<String, Object> result, long processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@ -245,7 +239,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
return insertVersion;
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
/**
@ -308,54 +311,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
handleRelation(loginUser, taskCode, version, now);
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
} else {
putMsg(result, Status.SUCCESS);
}
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
return result;
}
private void handleRelation(User loginUser, long taskCode, Integer version, Date now) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
int delete = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
}
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog);
}
if (delete == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
if ((insertRelation & insertRelationLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
}
/**
* update task definition
* switch task definition
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param version the version user want to switch
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> switchVersion(User loginUser, long projectCode, long taskCode, int version) {
Project project = projectMapper.queryByCode(projectCode);
@ -375,14 +352,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
taskDefinitionUpdate.setUserId(loginUser.getId());
Date now = new Date();
taskDefinitionUpdate.setUpdateTime(new Date());
taskDefinitionUpdate.setId(taskDefinition.getId());
int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (switchVersion > 0) {
handleRelation(loginUser, taskCode, version, now);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
} else {
putMsg(result, Status.SUCCESS);
}
} else {
putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
}

Loading…
Cancel
Save