Browse Source

[Feature][JsonSplit] add gen taskNodeList method (#4861)

* add gen taskNodeList method

* modify code style

* modify code style

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
a32b40ee95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 66
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/PreviousTaskNode.java
  2. 28
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  3. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  4. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  5. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  6. 149
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

66
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/PreviousTaskNode.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.model;
public class PreviousTaskNode {
/**
* code
*/
private long code;
/**
* name
*/
private String name;
/**
* version
*/
private int version;
public PreviousTaskNode(long code, String name, int version) {
this.code = code;
this.name = name;
this.version = version;
}
public long getCode() {
return code;
}
public void setCode(long code) {
this.code = code;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
}

28
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -22,18 +22,15 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
public class TaskNode {
@ -101,6 +98,11 @@ public class TaskNode {
@JsonSerialize(using = JSONUtils.JsonDataSerializer.class)
private String preTasks;
/**
* node dependency list
*/
private List<PreviousTaskNode> preTaskNodeList;
/**
* users store additional information
*/
@ -197,7 +199,7 @@ public class TaskNode {
return preTasks;
}
public void setPreTasks(String preTasks) throws IOException {
public void setPreTasks(String preTasks) {
this.preTasks = preTasks;
this.depList = JSONUtils.toList(preTasks, String.class);
}
@ -214,7 +216,7 @@ public class TaskNode {
return depList;
}
public void setDepList(List<String> depList) throws JsonProcessingException {
public void setDepList(List<String> depList) {
this.depList = depList;
this.preTasks = JSONUtils.toJsonString(depList);
}
@ -373,6 +375,14 @@ public class TaskNode {
return TaskType.CONDITIONS.toString().equalsIgnoreCase(this.getType());
}
public List<PreviousTaskNode> getPreTaskNodeList() {
return preTaskNodeList;
}
public void setPreTaskNodeList(List<PreviousTaskNode> preTaskNodeList) {
this.preTaskNodeList = preTaskNodeList;
}
@Override
public String toString() {
return "TaskNode{"

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -167,6 +167,14 @@ public class TaskDefinition {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
public TaskDefinition() {
}
public TaskDefinition(long code, int version) {
this.code = code;
this.version = version;
}
public String getName() {
return name;
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -38,7 +40,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @return task definition log list
*/
List<TaskDefinitionLog> queryByDefinitionName(@Param("projectCode") Long projectCode,
@Param("taskDefinitionName") String name);
@Param("taskDefinitionName") String name);
/**
* query task definition log list
@ -57,4 +59,12 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
*/
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long taskDefinitionCode,
@Param("version") int version);
/**
* query task definition log
*
* @param taskDefinitions taskDefinition collection
* @return task definition log
*/
List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions") Collection<TaskDefinition> taskDefinitions);
}

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -47,4 +47,17 @@
WHERE code = #{taskDefinitionCode}
and version = #{version}
</select>
<select id="queryByTaskDefinitions" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_task_definition_log
WHERE 1 = 1
<if test="taskDefinitions != null and taskDefinitions.length != 0">
and
<foreach collection="taskDefinitions" index="index" item="item" open="(" separator=" or " close=")">
(code = #{item.code}
and version = #{item.version})
</foreach>
</if>
</select>
</mapper>

149
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -46,13 +46,14 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.PreviousTaskNode;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -499,7 +500,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentId parentId
* @param ids ids
* @param ids ids
*/
public void recurseFindSubProcessId(int parentId, List<Integer> ids) {
List<TaskNode> taskNodeList = this.getTaskNodeListByDefinitionId(parentId);
@ -524,7 +525,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
@ -576,7 +577,7 @@ public class ProcessService {
/**
* get schedule time from command
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return date
*/
@ -592,8 +593,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
* @param command command
* @param cmdParam cmdParam map
* @param command command
* @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
@ -667,7 +668,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
* @param userId userId
* @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
@ -690,7 +691,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
@ -710,7 +711,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
* @param host host
* @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host) {
@ -868,7 +869,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
* @param command command
* @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
@ -883,8 +884,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
* @param processInstance processInstance
* @param cmdParam cmdParam
* @param processInstance processInstance
* @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
@ -956,7 +957,7 @@ public class ProcessService {
* only the keys doesn't in sub process global would be joined.
*
* @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams
* @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
@ -1026,7 +1027,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
@ -1055,7 +1056,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
@ -1081,7 +1082,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
* @param task task
* @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
@ -1180,7 +1181,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionId childDefinitionId
* @param childDefinitionId childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, int childDefinitionId) {
ProcessDefinition fatherDefinition = this.findProcessDefineById(parentProcessInstance.getProcessDefinitionId());
@ -1194,7 +1195,7 @@ public class ProcessService {
/**
* submit task to mysql
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
@ -1248,7 +1249,7 @@ public class ProcessService {
* return stop if work process state is ready stop
* if all of above are not satisfied, return submit success
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstanceState processInstanceState
* @return process instance state
*/
@ -1424,7 +1425,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @param state state
* @return task instance states
*/
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
@ -1479,7 +1480,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
@ -1501,7 +1502,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
@ -1533,12 +1534,12 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param startTime startTime
* @param host host
* @param state state
* @param startTime startTime
* @param host host
* @param executePath executePath
* @param logPath logPath
* @param taskInstId taskInstId
* @param logPath logPath
* @param taskInstId taskInstId
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state, Date startTime, String host,
String executePath,
@ -1566,12 +1567,12 @@ public class ProcessService {
* update the process instance
*
* @param processInstanceId processInstanceId
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @param processJson processJson
* @param globalParams globalParams
* @param scheduleTime scheduleTime
* @param flag flag
* @param locations locations
* @param connects connects
* @return update process instance result
*/
public int updateProcessInstance(Integer processInstanceId, String processJson,
@ -1592,10 +1593,10 @@ public class ProcessService {
/**
* change task state
*
* @param state state
* @param endTime endTime
* @param state state
* @param endTime endTime
* @param taskInstId taskInstId
* @param varPool varPool
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
@ -1763,7 +1764,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
@ -1800,7 +1801,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
* @param resName resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
@ -1824,9 +1825,9 @@ public class ProcessService {
/**
* get dependency cycle by work process define id and scheduler fire time
*
* @param masterId masterId
* @param masterId masterId
* @param processDefinitionId processDefinitionId
* @param scheduledFireTime the time the task schedule is expected to trigger
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency
* @throws Exception if error throws Exception
*/
@ -1839,8 +1840,8 @@ public class ProcessService {
/**
* get dependency cycle list by work process define id list and scheduler fire time
*
* @param masterId masterId
* @param ids ids
* @param masterId masterId
* @param ids ids
* @param scheduledFireTime the time the task schedule is expected to trigger
* @return CycleDependency list
* @throws Exception if error throws Exception
@ -1935,8 +1936,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionId process definition id
* @param startTime start time
* @param endTime end time
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
@ -2036,7 +2037,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
* @param userId user id
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
@ -2227,7 +2228,6 @@ public class ProcessService {
/**
* save processDefinition (including create or update processDefinition)
*
*/
public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations,
String connects, ProcessData processData, ProcessDefinition processDefinition) {
@ -2403,10 +2403,6 @@ public class ProcessService {
/**
* get process task relation list
* this function can be query relation list from log record
*
* @param processCode
* @param processVersion
* @return
*/
public List<ProcessTaskRelation> getProcessTaskRelationList(Long processCode, int processVersion) {
List<ProcessTaskRelationLog> taskRelationLogs = processTaskRelationLogMapper.queryByProcessCodeAndVersion(
@ -2419,4 +2415,57 @@ public class ProcessService {
return processTaskRelations;
}
public List<TaskNode> genTaskNodeList(Long processCode, int processVersion) {
List<ProcessTaskRelation> processTaskRelations = this.getProcessTaskRelationList(processCode, processVersion);
Set<TaskDefinition> taskDefinitionSet = new HashSet<>();
Map<Long, TaskNode> taskNodeMap = new HashMap<>();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreNodeVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionSet.add(new TaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostNodeVersion()));
}
taskNodeMap.compute(processTaskRelation.getPostTaskCode(), (k, v) -> {
if (v == null) {
v = new TaskNode();
v.setCode(processTaskRelation.getPostTaskCode());
v.setVersion(processTaskRelation.getPostNodeVersion());
v.setConditionResult(processTaskRelation.getConditionParams());
List<PreviousTaskNode> preTaskNodeList = new ArrayList<>();
if (processTaskRelation.getPreTaskCode() > 0) {
preTaskNodeList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreNodeVersion()));
}
v.setPreTaskNodeList(preTaskNodeList);
} else {
List<PreviousTaskNode> preTaskDefinitionList = v.getPreTaskNodeList();
preTaskDefinitionList.add(new PreviousTaskNode(processTaskRelation.getPreTaskCode(), "", processTaskRelation.getPreNodeVersion()));
}
return v;
});
}
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = taskDefinitionLogs.stream().collect(Collectors.toMap(TaskDefinitionLog::getCode, log -> log));
taskNodeMap.forEach((k, v) -> {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMap.get(k);
v.setId("task-" + taskDefinitionLog.getId());
v.setCode(taskDefinitionLog.getCode());
v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().getDescp());
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : "NORMAL");
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
v.setParams(taskDefinitionLog.getTaskParams());
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTaskTimeoutStrategy(),
taskDefinitionLog.getTimeout())));
// TODO name will be remove
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));
v.setPreTasks(StringUtils.join(v.getPreTaskNodeList().stream().map(PreviousTaskNode::getName).collect(Collectors.toList()), ","));
});
return new ArrayList<>(taskNodeMap.values());
}
}

Loading…
Cancel
Save