From f4babb773ed6bf24acd353008d5a4b4647527052 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 20 Sep 2022 22:24:11 +0800 Subject: [PATCH] Fix update TaskDefinition error (#12064) (cherry picked from commit 96c12f95925bde5a137be2eb6241c0d60d4106f0) --- .../impl/TaskDefinitionServiceImpl.java | 294 ++++++++++++------ .../service/process/ProcessServiceImpl.java | 18 +- 2 files changed, 220 insertions(+), 92 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 87ba643188..6494cb8bb8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -17,8 +17,16 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_CREATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_DELETE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION_UPDATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; +import org.apache.dolphinscheduler.api.permission.PermissionCheck; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -46,7 +54,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; -import org.apache.dolphinscheduler.api.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; @@ -73,8 +80,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.google.common.collect.Lists; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; - /** * task definition service impl */ @@ -122,8 +127,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe long projectCode, String taskDefinitionJson) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -152,7 +158,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } Map resData = new HashMap<>(); resData.put("total", taskDefinitionLogs.size()); - resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); + resData.put("code", StringUtils + .join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, resData); return result; @@ -176,8 +183,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe String taskDefinitionJsonObj, String upstreamCodes) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_CREATE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_CREATE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -207,24 +215,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } long taskCode = taskDefinition.getCode(); if (taskCode == 0) { - try { - taskCode = CodeGenerateUtils.getInstance().genCode(); - taskDefinition.setCode(taskCode); - } catch (CodeGenerateException e) { - logger.error("Task code get error, ", e); - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, taskDefinitionJsonObj); - return result; - } + taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } - List processTaskRelationLogList = Lists.newArrayList(); + List processTaskRelationLogList = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode) + .stream() + .map(ProcessTaskRelationLog::new) + .collect(Collectors.toList()); + if (StringUtils.isNotBlank(upstreamCodes)) { - Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + Set upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) + .collect(Collectors.toSet()); List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); - Set queryUpStreamTaskCodes = upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); + Set queryUpStreamTaskCodes = + upstreamTaskDefinitionList.stream().map(TaskDefinition::getCode).collect(Collectors.toSet()); // upstreamTaskCodes - queryUpStreamTaskCodes - Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)).collect(Collectors.toSet()); + Set diffCode = upstreamTaskCodes.stream().filter(code -> !queryUpStreamTaskCodes.contains(code)) + .collect(Collectors.toSet()); if (!diffCode.isEmpty()) { - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(diffCode, Constants.COMMA)); + String taskCodes = StringUtils.join(diffCode, Constants.COMMA); + logger.error("Some task definitions with parameter upstreamCodes do not exist, taskDefinitionCodes:{}.", + taskCodes); + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCodes); return result; } for (TaskDefinition upstreamTask : upstreamTaskDefinitionList) { @@ -237,10 +249,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLog.setConditionParams("{}"); processTaskRelationLogList.add(processTaskRelationLog); } - List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - if (!processTaskRelationList.isEmpty()) { - processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList())); - } } else { ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); processTaskRelationLog.setPreTaskCode(0); @@ -251,18 +259,30 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe processTaskRelationLog.setConditionParams("{}"); processTaskRelationLogList.add(processTaskRelationLog); } - int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(), - processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); + int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), + processDefinition.getVersion(), + processTaskRelationLogList, Lists.newArrayList(), Boolean.TRUE); if (insertResult != Constants.EXIT_CODE_SUCCESS) { + logger.error( + "Save new version process task relations error, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); - } + } else + logger.info( + "Save new version process task relations complete, processDefinitionCode:{}, processDefinitionVersion:{}.", + processDefinition.getCode(), processDefinition.getVersion()); - int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); + int saveTaskResult = + processService.saveTaskDefine(loginUser, projectCode, Lists.newArrayList(taskDefinition), Boolean.TRUE); if (saveTaskResult == Constants.DEFINITION_FAILURE) { + logger.error("Save task definition error, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); - } + } else + logger.info("Save task definition complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskDefinition.getCode()); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, taskDefinition); return result; @@ -277,10 +297,12 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param taskName task name */ @Override - public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, String taskName) { + public Map queryTaskDefinitionByName(User loginUser, long projectCode, long processCode, + String taskName) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -307,8 +329,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -322,27 +345,42 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { + logger.warn("Task definition can not be deleted due to task state online, taskDefinitionCode:{}.", + taskCode); putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); return result; } - List processTaskRelationList = processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); + List processTaskRelationList = + processTaskRelationMapper.queryDownstreamByTaskCode(taskCode); if (!processTaskRelationList.isEmpty()) { Set postTaskCodes = processTaskRelationList .stream() .map(ProcessTaskRelation::getPostTaskCode) .collect(Collectors.toSet()); - putMsg(result, Status.TASK_HAS_DOWNSTREAM, StringUtils.join(postTaskCodes, ",")); + String postTaskCodesStr = StringUtils.join(postTaskCodes, ","); + logger.warn( + "Task definition can not be deleted due to downstream tasks, taskDefinitionCode:{}, downstreamTaskCodes:{}", + taskCode, postTaskCodesStr); + putMsg(result, Status.TASK_HAS_DOWNSTREAM, postTaskCodesStr); return result; } int delete = taskDefinitionMapper.deleteByCode(taskCode); if (delete > 0) { - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { + logger.info( + "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - List relationList = processTaskRelations.stream().filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + List relationList = processTaskRelations.stream() + .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); updateDag(loginUser, result, processDefinitionCode, relationList, Lists.newArrayList()); } else { + logger.info("Task definition delete complete, projectCode:{}, taskDefinitionCode:{}.", projectCode, + taskCode); putMsg(result, Status.SUCCESS); } } else { @@ -352,7 +390,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - private void updateDag(User loginUser, Map result, long processDefinitionCode, List processTaskRelationList, + private void updateDag(User loginUser, Map result, long processDefinitionCode, + List processTaskRelationList, List taskDefinitionLogs) { ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); if (processDefinition == null) { @@ -360,15 +399,27 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion <= 0) { + logger.error("Update process definition error, projectCode:{}, processDefinitionCode:{}.", + processDefinition.getProjectCode(), processDefinitionCode); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), + } else + logger.info( + "Save new version process definition complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", + processDefinition.getProjectCode(), processDefinitionCode, insertVersion); + List relationLogs = + processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, relationLogs, taskDefinitionLogs, Boolean.TRUE); if (insertResult == Constants.EXIT_CODE_SUCCESS) { + logger.info( + "Save new version task relations complete, projectCode:{}, processDefinitionCode:{}, newVersion:{}.", + processDefinition.getProjectCode(), processDefinitionCode, insertVersion); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); } else { + logger.error("Update task relations error, projectCode:{}, processDefinitionCode:{}.", + processDefinition.getProjectCode(), processDefinitionCode); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } @@ -384,27 +435,36 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe */ @Transactional @Override - public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { + public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj) { Map result = new HashMap<>(); - TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + TaskDefinitionLog taskDefinitionToUpdate = + updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); if (taskDefinitionToUpdate == null) { return result; } - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { + logger.info( + "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + Lists.newArrayList(taskDefinitionToUpdate)); } result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; } - private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map result) { + private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, + Map result) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_UPDATE)); + // check user access for project + result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_UPDATE)); if (result.get(Constants.STATUS) != Status.SUCCESS) { return null; } @@ -416,11 +476,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { // if stream, can update task definition without online check if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) { + logger.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.", + TaskExecuteType.STREAM, taskCode); putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); return null; } } - TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + TaskDefinitionLog taskDefinitionToUpdate = + JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); if (taskDefinition.equals(taskDefinitionToUpdate)) { return null; } @@ -434,12 +497,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe .taskParams(taskDefinitionToUpdate.getTaskParams()) .dependence(taskDefinitionToUpdate.getDependence()) .build())) { - logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName()); + logger.warn("Task definition parameters are invalid, taskDefinitionName:{}.", + taskDefinitionToUpdate.getName()); putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); return null; } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); if (version == null || version == 0) { + logger.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.", + taskCode); putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); return null; } @@ -456,11 +522,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinitionToUpdate.setOperator(loginUser.getId()); taskDefinitionToUpdate.setOperateTime(now); taskDefinitionToUpdate.setCreateTime(now); + taskDefinitionToUpdate.setId(null); int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); if ((update & insert) != 1) { + logger.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.", + projectCode, taskCode); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); - } + } else + logger.info( + "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.", + projectCode, taskCode, taskDefinitionToUpdate.getVersion()); return taskDefinitionToUpdate; } @@ -475,17 +547,22 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @return update result code */ @Override - public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) { + public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj, String upstreamCodes) { Map result = new HashMap<>(); - TaskDefinitionLog taskDefinitionToUpdate = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + TaskDefinitionLog taskDefinitionToUpdate = + updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); if (result.get(Constants.STATUS) != Status.SUCCESS && taskDefinitionToUpdate == null) { return result; } - List upstreamTaskRelations = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); - Set upstreamCodeSet = upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); + List upstreamTaskRelations = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + Set upstreamCodeSet = + upstreamTaskRelations.stream().map(ProcessTaskRelation::getPreTaskCode).collect(Collectors.toSet()); Set upstreamTaskCodes = Collections.emptySet(); if (StringUtils.isNotEmpty(upstreamCodes)) { - upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); + upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong) + .collect(Collectors.toSet()); } if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) { putMsg(result, Status.SUCCESS); @@ -498,11 +575,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Map queryUpStreamTaskCodeMap; if (!upstreamTaskCodes.isEmpty()) { List upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes); - queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); + queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream() + .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition)); // upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet()); if (!upstreamTaskCodes.isEmpty()) { - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA)); + String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA); + logger.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", + notExistTaskCodes); + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes); return result; } } else { @@ -510,12 +591,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } if (!upstreamTaskRelations.isEmpty()) { ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); List processTaskRelationList = Lists.newArrayList(processTaskRelations); List relationList = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { if (processTaskRelation.getPostTaskCode() == taskCode) { - if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) && processTaskRelation.getPreTaskCode() != 0L) { + if (queryUpStreamTaskCodeMap.containsKey(processTaskRelation.getPreTaskCode()) + && processTaskRelation.getPreTaskCode() != 0L) { queryUpStreamTaskCodeMap.remove(processTaskRelation.getPreTaskCode()); } else { processTaskRelation.setPreTaskCode(0L); @@ -533,14 +616,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (queryUpStreamTaskCodeMap.isEmpty() && !processTaskRelationList.isEmpty()) { processTaskRelationList.add(processTaskRelationList.get(0)); } - updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); + updateDag(loginUser, result, taskRelation.getProcessDefinitionCode(), processTaskRelations, + Lists.newArrayList(taskDefinitionToUpdate)); } + logger.info( + "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.", + projectCode, taskCode, upstreamTaskCodes); result.put(Constants.DATA_LIST, taskCode); putMsg(result, Status.SUCCESS); return result; } - /** * switch task definition * @@ -553,12 +639,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map switchVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project,projectCode,WORKFLOW_SWITCH_TO_THIS_VERSION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_SWITCH_TO_THIS_VERSION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } if (processService.isTaskOnline(taskCode)) { + logger.warn( + "Task definition version can not be switched due to process definition is {}, taskDefinitionCode:{}.", + ReleaseState.ONLINE.getDescp(), taskCode); putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE); return result; } @@ -567,18 +657,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); return result; } - TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); + TaskDefinitionLog taskDefinitionUpdate = + taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); taskDefinitionUpdate.setUserId(loginUser.getId()); taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); if (switchVersion > 0) { - List taskRelationList = processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + List taskRelationList = + processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); if (!taskRelationList.isEmpty()) { + logger.info( + "Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", + taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); - updateDag(loginUser, result, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + updateDag(loginUser, result, processDefinitionCode, processTaskRelations, + Lists.newArrayList(taskDefinitionUpdate)); } else { + logger.info( + "Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", + version, taskCode); putMsg(result, Status.SUCCESS); } } else { @@ -596,7 +696,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); // check user access for project - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_VERSION_VIEW); + Map checkResult = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_VERSION_VIEW); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { putMsg(result, resultStatus); @@ -604,7 +705,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); - IPage taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode); + IPage taskDefinitionVersionsPaging = + taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode); List taskDefinitionLogs = taskDefinitionVersionsPaging.getRecords(); pageInfo.setTotalList(taskDefinitionLogs); @@ -617,8 +719,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map deleteByCodeAndVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION_DELETE); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION_DELETE); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -628,11 +731,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } else { if (taskDefinition.getVersion() == version) { + logger.warn( + "Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.", + projectCode, taskCode, version); putMsg(result, Status.MAIN_TABLE_USING_VERSION); return result; } int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version); if (delete > 0) { + logger.info( + "Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.", + projectCode, taskCode, version); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.DELETE_TASK_DEFINITION_VERSION_ERROR); @@ -644,8 +753,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe @Override public Map queryTaskDefinitionDetail(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map result = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -671,8 +781,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe Integer pageSize) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode,TASK_DEFINITION); + // check user access for project + Map checkResult = + projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_DEFINITION); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { putMsg(result, resultStatus); @@ -680,8 +791,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } taskType = taskType == null ? StringUtils.EMPTY : taskType; Page page = new Page<>(pageNo, pageSize); - IPage taskMainInfoIPage = taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, - searchTaskName, taskType, taskExecuteType); + IPage taskMainInfoIPage = + taskDefinitionMapper.queryDefineListPaging(page, projectCode, searchWorkflowName, + searchTaskName, taskType, taskExecuteType); List records = taskMainInfoIPage.getRecords(); if (!records.isEmpty()) { Map taskMainInfoMap = new HashMap<>(); @@ -747,10 +859,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe */ @Transactional @Override - public Map releaseTaskDefinition(User loginUser, long projectCode, long code, ReleaseState releaseState) { + public Map releaseTaskDefinition(User loginUser, long projectCode, long code, + ReleaseState releaseState) { Project project = projectMapper.queryByCode(projectCode); - //check user access for project - Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode,null); + // check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null); Status resultStatus = (Status) result.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { return result; @@ -764,7 +877,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); return result; } - TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion()); + TaskDefinitionLog taskDefinitionLog = + taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion()); if (taskDefinitionLog == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); return result; @@ -777,8 +891,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe case ONLINE: String resourceIds = taskDefinition.getResourceIds(); if (StringUtils.isNotBlank(resourceIds)) { - Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); + Integer[] resourceIdArray = + Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); + PermissionCheck permissionCheck = new PermissionCheck(AuthorizationType.RESOURCE_FILE_ID, + processService, resourceIdArray, loginUser.getId(), logger); try { permissionCheck.checkPermission(); } catch (Exception e) { @@ -800,6 +916,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } + logger.error("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", + code); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 6c0bdb6d99..aa0398b209 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2516,6 +2516,7 @@ public class ProcessServiceImpl implements ProcessService { taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime()); updateTaskDefinitionLogs.add(taskDefinitionLog); } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { List taskDefinitionCodes = updateTaskDefinitionLogs .stream() @@ -2535,15 +2536,24 @@ public class ProcessServiceImpl implements ProcessService { // for each taskDefinitionLog, we will insert a new version into db // and update the origin one if exist - int updateResult = updateTaskDefinitionLogs.size(); - int insertResult = newTaskDefinitionLogs.size(); - if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { - insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs); + int updateResult = 0; + int insertResult = 0; + if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) { + insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); + } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { + insertResult += taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs); } if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); } + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { + for (TaskDefinitionLog taskDefinitionLog : updateTaskDefinitionLogs) { + updateResult += taskDefinitionMapper.updateById(taskDefinitionLog); + } + } + return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; }