Browse Source

[Feature][JsonSplit] modify ProcessService createTaskAndRelation (#4770)

* add task query

* modify codestyle

* add task delete/update/swich method

* add task delete/update/swich method

* codestyle

* use updateById save task definition

* modify method name

* code style

* code style

* modify ProcessService createTaskAndRelation

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 3 years ago committed by GitHub
parent
commit
069e9f980e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 78
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  2. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

78
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -44,7 +44,6 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
@ -97,8 +96,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -2278,11 +2277,12 @@ public class ProcessService {
/** /**
* create task definition and task relations * create task definition and task relations
*/ */
public int createTaskAndRelation(User operator, public void createTaskAndRelation(User operator,
Long projectCode, Long projectCode,
ProcessDefinition processDefinition, ProcessDefinition processDefinition,
ProcessData processData) { ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
Map<String, Long> taskNameAndCode = new HashMap<>();
for (TaskNode taskNode : taskNodeList) { for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
if (taskDefinition == null) { if (taskDefinition == null) {
@ -2292,44 +2292,60 @@ public class ProcessService {
taskDefinition = new TaskDefinition(); taskDefinition = new TaskDefinition();
taskDefinition.setCode(code); taskDefinition.setCode(code);
} catch (SnowFlakeException e) { } catch (SnowFlakeException e) {
logger.error("Task code get error, ", e); throw new ServiceException("Task code get error", e);
return -1;
} }
saveTaskDefinition(operator, projectCode, taskNode, taskDefinition); saveTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} else { } else {
if (isTaskOnline(taskDefinition.getCode())) { if (isTaskOnline(taskDefinition.getCode())) {
// TODO return something for fail throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName()));
return -1;
} }
updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} }
taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode());
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode()); processTaskRelationMapper.deleteByCode(projectCode, processDefinition.getCode());
} }
// TODO parse taskNodeList for preTaskCode and postTaskCode List<ProcessTaskRelation> builderRelationList = new ArrayList<>();
List<TaskNodeRelation> taskNodeRelationList = DagHelper.getProcessDag(taskNodeList).getEdges();
Date now = new Date(); Date now = new Date();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation("",// todo relation name for (TaskNode taskNode : taskNodeList) {
processDefinition.getVersion(), List<String> depList = taskNode.getDepList();
projectCode, if (CollectionUtils.isNotEmpty(depList)) {
processDefinition.getCode(), for (String preTaskName : depList) {
0L, // todo pre task code builderRelationList.add(new ProcessTaskRelation("",// todo relation name
0L, // todo post task code processDefinition.getVersion(),
ConditionType.of(""), // todo conditionType projectCode,
"", // todo conditionParams processDefinition.getCode(),
now, taskNameAndCode.get(preTaskName),
now); taskNameAndCode.get(taskNode.getName()),
// save process task relation ConditionType.of("none"), // todo conditionType
int insert = processTaskRelationMapper.insert(processTaskRelation); taskNode.getConditionResult(),
// save process task relation log now,
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); now));
processTaskRelationLog.set(processTaskRelation); }
processTaskRelationLog.setOperator(operator.getId()); } else {
processTaskRelationLog.setOperateTime(now); builderRelationList.add(new ProcessTaskRelation("",// todo relation name
int logInsert = processTaskRelationLogMapper.insert(processTaskRelationLog); processDefinition.getVersion(),
return insert & logInsert; projectCode,
processDefinition.getCode(),
0L,
taskNameAndCode.get(taskNode.getName()),
ConditionType.of("none"), // todo conditionType
taskNode.getConditionResult(),
now,
now));
}
}
for (ProcessTaskRelation processTaskRelation : builderRelationList) {
processTaskRelationMapper.insert(processTaskRelation);
// save process task relation log
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.set(processTaskRelation);
processTaskRelationLog.setOperator(operator.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLogMapper.insert(processTaskRelationLog);
}
} }
public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { public int saveTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -439,7 +439,7 @@ public class ProcessServiceTest {
String expect = JSONUtils.toJsonString(exceptProcessData); String expect = JSONUtils.toJsonString(exceptProcessData);
String oldJson = JSONUtils.toJsonString(oldProcessData); String oldJson = JSONUtils.toJsonString(oldProcessData);
Assert.assertEquals(expect, processService.changeJson(newProcessData,oldJson)); Assert.assertEquals(expect, processService.changeJson(newProcessData, oldJson));
} }
} }

Loading…
Cancel
Save