Browse Source

[Bug][ApiServer] workflow copy (#7694)

* fix workflow copy

* fix copy

* fix copy

* code style
2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
890faffd22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -293,7 +293,7 @@ public enum Status {
PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"), PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"), DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"), NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{}] does not support copy", "不支持复制的任务类型[{}]"), NOT_SUPPORT_COPY_TASK_TYPE(50057, "task type [{0}] does not support copy", "不支持复制的任务类型[{0}]"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
/** /**

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -76,8 +76,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.yetus.audience.InterfaceAudience.Public;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
@ -108,6 +106,7 @@ import org.springframework.web.multipart.MultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -1099,13 +1098,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result; return result;
} }
HashMap<Long, Project> userProjects = new HashMap(Constants.DEFAULT_HASH_MAP_SIZE); HashMap<Long, Project> userProjects = new HashMap<>(Constants.DEFAULT_HASH_MAP_SIZE);
projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId()) projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())
.forEach(userProject -> userProjects.put(userProject.getCode(), userProject)); .forEach(userProject -> userProjects.put(userProject.getCode(), userProject));
// check processDefinition exist in project // check processDefinition exist in project
List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream(). List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream()
filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList()); .filter(o -> userProjects.containsKey(o.getProjectCode())).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionListInProject)) { if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result; return result;
@ -1309,6 +1308,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
List<String> failedProcessList = new ArrayList<>(); List<String> failedProcessList = new ArrayList<>();
doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true); doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true);
if (result.get(Constants.STATUS) == Status.NOT_SUPPORT_COPY_TASK_TYPE) {
return result;
}
checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true); checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true);
return result; return result;
} }
@ -1386,18 +1388,35 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode); processDefinition.setProjectCode(targetProjectCode);
if (isCopy) { if (isCopy) {
List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations); List<TaskDefinitionLog> taskDefinitionLogs = processService.genTaskDefineList(processTaskRelations);
Map<Long, Long> taskCodeMap = new HashMap<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType()) if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType()) || TaskType.SWITCH.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())) { || TaskType.SUB_PROCESS.getDesc().equals(taskDefinitionLog.getTaskType())
|| TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) {
putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType()); putMsg(result, Status.NOT_SUPPORT_COPY_TASK_TYPE, taskDefinitionLog.getTaskType());
throw new ServiceException(Status.NOT_SUPPORT_COPY_TASK_TYPE); return;
}
try {
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskCodeMap.put(taskDefinitionLog.getCode(), taskCode);
taskDefinitionLog.setCode(taskCode);
} catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
throw new ServiceException(Status.INTERNAL_SERVER_ERROR_ARGS);
} }
taskDefinitionLog.setProjectCode(targetProjectCode); taskDefinitionLog.setProjectCode(targetProjectCode);
taskDefinitionLog.setCode(0L);
taskDefinitionLog.setVersion(0); taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); taskDefinitionLog.setName(taskDefinitionLog.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
} }
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPreTaskCode() > 0) {
processTaskRelationLog.setPreTaskCode(taskCodeMap.get(processTaskRelationLog.getPreTaskCode()));
}
if (processTaskRelationLog.getPostTaskCode() > 0) {
processTaskRelationLog.setPostTaskCode(taskCodeMap.get(processTaskRelationLog.getPostTaskCode()));
}
}
try { try {
processDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); processDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) { } catch (CodeGenerateException e) {
@ -1407,6 +1426,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setId(0); processDefinition.setId(0);
processDefinition.setUserId(loginUser.getId()); processDefinition.setUserId(loginUser.getId());
processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
if (StringUtils.isNotBlank(processDefinition.getLocations())) {
ArrayNode jsonNodes = JSONUtils.parseArray(processDefinition.getLocations());
for (int i = 0; i < jsonNodes.size(); i++) {
ObjectNode node = (ObjectNode) jsonNodes.path(i);
node.put("taskCode", taskCodeMap.get(node.get("taskCode").asLong()));
jsonNodes.set(i, node);
}
processDefinition.setLocations(JSONUtils.toJsonString(jsonNodes));
}
try { try {
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs)); result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs));
} catch (Exception e) { } catch (Exception e) {

Loading…
Cancel
Save