From 069e9f980ead8c3f35259a1df5e39fcd45ebb0b4 Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Fri, 12 Feb 2021 08:43:44 +0800 Subject: [PATCH] [Feature][JsonSplit] modify ProcessService createTaskAndRelation (#4770) * add task query * modify codestyle * add task delete/update/swich method * add task delete/update/swich method * codestyle * use updateById save task definition * modify method name * code style * code style * modify ProcessService createTaskAndRelation Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../service/process/ProcessService.java | 78 +++++++++++-------- .../service/process/ProcessServiceTest.java | 2 +- 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index b6a6026879..e47a5ac567 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -97,8 +96,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -2278,11 +2277,12 @@ public class ProcessService { /** * create task definition and task relations */ - public int createTaskAndRelation(User operator, - Long projectCode, - ProcessDefinition processDefinition, - ProcessData processData) { + public void createTaskAndRelation(User operator, + Long projectCode, + ProcessDefinition processDefinition, + ProcessData processData) { List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); + Map taskNameAndCode = new HashMap<>(); for (TaskNode taskNode : taskNodeList) { TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); if (taskDefinition == null) { @@ -2292,44 +2292,60 @@ public class ProcessService { taskDefinition = new TaskDefinition(); taskDefinition.setCode(code); } catch (SnowFlakeException e) { - logger.error("Task code get error, ", e); - return -1; + throw new ServiceException("Task code get error", e); } saveTaskDefinition(operator, projectCode, taskNode, taskDefinition); } else { if (isTaskOnline(taskDefinition.getCode())) { - // TODO return something for fail - return -1; + throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName())); } updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); } + taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode()); } List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); if (!processTaskRelationList.isEmpty()) { processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode()); } - // TODO parse taskNodeList for preTaskCode and postTaskCode - List taskNodeRelationList = DagHelper.getProcessDag(taskNodeList).getEdges(); + List builderRelationList = new ArrayList<>(); Date now = new Date(); - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation("",// todo relation name - processDefinition.getVersion(), - projectCode, - processDefinition.getCode(), - 0L, // todo pre task code - 0L, // todo post task code - ConditionType.of(""), // todo conditionType - "", // todo conditionParams - now, - now); - // save process task relation - int insert = processTaskRelationMapper.insert(processTaskRelation); - // save process task relation log - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.set(processTaskRelation); - processTaskRelationLog.setOperator(operator.getId()); - processTaskRelationLog.setOperateTime(now); - int logInsert = processTaskRelationLogMapper.insert(processTaskRelationLog); - return insert & logInsert; + for (TaskNode taskNode : taskNodeList) { + List depList = taskNode.getDepList(); + if (CollectionUtils.isNotEmpty(depList)) { + for (String preTaskName : depList) { + builderRelationList.add(new ProcessTaskRelation("",// todo relation name + processDefinition.getVersion(), + projectCode, + processDefinition.getCode(), + taskNameAndCode.get(preTaskName), + taskNameAndCode.get(taskNode.getName()), + ConditionType.of("none"), // todo conditionType + taskNode.getConditionResult(), + now, + now)); + } + } else { + builderRelationList.add(new ProcessTaskRelation("",// todo relation name + processDefinition.getVersion(), + projectCode, + processDefinition.getCode(), + 0L, + taskNameAndCode.get(taskNode.getName()), + ConditionType.of("none"), // todo conditionType + taskNode.getConditionResult(), + now, + now)); + } + } + for (ProcessTaskRelation processTaskRelation : builderRelationList) { + processTaskRelationMapper.insert(processTaskRelation); + // save process task relation log + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.set(processTaskRelation); + processTaskRelationLog.setOperator(operator.getId()); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLogMapper.insert(processTaskRelationLog); + } } public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 551c9bb09c..b6d518c9b4 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -439,7 +439,7 @@ public class ProcessServiceTest { String expect = JSONUtils.toJsonString(exceptProcessData); String oldJson = JSONUtils.toJsonString(oldProcessData); - Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson)); + Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson)); } }