Browse Source

pick 8270/8308 (#8355)

3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
4a4e72b0d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  2. 117
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  3. 202
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -60,6 +60,8 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore; import springfox.documentation.annotations.ApiIgnore;
import org.apache.commons.lang3.StringUtils;
/** /**
* task definition controller * task definition controller
*/ */
@ -121,7 +123,7 @@ public class TaskDefinitionController extends BaseController {
@RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode, @RequestParam(value = "processDefinitionCode", required = true) long processDefinitionCode,
@RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
@RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
Map<String, Object> result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, upstreamCodes); Map<String, Object> result = taskDefinitionService.createTaskBindsWorkFlow(loginUser, projectCode, processDefinitionCode, taskDefinitionJsonObj, StringUtils.defaultString(upstreamCodes));
return returnDataList(result); return returnDataList(result);
} }

117
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -72,9 +71,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
@Autowired @Autowired
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper;
@Autowired @Autowired
private TaskDefinitionLogMapper taskDefinitionLogMapper; private TaskDefinitionLogMapper taskDefinitionLogMapper;
@ -115,75 +111,59 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, postTaskCode); updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations = Lists.newArrayList(processTaskRelationList);
if (!processTaskRelations.isEmpty()) { if (!processTaskRelations.isEmpty()) {
Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream() Map<Long, ProcessTaskRelation> preTaskCodeMap = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == postTaskCode)
.collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation)); .collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelation -> processTaskRelation));
if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) { if (!preTaskCodeMap.isEmpty()) {
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode); if (preTaskCodeMap.containsKey(preTaskCode) || (!preTaskCodeMap.containsKey(0L) && preTaskCode == 0L)) {
return result; putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, processDefinitionCode);
} return result;
if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) { }
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(preTaskCodeMap.get(0L)); if (preTaskCodeMap.containsKey(0L) && preTaskCode != 0L) {
// delete no upstream // delete no upstream
int delete = processTaskRelationMapper.deleteRelation(processTaskRelationLog); processTaskRelations.remove(preTaskCodeMap.get(0L));
int deleteLog = processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if ((delete & deleteLog) == 0) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} }
} }
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition); TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
Date now = new Date(); ProcessTaskRelation processTaskRelation = setRelation(processDefinition, postTaskDefinition);
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
if (preTaskCode != 0L) { if (preTaskCode != 0L) {
// upstream is or not exist
List<ProcessTaskRelation> upstreamProcessTaskRelations = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, preTaskCode);
TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode); TaskDefinition preTaskDefinition = taskDefinitionMapper.queryByCode(preTaskCode);
if (upstreamProcessTaskRelations.isEmpty()) { List<ProcessTaskRelation> upstreamTaskRelationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() == preTaskCode).collect(Collectors.toList());
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), preTaskDefinition); // upstream is or not exist
processTaskRelationLog.setPreTaskCode(0L); if (upstreamTaskRelationList.isEmpty()) {
processTaskRelationLog.setPreTaskVersion(0); ProcessTaskRelation preProcessTaskRelation = setRelation(processDefinition, preTaskDefinition);
processTaskRelationLogs.add(processTaskRelationLog); preProcessTaskRelation.setPreTaskCode(0L);
preProcessTaskRelation.setPreTaskVersion(0);
processTaskRelations.add(preProcessTaskRelation);
} }
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode); processTaskRelation.setPreTaskCode(preTaskDefinition.getCode());
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition); processTaskRelation.setPreTaskVersion(preTaskDefinition.getVersion());
processTaskRelationLog.setPreTaskCode(preTaskDefinition.getCode());
processTaskRelationLog.setPreTaskVersion(preTaskDefinition.getVersion());
processTaskRelationLogs.add(processTaskRelationLog);
} else {
TaskDefinition postTaskDefinition = taskDefinitionMapper.queryByCode(postTaskCode);
ProcessTaskRelationLog processTaskRelationLog = setRelationLog(processDefinition, now, loginUser.getId(), postTaskDefinition);
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLogs.add(processTaskRelationLog);
}
int insert = processTaskRelationMapper.batchInsert(processTaskRelationLogs);
int insertLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogs);
if ((insert & insertLog) > 0) {
putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); processTaskRelation.setPreTaskCode(0L);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); processTaskRelation.setPreTaskVersion(0);
} }
processTaskRelations.add(processTaskRelation);
updateRelation(loginUser, result, processDefinition, processTaskRelations);
return result; return result;
} }
private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinition, Date now, int userId, TaskDefinition taskDefinition) { private ProcessTaskRelation setRelation(ProcessDefinition processDefinition, TaskDefinition taskDefinition) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); Date now = new Date();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); processTaskRelation.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setPostTaskCode(taskDefinition.getCode()); processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setPostTaskVersion(taskDefinition.getVersion()); processTaskRelation.setPostTaskCode(taskDefinition.getCode());
processTaskRelationLog.setConditionType(ConditionType.NONE); processTaskRelation.setPostTaskVersion(taskDefinition.getVersion());
processTaskRelationLog.setConditionParams("{}"); processTaskRelation.setConditionType(ConditionType.NONE);
processTaskRelationLog.setCreateTime(now); processTaskRelation.setConditionParams("{}");
processTaskRelationLog.setUpdateTime(now); processTaskRelation.setCreateTime(now);
processTaskRelationLog.setOperator(userId); processTaskRelation.setUpdateTime(now);
processTaskRelationLog.setOperateTime(now); return processTaskRelation;
return processTaskRelationLog;
} }
private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) { private void updateProcessDefiniteVersion(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition) {
@ -227,7 +207,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) { if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result; return result;
@ -245,6 +226,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ",")); putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(downstreamList, ","));
return result; return result;
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList); updateRelation(loginUser, result, processDefinition, processTaskRelationList);
if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType()) if (TaskType.CONDITIONS.getDesc().equals(taskDefinition.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType()) || TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())
@ -261,7 +243,6 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition, private void updateRelation(User loginUser, Map<String, Object> result, ProcessDefinition processDefinition,
List<ProcessTaskRelation> processTaskRelationList) { List<ProcessTaskRelation> processTaskRelationList) {
updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE); processDefinition.getVersion(), relationLogs, Lists.newArrayList(), Boolean.TRUE);
@ -312,7 +293,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode()); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, upstreamList.get(0).getProcessDefinitionCode());
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList(); List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
if (preTaskCodeList.size() > 1) { if (preTaskCodeList.size() > 1) {
@ -331,6 +313,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
} }
} }
processTaskRelationList.removeAll(processTaskRelationWaitRemove); processTaskRelationList.removeAll(processTaskRelationWaitRemove);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList); updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result; return result;
} }
@ -372,8 +355,10 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode()); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, downstreamList.get(0).getProcessDefinitionCode());
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode); processTaskRelationList.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) && processTaskRelation.getPreTaskCode() == taskCode);
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList); updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result; return result;
} }
@ -474,7 +459,8 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result; return result;
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) { if (CollectionUtils.isEmpty(processTaskRelationList)) {
putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList"); putMsg(result, Status.DATA_IS_NULL, "processTaskRelationList");
return result; return result;
@ -511,6 +497,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
processTaskRelation.setPreTaskCode(0L); processTaskRelation.setPreTaskCode(0L);
processTaskRelationList.add(processTaskRelation); processTaskRelationList.add(processTaskRelation);
} }
updateProcessDefiniteVersion(loginUser, result, processDefinition);
updateRelation(loginUser, result, processDefinition, processTaskRelationList); updateRelation(loginUser, result, processDefinition, processTaskRelationList);
return result; return result;
} }

