From 26dad5e2a8ee66e599e7c7da74bc408d4b9f67f6 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 20 Sep 2022 22:23:16 +0800 Subject: [PATCH] Fix update TaskDefinition error (#12060) --- .../impl/TaskDefinitionServiceImpl.java | 296 +++++++++++------- .../service/process/ProcessServiceImpl.java | 45 ++- 2 files changed, 221 insertions(+), 120 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 9e30359d76..947dfc2639 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -17,8 +17,16 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.permission.PermissionCheck; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -46,7 +54,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; -import org.apache.dolphinscheduler.api.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; @@ -73,8 +80,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; - /** * task definition service impl */ @@ -122,8 +127,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe long projectCode, String taskDefinitionJson) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -153,7 +159,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } Map resData = new HashMap<>(); resData.put("total", taskDefinitionLogs.size()); - resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); + resData.put("code", StringUtils + .join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, resData); return result; @@ -177,8 +184,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe String taskDefinitionJsonObj, String upstreamCodes) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_CREATE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -211,25 +219,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } long taskCode = taskDefinition.getCode(); if (taskCode == 0) { - try { - taskCode = CodeGenerateUtils.getInstance().genCode(); - taskDefinition.setCode(taskCode); - } catch (CodeGenerateException e) { - logger.error("Generate task definition code error.", e); - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, taskDefinitionJsonObj); - return result; - } + taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } - List processTaskRelationLogList = Lists.newArrayList(); + List processTaskRelationLogList = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode) + .stream() + .map(ProcessTaskRelationLog::new) + .collect(Collectors.toList()); + if (StringUtils.isNotBlank(upstreamCodes)) { - Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) + .collect(Collectors.toSet()); List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); - Set queryUpStreamTaskCodes = upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); + Set queryUpStreamTaskCodes = + upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); // upstreamTaskCodes - queryUpStreamTaskCodes - Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet()); + Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)) + .collect(Collectors.toSet()); if (!diffCode.isEmpty()) { String taskCodes = StringUtils.join(diffCode, Constants.COMMA); - logger.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", taskCodes); + logger.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", + taskCodes); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes); return result; } @@ -243,10 +253,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLog.setConditionParams("{}"); processTaskRelationLogList.add(processTaskRelationLog); } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - if (!processTaskRelationList.isEmpty()) { - processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList())); - } } else { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); processTaskRelationLog.setPreTaskCode(0); @@ -257,22 +263,30 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLog.setConditionParams("{}"); processTaskRelationLogList.add(processTaskRelationLog); } - int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), - processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); + int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), + processDefinition.getVersion(), + processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); if (insertResult != Constants.EXIT_CODE_SUCCESS) { - logger.error("Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getCode(), processDefinition.getVersion()); + logger.error( + "Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } else - logger.info("Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", processDefinition.getCode(), processDefinition.getVersion()); + logger.info( + "Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); + int saveTaskResult = + processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); if (saveTaskResult == Constants.DEFINITION_FAILURE) { - logger.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskDefinition.getCode()); + logger.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } else - logger.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskDefinition.getCode()); + logger.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, taskDefinition); return result; @@ -287,10 +301,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param taskName task name */ @Override - public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) { + public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, + String taskName) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -318,8 +334,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -335,33 +352,42 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { - logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.", taskCode); + logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.", + taskCode); putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); return result; } - List processTaskRelationList = processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); + List processTaskRelationList = + processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { Set postTaskCodes = processTaskRelationList .stream() .map(ProcessTaskRelation::getPostTaskCode) .collect(Collectors.toSet()); String postTaskCodesStr = StringUtils.join(postTaskCodes, ","); - logger.warn("Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}", + logger.warn( + "Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}", taskCode, postTaskCodesStr); putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr); return result; } int delete = taskDefinitionMapper.deleteByCode(taskCode); if (delete > 0) { - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { - logger.info("Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", taskCode); + logger.info( + "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - List relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List relationList = processTaskRelations.stream() + .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList()); } else { - logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); + logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskCode); putMsg(result, Status.SUCCESS); } } else { @@ -372,7 +398,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - private void updateDag(User loginUser, Map result, long processDefinitionCode, List processTaskRelationList, + private void updateDag(User loginUser, Map result, long processDefinitionCode, + List processTaskRelationList, List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { @@ -381,19 +408,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion <= 0) { - logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinitionCode); + logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", + processDefinition.getProjectCode(), processDefinitionCode); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } else - logger.info("Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", processDefinition.getProjectCode(), processDefinitionCode, insertVersion); - List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + logger.info( + "Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", + processDefinition.getProjectCode(), processDefinitionCode, insertVersion); + List relationLogs = + processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { - logger.info("Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", processDefinition.getProjectCode(), processDefinitionCode, insertVersion); + logger.info( + "Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", + processDefinition.getProjectCode(), processDefinitionCode, insertVersion); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); } else { - logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.", processDefinition.getProjectCode(), processDefinitionCode); + logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.", + processDefinition.getProjectCode(), processDefinitionCode); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } @@ -409,18 +444,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe */ @Transactional @Override - public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { + public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj) { Map result = new HashMap<>(); - TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + TaskDefinitionLog taskDefinitionToUpdate = + updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); if (taskDefinitionToUpdate == null) { return result; } - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { - logger.info("Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", taskCode); + logger.info( + "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + Lists.newArrayList(taskDefinitionToUpdate)); } logger.info("Update task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); result.put(Constants.DATA_LIST, taskCode); @@ -428,10 +470,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { + private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, + Map result) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_UPDATE)); + // check user access for project + result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_UPDATE)); if (result.get(Constants.STATUS) != Status.SUCCESS) { return null; } @@ -444,12 +487,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { // if stream, can update task definition without online check if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) { - logger.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.", TaskExecuteType.STREAM, taskCode); + logger.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.", + TaskExecuteType.STREAM, taskCode); putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); return null; } } - TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + TaskDefinitionLog taskDefinitionToUpdate = + JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); if (taskDefinition.equals(taskDefinitionToUpdate)) { logger.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode); return null; @@ -464,13 +509,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe .taskParams(taskDefinitionToUpdate.getTaskParams()) .dependence(taskDefinitionToUpdate.getDependence()) .build())) { - logger.warn("Task definition parameters are invalid, taskDefinitionName:{}.", taskDefinitionToUpdate.getName()); + logger.warn("Task definition parameters are invalid, taskDefinitionName:{}.", + taskDefinitionToUpdate.getName()); putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); return null; } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); if (version == null || version == 0) { - logger.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.", taskCode); + logger.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.", + taskCode); putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); return null; } @@ -490,11 +537,13 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinitionToUpdate.setId(null); int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); if ((update & insert) != 1) { - logger.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); + logger.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.", + projectCode, taskCode); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } else - logger.info("Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.", + logger.info( + "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.", projectCode, taskCode, taskDefinitionToUpdate.getVersion()); return taskDefinitionToUpdate; } @@ -510,17 +559,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @return update result code */ @Override - public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) { + public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj, String upstreamCodes) { Map result = new HashMap<>(); - TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + TaskDefinitionLog taskDefinitionToUpdate = + updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); if (result.get(Constants.STATUS) != Status.SUCCESS && taskDefinitionToUpdate == null) { return result; } - List upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); - Set upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); + List upstreamTaskRelations = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + Set upstreamCodeSet = + upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); Set upstreamTaskCodes = Collections.emptySet(); if (StringUtils.isNotEmpty(upstreamCodes)) { - upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) + .collect(Collectors.toSet()); } if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) { putMsg(result, Status.SUCCESS); @@ -533,12 +587,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Map queryUpStreamTaskCodeMap; if (!upstreamTaskCodes.isEmpty()) { List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); - queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); + queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream() + .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet()); if (!upstreamTaskCodes.isEmpty()) { String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA); - logger.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", notExistTaskCodes); + logger.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", + notExistTaskCodes); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes); return result; } @@ -547,12 +603,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } if (!upstreamTaskRelations.isEmpty()) { ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); List processTaskRelationList = Lists.newArrayList(processTaskRelations); List relationList = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { if (processTaskRelation.getPostTaskCode() == taskCode) { - if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && processTaskRelation.getPreTaskCode() != 0L) { + if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) + && processTaskRelation.getPreTaskCode() != 0L) { queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode()); } else { processTaskRelation.setPreTaskCode(0L); @@ -570,16 +628,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) { processTaskRelationList.add(processTaskRelationList.get(0)); } - updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, + Lists.newArrayList(taskDefinitionToUpdate)); } - logger.info("Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.", + logger.info( + "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.", projectCode, taskCode, upstreamTaskCodes); result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; } - /** * switch task definition * @@ -592,13 +651,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map switchVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project,projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } if (processService.isTaskOnline(taskCode)) { - logger.warn("Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}.", + logger.warn( + "Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), taskCode); putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE); return result; @@ -609,20 +670,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); return result; } - TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); + TaskDefinitionLog taskDefinitionUpdate = + taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); taskDefinitionUpdate.setUserId(loginUser.getId()); taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); if (switchVersion > 0) { - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { - logger.info("Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", taskCode); + logger.info( + "Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + Lists.newArrayList(taskDefinitionUpdate)); } else { - logger.info("Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", version, taskCode); + logger.info( + "Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", + version, taskCode); putMsg(result, Status.SUCCESS); } } else { @@ -641,7 +710,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); // check user access for project - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_VERSION_VIEW); + Map checkResult = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_VERSION_VIEW); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { putMsg(result, resultStatus); @@ -649,7 +719,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); - IPage taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode); + IPage taskDefinitionVersionsPaging = + taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode); List taskDefinitionLogs = taskDefinitionVersionsPaging.getRecords(); pageInfo.setTotalList(taskDefinitionLogs); @@ -662,8 +733,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map deleteByCodeAndVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -674,14 +746,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } else { if (taskDefinition.getVersion() == version) { - logger.warn("Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.", + logger.warn( + "Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.", projectCode, taskCode, version); putMsg(result, Status.MAIN_TABLE_USING_VERSION); return result; } int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version); if (delete > 0) { - logger.info("Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.", + logger.info( + "Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.", projectCode, taskCode, version); putMsg(result, Status.SUCCESS); } else { @@ -696,8 +770,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map queryTaskDefinitionDetail(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -724,8 +799,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Integer pageSize) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map checkResult = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { putMsg(result, resultStatus); @@ -733,8 +809,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } taskType = taskType == null ? StringUtils.EMPTY : taskType; Page page = new Page<>(pageNo, pageSize); - IPage taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, - searchTaskName, taskType, taskExecuteType); + IPage taskMainInfoIPage = + taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, + searchTaskName, taskType, taskExecuteType); List records = taskMainInfoIPage.getRecords(); if (!records.isEmpty()) { Map taskMainInfoMap = new HashMap<>(); @@ -800,10 +877,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe */ @Transactional @Override - public Map releaseTaskDefinition(User loginUser, long projectCode, long code, ReleaseState releaseState) { + public Map releaseTaskDefinition(User loginUser, long projectCode, long code, + ReleaseState releaseState) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); + // check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); Status resultStatus = (Status) result.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { return result; @@ -817,7 +895,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); return result; } - TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion()); + TaskDefinitionLog taskDefinitionLog = + taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion()); if (taskDefinitionLog == null) { logger.error("Task definition does not exist, taskDefinitionCode:{}.", code); putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); @@ -831,8 +910,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe case ONLINE: String resourceIds = taskDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { - Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); + Integer[] resourceIdArray = + Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); + PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, + processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { @@ -856,7 +937,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } - logger.error("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", code); + logger.error("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", + code); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 6536a6b4ed..09ccbd7d10 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -602,7 +602,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public void removeTaskLogFile(Integer processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); - List taskInstanceList = findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag()); + List taskInstanceList = + findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); if (CollectionUtils.isEmpty(taskInstanceList)) { return; } @@ -623,7 +624,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); - List taskInstanceList = findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag()); + List taskInstanceList = + findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag()); if (CollectionUtils.isEmpty(taskInstanceList)) { return; } @@ -1037,7 +1039,8 @@ public class ProcessServiceImpl implements ProcessService { case COMPLEMENT_DATA: // delete all the valid tasks when complement data if id is not null if (processInstance.getId() != null) { - List taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + List taskInstanceList = + this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { taskInstance.setFlag(Flag.NO); this.updateTaskInstance(taskInstance); @@ -1051,7 +1054,8 @@ public class ProcessServiceImpl implements ProcessService { processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); } // delete all the valid tasks when repeat running - List validTaskList = findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag()); + List validTaskList = + findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); for (TaskInstance taskInstance : validTaskList) { taskInstance.setFlag(Flag.NO); updateTaskInstance(taskInstance); @@ -1654,7 +1658,8 @@ public class ProcessServiceImpl implements ProcessService { if (failureStrategy == FailureStrategy.CONTINUE) { return true; } - List taskInstances = this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(),taskInstance.getTestFlag()); + List taskInstances = + this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), taskInstance.getTestFlag()); for (TaskInstance task : taskInstances) { if (task.getState() == TaskExecutionStatus.FAILURE @@ -1862,7 +1867,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public List findPreviousTaskListByWorkProcessId(Integer processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); - return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, processInstance.getTestFlag()); + return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, + processInstance.getTestFlag()); } /** @@ -2194,7 +2200,8 @@ public class ProcessServiceImpl implements ProcessService { * @return process instance */ @Override - public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { + public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, + int testFlag) { return processInstanceMapper.queryLastSchedulerProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime(), @@ -2535,6 +2542,7 @@ public class ProcessServiceImpl implements ProcessService { taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime()); updateTaskDefinitionLogs.add(taskDefinitionLog); } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { List taskDefinitionCodes = updateTaskDefinitionLogs .stream() @@ -2554,15 +2562,24 @@ public class ProcessServiceImpl implements ProcessService { // for each taskDefinitionLog, we will insert a new version into db // and update the origin one if exist - int updateResult = updateTaskDefinitionLogs.size(); - int insertResult = newTaskDefinitionLogs.size(); - if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { - insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs); + int updateResult = 0; + int insertResult = 0; + if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) { + insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); + } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { + insertResult += taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs); } if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { + for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) { + updateResult += taskDefinitionMapper.updateById(taskDefinitionLog); + } + } + return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; } @@ -3157,7 +3174,8 @@ public class ProcessServiceImpl implements ProcessService { ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null); if (processInstance != null && (processInstance.getState().isFailure() || processInstance.getState().isStop())) { - List validTaskList = findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); + List validTaskList = + findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()); List instanceTaskCodeList = validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList()); List taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(), @@ -3183,7 +3201,8 @@ public class ProcessServiceImpl implements ProcessService { @Override public Integer queryTestDataSourceId(Integer onlineDataSourceId) { Integer testDataSourceId = dataSourceMapper.queryTestDataSourceId(onlineDataSourceId); - if (testDataSourceId!=null) return testDataSourceId; + if (testDataSourceId != null) + return testDataSourceId; return null; } }