Browse Source

[cherry-pick-2.0.2][ApiServer] workflow copy (#7657)

* fix 7597 (#7598)

* cherry-pick #7647

Co-authored-by: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com>
2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
6033e98a2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 23
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -267,9 +267,9 @@ public enum Status {
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
TASK_DEFINE_NOT_EXIST(50030, "task definition [{0}] does not exist", "任务定义[{0}]不存在"),
CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation [{0}] does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
PROCESS_DAG_IS_EMPTY(50035, "process dag is empty", "工作流dag是空"),
CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"),
@ -292,6 +292,8 @@ public enum Status {
MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"),
PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/**

23
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -1308,10 +1309,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* Will be deleted
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1378,6 +1379,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
if (isCopy) {
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) {
putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE);
}
taskDefinitionLog.setCode(0L);
taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
}
try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
@ -1388,7 +1401,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setUserId(loginUser.getId());
processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
try {
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, Lists.newArrayList()));
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs));
} catch (Exception e) {
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -247,15 +247,15 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
if (processService.isTaskOnline(taskCode)) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return result;
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json");

Loading…
Cancel
Save