202
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo; import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -50,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
@ -97,14 +97,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
private ProcessTaskRelationMapper processTaskRelationMapper; private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired @Autowired
private ProcessTaskRelationLogMapper processTaskRelationLogMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/** /**
* create task definition * create task definition
* *
@ -314,17 +311,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (delete > 0) { if (delete > 0) {
List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) { if (!taskRelationList.isEmpty()) {
int deleteRelation = 0;
for (ProcessTaskRelation processTaskRelation : taskRelationList) {
deleteRelation += processTaskRelationMapper.deleteById(processTaskRelation.getId());
}
if (deleteRelation == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
}
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
updateProcessDefiniteVersion(loginUser, processDefinitionCode); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList());
} else {
putMsg(result, Status.SUCCESS);
} }
putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); putMsg(result, Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
@ -332,7 +325,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result; return result;
} }
private int updateProcessDefiniteVersion(User loginUser, long processDefinitionCode) { private void updateDag(User loginUser, Map<String, Object> result, long processDefinitionCode, List<ProcessTaskRelation> processTaskRelationList,
List<TaskDefinitionLog> taskDefinitionLogs) {
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) { if (processDefinition == null) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST); throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST);
@ -341,7 +335,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (insertVersion <= 0) { if (insertVersion <= 0) {
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
return insertVersion; List<ProcessTaskRelationLog> relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(),
insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
} }
/** /**
@ -356,50 +359,55 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override @Override
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
if (version <= 0) { if (taskDefinitionToUpdate == null) {
return result; return result;
} }
handleRelation(loginUser, taskCode, version); List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
if (!taskRelationList.isEmpty()) {
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
}
result.put(Constants.DATA_LIST, taskCode); result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) { private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode)); result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode));
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
return Constants.EXIT_CODE_FAILURE; return null;
} }
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) { if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return Constants.EXIT_CODE_FAILURE; return null;
} }
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return Constants.EXIT_CODE_FAILURE; return null;
} }
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition.equals(taskDefinitionToUpdate)) { if (taskDefinition.equals(taskDefinitionToUpdate)) {
return taskDefinition.getVersion(); return null;
} }
if (taskDefinitionToUpdate == null) { if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json"); logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return Constants.EXIT_CODE_FAILURE; return null;
} }
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) { if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) {
logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName()); logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
return Constants.EXIT_CODE_FAILURE; return null;
} }
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) { if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return Constants.EXIT_CODE_FAILURE; return null;
} }
Date now = new Date(); Date now = new Date();
taskDefinitionToUpdate.setCode(taskCode); taskDefinitionToUpdate.setCode(taskCode);
@ -419,42 +427,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
return version; return taskDefinitionToUpdate;
}
private void handleRelation(User loginUser, long taskCode, Integer version) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
int delete = 0;
Date now = new Date();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
}
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog);
}
if (delete == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
if ((insertRelation & insertRelationLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
} }
/** /**
@ -470,13 +443,23 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Override @Override
public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) { public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
if (version <= 0) { if (result.get(Constants.STATUS) != Status.SUCCESS && taskDefinitionToUpdate == null) {
return result; return result;
} }
List<ProcessTaskRelation> upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
Set<Long> upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet());
Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) {
putMsg(result, Status.SUCCESS);
return result;
} else {
if (taskDefinitionToUpdate == null) {
taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
}
}
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap; Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
if (StringUtils.isNotBlank(upstreamCodes)) { if (!upstreamTaskCodes.isEmpty()) {
Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
@ -488,76 +471,48 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} else { } else {
queryUpStreamTaskCodeMap = new HashMap<>(); queryUpStreamTaskCodeMap = new HashMap<>();
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); if (!upstreamTaskRelations.isEmpty()) {
if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) { ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA)); List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode());
throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST); List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
} List<ProcessTaskRelation> relationList = Lists.newArrayList();
if (!processTaskRelationList.isEmpty()) {
long processDefinitionCode = processTaskRelationList.get(0).getProcessDefinitionCode();
int definiteVersion = updateProcessDefiniteVersion(loginUser, processDefinitionCode);
List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
Date now = new Date();
int delete = 0;
int deleteLog = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation); if (processTaskRelation.getPostTaskCode() == taskCode) {
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog); if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && processTaskRelation.getPreTaskCode() != 0L) {
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog); queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode());
processTaskRelationLog.setOperator(loginUser.getId()); } else {
processTaskRelationLog.setOperateTime(now); processTaskRelation.setPreTaskCode(0L);
processTaskRelationLog.setUpdateTime(now); processTaskRelation.setPreTaskVersion(0);
if (processTaskRelationLog.getPreTaskCode() == taskCode) { relationList.add(processTaskRelation);
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
if (definition == null) {
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
} }
} }
processTaskRelationLog.setProcessDefinitionVersion(definiteVersion);
relationLogs.add(processTaskRelationLog);
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} }
if (!queryUpStreamTaskCodeMap.isEmpty()) { processTaskRelationList.removeAll(relationList);
ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class); for (Map.Entry<Long, TaskDefinition> queryUpStreamTask : queryUpStreamTaskCodeMap.entrySet()) {
assert taskRelationLogDeepCopy != null; taskRelation.setPreTaskCode(queryUpStreamTask.getKey());
for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) { taskRelation.setPreTaskVersion(queryUpStreamTask.getValue().getVersion());
taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode()); processTaskRelationList.add(taskRelation);
taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
relationLogs.add(taskRelationLogDeepCopy);
}
} }
Map<Long, ProcessTaskRelationLog> taskRelationLogMap = if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) {
relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog)); processTaskRelationList.add(processTaskRelationList.get(0));
if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) {
taskRelationLogMap.remove(0L);
}
int insertRelation = processTaskRelationMapper.batchInsert(relationLogs);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs);
if ((insertRelation & insertRelationLog) == 0) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} }
updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate));
} }
result.put(Constants.DATA_LIST, taskCode); result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
/** /**
* Switch task definition * switch task definition
* *
* @param loginUser login user * @param loginUser login user
* @param projectCode project code * @param projectCode project code
* @param taskCode task code * @param taskCode task code
* @param version the version user want to switch * @param version the version user want to switch
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> switchVersion(User loginUser, long projectCode, long taskCode, int version) { public Map<String, Object> switchVersion(User loginUser, long projectCode, long taskCode, int version) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
@ -581,9 +536,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionUpdate.setId(taskDefinition.getId()); taskDefinitionUpdate.setId(taskDefinition.getId());
int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate);
if (switchVersion > 0) { if (switchVersion > 0) {
handleRelation(loginUser, taskCode, version); List<ProcessTaskRelation> taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode);
result.put(Constants.DATA_LIST, taskCode); if (!taskRelationList.isEmpty()) {
putMsg(result, Status.SUCCESS); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate));
} else {
putMsg(result, Status.SUCCESS);
}
} else { } else {
putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR); putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
} }

Loading…
Cancel
Save