From 6033e98a2d2668193e1a08003d8b0618ed6dc088 Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 28 Dec 2021 10:10:11 +0800 Subject: [PATCH] [cherry-pick-2.0.2][ApiServer] workflow copy (#7657) * fix 7597 (#7598) * cherry-pick #7647 Co-authored-by: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> --- .../dolphinscheduler/api/enums/Status.java | 6 +++-- .../impl/ProcessDefinitionServiceImpl.java | 23 +++++++++++++++---- .../impl/TaskDefinitionServiceImpl.java | 8 +++---- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index c4243f0e9e..87f658b8bf 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -267,9 +267,9 @@ public enum Status { EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), - TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"), + TASK_DEFINE_NOT_EXIST(50030, "task definition [{0}] does not exist", "任务定义[{0}]不存在"), CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"), - PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), + PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation [{0}] does not exist", "工作流任务关系[{0}]不存在"), PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"), CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"), @@ -292,6 +292,8 @@ public enum Status { MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"), PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"), DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), + NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"), + NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 9dff036c69..237148cd47 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -1308,10 +1309,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch move process definition - * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * Will be deleted + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1378,6 +1379,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); if (isCopy) { + List taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations); + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType()) + || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType()) + || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) { + putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType()); + throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE); + } + taskDefinitionLog.setCode(0L); + taskDefinitionLog.setVersion(0); + taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); + } try { processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } catch (CodeGenerateException e) { @@ -1388,7 +1401,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setUserId(loginUser.getId()); processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); try { - result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, Lists.newArrayList())); + result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs)); } catch (Exception e) { putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 97ecb6ce39..3277c7ac38 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -247,15 +247,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - if (processService.isTaskOnline(taskCode)) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE); - return result; - } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } + if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { + putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); + return result; + } TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); if (taskDefinitionToUpdate == null) { logger.error("taskDefinitionJson is not valid json");