From e6d94632b9b98987940c628e2ee2b5f62f3e6fe3 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 14 Jul 2023 18:40:01 +0800 Subject: [PATCH] Add WorkflowExecuteContext (#14544) --- .../api/dto/gantt/GanttDto.java | 48 +- .../impl/ProcessDefinitionServiceImpl.java | 22 +- .../impl/ProcessInstanceServiceImpl.java | 12 +- .../service/ProcessInstanceServiceTest.java | 6 +- .../common/model/TaskNodeRelation.java | 60 +- .../event/WorkflowStartEventHandler.java | 11 +- .../event/WorkflowStateEventHandler.java | 3 +- .../WorkflowTimeoutStateEventHandler.java | 3 +- .../server/master/graph/IWorkflowGraph.java | 33 ++ .../server/master/graph/WorkflowGraph.java | 69 +++ .../master/graph/WorkflowGraphFactory.java | 132 +++++ .../master/runner/EventExecuteService.java | 3 +- .../runner/IWorkflowExecuteContext.java | 33 ++ .../runner/IWorkflowExecuteRunnable.java | 32 + .../runner/MasterSchedulerBootstrap.java | 3 +- .../runner/StateWheelExecuteThread.java | 6 +- .../master/runner/WorkflowExecuteContext.java | 54 ++ .../runner/WorkflowExecuteContextFactory.java | 99 ++++ .../runner/WorkflowExecuteRunnable.java | 555 ++++++++---------- .../WorkflowExecuteRunnableFactory.java | 49 +- .../runner/WorkflowExecuteThreadPool.java | 31 +- ...itStatus.java => WorkflowStartStatus.java} | 2 +- .../DefaultTaskExecuteRunnableFactory.java | 2 +- .../task/blocking/BlockingLogicTask.java | 3 +- .../task/switchtask/SwitchLogicTask.java | 6 +- .../master/service/ExecutingService.java | 3 +- .../master/service/WorkerFailoverService.java | 3 +- ...ocessInstanceExecCacheManagerImplTest.java | 8 +- .../runner/WorkflowExecuteRunnableTest.java | 58 +- .../master/service/FailoverServiceTest.java | 6 +- .../service/model/TaskNode.java | 8 +- .../service/process/ProcessService.java | 2 +- .../service/process/ProcessServiceImpl.java | 2 +- .../service/utils/DagHelper.java | 170 +++--- .../service/process/ProcessServiceTest.java | 2 +- .../service/utils/DagHelperTest.java | 184 +++--- .../plugin/task/api/model/SwitchResultVo.java | 12 +- .../api/parameters/ConditionsParameters.java | 39 +- .../task/api/parameters/SwitchParameters.java | 14 +- 39 files changed, 1035 insertions(+), 753 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/IWorkflowGraph.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraph.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraphFactory.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteRunnable.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/{WorkflowSubmitStatus.java => WorkflowStartStatus.java} (96%) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/gantt/GanttDto.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/gantt/GanttDto.java index 401e3f6319..06edac8fd5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/gantt/GanttDto.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/gantt/GanttDto.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.dto.gantt; import java.util.ArrayList; @@ -21,9 +22,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -/** - * gantt DTO - */ +import lombok.Data; + +@Data public class GanttDto { /** @@ -37,9 +38,9 @@ public class GanttDto { private List tasks = new ArrayList<>(); /** - * task name list + * task code list */ - private List taskNames; + private List taskNames; /** * task status map @@ -50,48 +51,19 @@ public class GanttDto { this.taskStatus = new HashMap<>(); taskStatus.put("success", "success"); } - public GanttDto(int height, List tasks, List taskNames) { + + public GanttDto(int height, List tasks, List taskNames) { this(); this.height = height; this.tasks = tasks; this.taskNames = taskNames; } - public GanttDto(int height, List tasks, List taskNames, Map taskStatus) { - this.height = height; - this.tasks = tasks; - this.taskNames = taskNames; - this.taskStatus = taskStatus; - } - public int getHeight() { - return height; - } - - public void setHeight(int height) { + public GanttDto(int height, List tasks, List taskNames, Map taskStatus) { this.height = height; - } - - public List getTasks() { - return tasks; - } - - public void setTasks(List tasks) { this.tasks = tasks; - } - - public List getTaskNames() { - return taskNames; - } - - public void setTaskNames(List taskNames) { this.taskNames = taskNames; - } - - public Map getTaskStatus() { - return taskStatus; - } - - public void setTaskStatus(Map taskStatus) { this.taskStatus = taskStatus; } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 77e82312d4..c26c9bb2c9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -1894,12 +1894,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); return result; } - DAG dag = processService.genDagGraph(processDefinition); + DAG dag = processService.genDagGraph(processDefinition); // nodes that are running - Map> runningNodeMap = new ConcurrentHashMap<>(); + Map> runningNodeMap = new ConcurrentHashMap<>(); // nodes that are waiting to run - Map> waitingRunningNodeMap = new ConcurrentHashMap<>(); + Map> waitingRunningNodeMap = new ConcurrentHashMap<>(); // List of process instances List processInstanceList = processInstanceService.queryByProcessDefineCode(code, limit); @@ -1937,16 +1937,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List parentTreeViewDtoList = new ArrayList<>(); parentTreeViewDtoList.add(parentTreeViewDto); // Here is the encapsulation task instance - for (String startNode : dag.getBeginNode()) { + for (Long startNode : dag.getBeginNode()) { runningNodeMap.put(startNode, parentTreeViewDtoList); } while (!ServerLifeCycleManager.isStopped()) { - Set postNodeList; - Iterator>> iter = runningNodeMap.entrySet().iterator(); + Set postNodeList; + Iterator>> iter = runningNodeMap.entrySet().iterator(); while (iter.hasNext()) { - Map.Entry> en = iter.next(); - String nodeCode = en.getKey(); + Map.Entry> en = iter.next(); + Long nodeCode = en.getKey(); parentTreeViewDtoList = en.getValue(); TreeViewDto treeViewDto = new TreeViewDto(); @@ -1957,8 +1957,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // set treeViewDto instances for (int i = limit - 1; i >= 0; i--) { ProcessInstance processInstance = processInstanceList.get(i); - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), - Long.parseLong(nodeCode)); + TaskInstance taskInstance = + taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(), nodeCode); if (taskInstance == null) { treeViewDto.getInstances().add(new Instance(-1, "not running", 0, "null")); } else { @@ -1985,7 +1985,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } postNodeList = dag.getSubsequentNodes(nodeCode); if (CollectionUtils.isNotEmpty(postNodeList)) { - for (String nextNodeCode : postNodeList) { + for (Long nextNodeCode : postNodeList) { List treeViewDtoList = waitingRunningNodeMap.get(nextNodeCode); if (CollectionUtils.isEmpty(treeViewDtoList)) { treeViewDtoList = new ArrayList<>(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index f9fe36dd0d..db6c7d1ae7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -1016,22 +1016,20 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } GanttDto ganttDto = new GanttDto(); - DAG dag = processService.genDagGraph(processDefinition); + DAG dag = processService.genDagGraph(processDefinition); // topological sort - List nodeList = dag.topologicalSort(); + List nodeList = dag.topologicalSort(); ganttDto.setTaskNames(nodeList); List taskList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(nodeList)) { - List taskCodes = nodeList.stream().map(Long::parseLong).collect(Collectors.toList()); List taskInstances = taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes( - Collections.singletonList(processInstanceId), taskCodes); - for (String node : nodeList) { + Collections.singletonList(processInstanceId), nodeList); + for (Long node : nodeList) { TaskInstance taskInstance = null; for (TaskInstance instance : taskInstances) { - if (instance.getProcessInstanceId() == processInstanceId - && instance.getTaskCode() == Long.parseLong(node)) { + if (instance.getProcessInstanceId() == processInstanceId && instance.getTaskCode() == node) { taskInstance = instance; break; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 88f1bc99e5..a0d83c4649 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -745,9 +745,9 @@ public class ProcessInstanceServiceTest { processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog()); when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance); when(taskInstanceMapper.queryByInstanceIdAndName(Mockito.anyInt(), Mockito.any())).thenReturn(taskInstance); - DAG graph = new DAG<>(); - for (int i = 1; i <= 7; ++i) { - graph.addNode(i + "", new TaskNode()); + DAG graph = new DAG<>(); + for (long i = 1; i <= 7; ++i) { + graph.addNode(i, new TaskNode()); } when(processService.genDagGraph(Mockito.any(ProcessDefinition.class))) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java index 6abedd39a3..a9a232263a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNodeRelation.java @@ -17,63 +17,23 @@ package org.apache.dolphinscheduler.common.model; -import java.util.Objects; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +@Data +@AllArgsConstructor +@NoArgsConstructor public class TaskNodeRelation { /** - * task start node name + * task start node Code */ - private String startNode; + private Long startNode; /** - * task end node name + * task end node Code */ - private String endNode; + private Long endNode; - public TaskNodeRelation() { - } - - public TaskNodeRelation(String startNode, String endNode) { - this.startNode = startNode; - this.endNode = endNode; - } - - public String getStartNode() { - return startNode; - } - - public void setStartNode(String startNode) { - this.startNode = startNode; - } - - public String getEndNode() { - return endNode; - } - - public void setEndNode(String endNode) { - this.endNode = endNode; - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof TaskNodeRelation)) { - return false; - } - TaskNodeRelation relation = (TaskNodeRelation) o; - return (relation.getStartNode().equals(this.startNode) && relation.getEndNode().equals(this.endNode)); - } - - @Override - public String toString() { - return "TaskNodeRelation{" - + "startNode='" + startNode + '\'' - + ", endNode='" + endNode + '\'' - + '}'; - } - - @Override - public int hashCode() { - return Objects.hash(startNode, endNode); - } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java index 2c29acd13f..2526e3f145 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStartEventHandler.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; -import org.apache.dolphinscheduler.server.master.runner.WorkflowSubmitStatus; +import org.apache.dolphinscheduler.server.master.runner.WorkflowStartStatus; import java.util.concurrent.CompletableFuture; @@ -56,17 +56,18 @@ public class WorkflowStartEventHandler implements WorkflowEventHandler { throw new WorkflowEventHandleError( "The workflow start event is invalid, cannot find the workflow instance from cache"); } - ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstance processInstance = + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(); ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("submit", processInstance.getProcessDefinitionCode().toString()); CompletableFuture.supplyAsync(workflowExecuteRunnable::call, workflowExecuteThreadPool) - .thenAccept(workflowSubmitStatus -> { - if (WorkflowSubmitStatus.SUCCESS == workflowSubmitStatus) { + .thenAccept(workflowStartStatus -> { + if (WorkflowStartStatus.SUCCESS == workflowStartStatus) { log.info("Success submit the workflow instance"); if (processInstance.getTimeout() > 0) { stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } - } else if (WorkflowSubmitStatus.FAILED == workflowSubmitStatus) { + } else if (WorkflowStartStatus.FAILED == workflowStartStatus) { log.error( "Failed to submit the workflow instance, will resend the workflow start event: {}", workflowEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java index 11ad217d81..6db84dadc1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowStateEventHandler.java @@ -35,7 +35,8 @@ public class WorkflowStateEventHandler implements StateEventHandler { public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) throws StateEventHandleException { WorkflowStateEvent workflowStateEvent = (WorkflowStateEvent) stateEvent; - ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstance processInstance = + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(); ProcessDefinition processDefinition = processInstance.getProcessDefinition(); measureProcessState(workflowStateEvent, processInstance.getProcessDefinitionCode().toString()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java index 3d280673a6..c4d0555c1c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/WorkflowTimeoutStateEventHandler.java @@ -33,7 +33,8 @@ public class WorkflowTimeoutStateEventHandler implements StateEventHandler { @Override public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) { log.info("Handle workflow instance timeout event"); - ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstance processInstance = + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(); ProcessInstanceMetrics.incProcessInstanceByStateAndProcessDefinitionCode("timeout", processInstance.getProcessDefinitionCode().toString()); workflowExecuteRunnable.processTimeout(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/IWorkflowGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/IWorkflowGraph.java new file mode 100644 index 0000000000..51937c8501 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/IWorkflowGraph.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.graph; + +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.service.model.TaskNode; + +public interface IWorkflowGraph { + + TaskNode getTaskNodeByCode(Long taskCode); + + // todo: refactor DAG class + DAG getDag(); + + boolean isForbiddenTask(Long taskCode); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraph.java new file mode 100644 index 0000000000..708723e3a9 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraph.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.graph; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.service.model.TaskNode; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class WorkflowGraph implements IWorkflowGraph { + + private final Map taskNodeMap; + private final DAG dag; + + private final Set forbiddenTaskCodes; + + public WorkflowGraph(List taskNodes, + DAG dag) { + checkNotNull(taskNodes, "taskNodes can not be null"); + checkNotNull(dag, "dag can not be null"); + + this.taskNodeMap = taskNodes.stream().collect(Collectors.toMap(TaskNode::getCode, Function.identity())); + this.dag = dag; + forbiddenTaskCodes = + taskNodes.stream().filter(TaskNode::isForbidden).map(TaskNode::getCode).collect(Collectors.toSet()); + } + + @Override + public TaskNode getTaskNodeByCode(Long taskCode) { + TaskNode taskNode = taskNodeMap.get(taskCode); + if (taskNode == null) { + throw new IllegalArgumentException("task node not found, taskCode: " + taskCode); + } + return taskNode; + } + + @Override + public DAG getDag() { + return dag; + } + + @Override + public boolean isForbiddenTask(Long taskCode) { + return forbiddenTaskCodes.contains(taskCode); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraphFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraphFactory.java new file mode 100644 index 0000000000..f88481b596 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/graph/WorkflowGraphFactory.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.graph; + +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING; +import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; +import static org.apache.dolphinscheduler.common.constants.Constants.COMMA; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; +import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.service.model.TaskNode; +import org.apache.dolphinscheduler.service.process.ProcessDag; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.apache.dolphinscheduler.service.utils.DagHelper; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowGraphFactory { + + @Autowired + private ProcessService processService; + + @Autowired + private TaskInstanceDao taskInstanceDao; + + @Autowired + private TaskDefinitionLogDao taskDefinitionLogDao; + + public IWorkflowGraph createWorkflowGraph(ProcessInstance workflowInstance) throws Exception { + + List processTaskRelations = + processService.findRelationByCode(workflowInstance.getProcessDefinitionCode(), + workflowInstance.getProcessDefinitionVersion()); + List taskDefinitionLogs = + taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); + List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); + + // generate process to get DAG info + List recoveryTaskNodeCodeList = getRecoveryTaskNodeCodeList(workflowInstance.getCommandParam()); + List startNodeNameList = parseStartNodeName(workflowInstance.getCommandParam()); + ProcessDag processDag = DagHelper.generateFlowDag(taskNodeList, startNodeNameList, recoveryTaskNodeCodeList, + workflowInstance.getTaskDependType()); + if (processDag == null) { + log.error("ProcessDag is null"); + throw new IllegalArgumentException("Create WorkflowGraph failed, ProcessDag is null"); + } + // generate process dag + DAG dagGraph = DagHelper.buildDagGraph(processDag); + log.debug("Build dag success, dag: {}", dagGraph); + + return new WorkflowGraph(taskNodeList, dagGraph); + } + + /** + * generate start node code list from parsing command param; + * if "StartNodeIdList" exists in command param, return StartNodeIdList + * + * @return recovery node code list + */ + private List getRecoveryTaskNodeCodeList(String cmdParam) { + Map paramMap = JSONUtils.toMap(cmdParam); + + // todo: Can we use a better way to set the recover taskInstanceId list? rather then use the cmdParam + if (paramMap != null && paramMap.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { + List startTaskInstanceIds = Arrays.stream(paramMap.get(CMD_PARAM_RECOVERY_START_NODE_STRING) + .split(COMMA)) + .filter(StringUtils::isNotEmpty) + .map(Integer::valueOf) + .collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(startTaskInstanceIds)) { + return taskInstanceDao.queryByIds(startTaskInstanceIds).stream().map(TaskInstance::getTaskCode) + .collect(Collectors.toList()); + } + } + return Collections.emptyList(); + } + + private List parseStartNodeName(String cmdParam) { + List startNodeNameList = new ArrayList<>(); + Map paramMap = JSONUtils.toMap(cmdParam); + if (paramMap == null) { + return startNodeNameList; + } + if (paramMap.containsKey(CMD_PARAM_START_NODES)) { + startNodeNameList = Arrays.asList(paramMap.get(CMD_PARAM_START_NODES).split(Constants.COMMA)) + .stream() + .map(String::trim) + .map(Long::valueOf) + .collect(Collectors.toList()); + } + return startNodeNameList; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 735fb227b7..cad36e8b68 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -78,7 +78,8 @@ public class EventExecuteService extends BaseDaemonThread { private void workflowEventHandler() { for (WorkflowExecuteRunnable workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { try { - LogUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); + LogUtils.setWorkflowInstanceIdMDC( + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance().getId()); workflowExecuteThreadPool.executeEvent(workflowExecuteThread); } finally { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java new file mode 100644 index 0000000000..9c012a2355 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteContext.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; + +// todo: Add method to manage the task instance +public interface IWorkflowExecuteContext { + + ProcessDefinition getWorkflowDefinition(); + + ProcessInstance getWorkflowInstance(); + + IWorkflowGraph getWorkflowGraph(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteRunnable.java new file mode 100644 index 0000000000..10b456e517 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/IWorkflowExecuteRunnable.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import java.util.concurrent.Callable; + +public interface IWorkflowExecuteRunnable extends Callable { + // todo: add control method to manage the workflow runnable e.g. pause/stop .... + + @Override + default WorkflowStartStatus call() { + return startWorkflow(); + } + + WorkflowStartStatus startWorkflow(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index cde57407a2..0f86cadfe8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -134,7 +134,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl try { WorkflowExecuteRunnable workflowExecuteRunnable = workflowExecuteRunnableFactory.createWorkflowExecuteRunnable(command); - ProcessInstance processInstance = workflowExecuteRunnable.getProcessInstance(); + ProcessInstance processInstance = workflowExecuteRunnable + .getWorkflowExecuteContext().getWorkflowInstance(); if (processInstanceExecCacheManager.contains(processInstance.getId())) { log.error( "The workflow instance is already been cached, this case shouldn't be happened"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 5c569b4744..6ca37a3fe8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -141,7 +141,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread { processInstanceTimeoutCheckList.remove(processInstanceId); continue; } - ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + ProcessInstance processInstance = + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance(); if (processInstance == null) { log.warn("Check workflow timeout failed, the workflowInstance is null"); continue; @@ -284,7 +285,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread { Optional taskInstanceOptional = workflowExecuteThread.getRetryTaskInstanceByTaskCode(taskCode); - ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + ProcessInstance processInstance = + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance(); if (processInstance.getState().isReadyStop()) { log.warn( diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java new file mode 100644 index 0000000000..fd6c4d44db --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; + +import lombok.Getter; + +public class WorkflowExecuteContext implements IWorkflowExecuteContext { + + @Getter + private final ProcessDefinition workflowDefinition; + + @Getter + private final ProcessInstance workflowInstance; + + // This is the task definition graph + // todo: we need to add a task instance graph, then move the task instance from WorkflowExecuteRunnable to + // WorkflowExecuteContext + @Getter + private final IWorkflowGraph workflowGraph; + + public WorkflowExecuteContext(ProcessDefinition workflowDefinition, + ProcessInstance workflowInstance, + IWorkflowGraph workflowGraph) { + checkNotNull(workflowDefinition, "workflowDefinition is null"); + checkNotNull(workflowInstance, "workflowInstance is null"); + checkNotNull(workflowGraph, "workflowGraph is null"); + + this.workflowDefinition = workflowDefinition; + this.workflowInstance = workflowInstance; + this.workflowGraph = workflowGraph; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java new file mode 100644 index 0000000000..c30f791e5e --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.enums.SlotCheckState; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; +import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory; +import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.service.exceptions.CronParseException; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class WorkflowExecuteContextFactory { + + @Autowired + private ServerNodeManager serverNodeManager; + + @Autowired + private ProcessService processService; + + @Autowired + private WorkflowGraphFactory workflowGraphFactory; + + @Autowired + private MasterConfig masterConfig; + + public IWorkflowExecuteContext createWorkflowExecuteRunnableContext(Command command) throws Exception { + ProcessInstance workflowInstance = createWorkflowInstance(command); + ProcessDefinition workflowDefinition = processService.findProcessDefinition( + workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); + workflowInstance.setProcessDefinition(workflowDefinition); + + IWorkflowGraph workflowGraph = workflowGraphFactory.createWorkflowGraph(workflowInstance); + + return new WorkflowExecuteContext( + workflowDefinition, + workflowInstance, + workflowGraph); + } + + private ProcessInstance createWorkflowInstance(Command command) throws CronParseException { + long commandTransformStartTime = System.currentTimeMillis(); + // Note: this check is not safe, the slot may change after command transform. + // We use the database transaction in `handleCommand` so that we can guarantee the command will + // always be executed + // by only one master + SlotCheckState slotCheckState = slotCheck(command); + if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { + log.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); + throw new RuntimeException("Slot check failed the current state: " + slotCheckState); + } + ProcessInstance processInstance = processService.handleCommand(masterConfig.getMasterAddress(), command); + log.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); + ProcessInstanceMetrics + .recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime); + return processInstance; + } + + private SlotCheckState slotCheck(Command command) { + int slot = serverNodeManager.getSlot(); + int masterSize = serverNodeManager.getMasterSize(); + SlotCheckState state; + if (masterSize <= 0) { + state = SlotCheckState.CHANGE; + } else if (command.getId() % masterSize == slot) { + state = SlotCheckState.PASS; + } else { + state = SlotCheckState.INJECT; + } + return state; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 5caab6b709..9b1738636d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.StateEventType; -import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; @@ -51,14 +50,11 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; -import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; @@ -83,6 +79,7 @@ import org.apache.dolphinscheduler.server.master.event.StateEventHandlerManager; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException; +import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; @@ -94,14 +91,12 @@ import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.model.TaskNode; -import org.apache.dolphinscheduler.service.process.ProcessDag; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; import org.apache.commons.lang3.tuple.Pair; import java.util.ArrayList; @@ -117,7 +112,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -135,38 +129,21 @@ import com.google.common.collect.Sets; * Workflow execute task, used to execute a workflow instance. */ @Slf4j -public class WorkflowExecuteRunnable implements Callable { +public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { private final ProcessService processService; private final CommandService commandService; - private ProcessInstanceDao processInstanceDao; + private final ProcessInstanceDao processInstanceDao; - private TaskInstanceDao taskInstanceDao; - - private TaskDefinitionLogDao taskDefinitionLogDao; + private final TaskInstanceDao taskInstanceDao; private final ProcessAlertManager processAlertManager; private final MasterRpcClient masterRpcClient; - private final ProcessInstance processInstance; - - private ProcessDefinition processDefinition; - - private DAG dag; - - /** - * full task node map, key is task node id, value is task node - * # TODO: This field can be removed later if the dag is complete - */ - private Map taskNodesMap; - - /** - * unique key of workflow - */ - private String key; + private final IWorkflowExecuteContext workflowExecuteContext; private WorkflowRunnableStatus workflowRunnableStatus = WorkflowRunnableStatus.CREATED; @@ -214,14 +191,10 @@ public class WorkflowExecuteRunnable implements Callable { private final Set dependFailedTaskSet = Sets.newConcurrentHashSet(); /** - * forbidden task map, code as key - */ - private final Map forbiddenTaskMap = new ConcurrentHashMap<>(); - - /** + * todo: remove this field * skip task map, code as key */ - private final Map skipTaskNodeMap = new ConcurrentHashMap<>(); + private final Map skipTaskNodeMap = new ConcurrentHashMap<>(); /** * complement date list @@ -253,7 +226,7 @@ public class WorkflowExecuteRunnable implements Callable { private final MasterConfig masterConfig; public WorkflowExecuteRunnable( - @NonNull ProcessInstance processInstance, + @NonNull IWorkflowExecuteContext workflowExecuteContext, @NonNull CommandService commandService, @NonNull ProcessService processService, @NonNull ProcessInstanceDao processInstanceDao, @@ -263,23 +236,18 @@ public class WorkflowExecuteRunnable implements Callable { @NonNull StateWheelExecuteThread stateWheelExecuteThread, @NonNull CuringParamsService curingParamsService, @NonNull TaskInstanceDao taskInstanceDao, - @NonNull TaskDefinitionLogDao taskDefinitionLogDao, @NonNull DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory) { this.processService = processService; this.commandService = commandService; this.processInstanceDao = processInstanceDao; - this.processInstance = processInstance; + this.workflowExecuteContext = workflowExecuteContext; this.masterRpcClient = masterRpcClient; this.masterConfig = masterConfig; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; this.curingParamsService = curingParamsService; this.taskInstanceDao = taskInstanceDao; - this.taskDefinitionLogDao = taskDefinitionLogDao; this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; - this.processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - this.processInstance.setProcessDefinition(processDefinition); TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); } @@ -352,20 +320,12 @@ public class WorkflowExecuteRunnable implements Callable { } } - public String getKey() { - if (StringUtils.isNotEmpty(key) || this.processDefinition == null) { - return key; - } - - key = String.format("%d_%d_%d", - this.processDefinition.getCode(), - this.processDefinition.getVersion(), - this.processInstance.getId()); - return key; + public IWorkflowExecuteContext getWorkflowExecuteContext() { + return workflowExecuteContext; } public boolean addStateEvent(StateEvent stateEvent) { - if (processInstance.getId() != stateEvent.getProcessInstanceId()) { + if (workflowExecuteContext.getWorkflowInstance().getId() != stateEvent.getProcessInstanceId()) { log.info("state event would be abounded :{}", stateEvent); return false; } @@ -377,10 +337,6 @@ public class WorkflowExecuteRunnable implements Callable { return this.stateEvents.size(); } - public ProcessInstance getProcessInstance() { - return this.processInstance; - } - public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { @@ -420,49 +376,51 @@ public class WorkflowExecuteRunnable implements Callable { } public void processTimeout() { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - this.processAlertManager.sendProcessTimeoutAlert(this.processInstance, projectUser); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + this.processAlertManager.sendProcessTimeoutAlert(workflowInstance, projectUser); } public void taskTimeout(TaskInstance taskInstance) { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendTaskTimeoutAlert(processInstance, taskInstance, projectUser); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + processAlertManager.sendTaskTimeoutAlert(workflowInstance, taskInstance, projectUser); } public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException { log.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState()); try { - + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); // Do we need to remove? taskExecuteRunnableMap.remove(taskInstance.getTaskCode()); - stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance); - stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); + stateWheelExecuteThread.removeTask4TimeoutCheck(workflowInstance, taskInstance); + stateWheelExecuteThread.removeTask4RetryCheck(workflowInstance, taskInstance); if (taskInstance.getState().isSuccess()) { completeTaskSet.add(taskInstance.getTaskCode()); mergeTaskInstanceVarPool(taskInstance); - processInstanceDao.upsertProcessInstance(processInstance); + processInstanceDao.upsertProcessInstance(workflowInstance); // save the cacheKey only if the task is defined as cache task and the task is success if (taskInstance.getIsCache().equals(Flag.YES)) { saveCacheTaskInstance(taskInstance); } - if (!processInstance.isBlocked()) { - submitPostNode(Long.toString(taskInstance.getTaskCode())); + if (!workflowInstance.isBlocked()) { + submitPostNode(taskInstance.getTaskCode()); } - } else if (taskInstance.taskCanRetry() && !processInstance.getState().isReadyStop()) { + } else if (taskInstance.taskCanRetry() && !workflowInstance.getState().isReadyStop()) { // retry task log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); retryTaskInstance(taskInstance); } else if (taskInstance.getState().isFailure()) { completeTaskSet.add(taskInstance.getTaskCode()); // There are child nodes and the failure policy is: CONTINUE - if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( - Long.toString(taskInstance.getTaskCode()), - dag)) { - submitPostNode(Long.toString(taskInstance.getTaskCode())); + if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( + taskInstance.getTaskCode(), + workflowExecuteContext.getWorkflowGraph().getDag())) { + submitPostNode(taskInstance.getTaskCode()); } else { errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); - if (processInstance.getFailureStrategy() == FailureStrategy.END) { + if (workflowInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } } @@ -489,6 +447,7 @@ public class WorkflowExecuteRunnable implements Callable { * */ public void releaseTaskGroup(TaskInstance taskInstance) throws RemotingException, InterruptedException { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); // todo: use Integer if (taskInstance.getTaskGroupId() <= 0) { log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup", @@ -503,7 +462,7 @@ public class WorkflowExecuteRunnable implements Callable { } if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) { TaskStateEvent nextEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) + .processInstanceId(workflowInstance.getId()) .taskInstanceId(nextTaskInstance.getId()) .type(StateEventType.WAKE_UP_TASK_GROUP) .build(); @@ -523,6 +482,7 @@ public class WorkflowExecuteRunnable implements Callable { * */ private void retryTaskInstance(TaskInstance taskInstance) throws StateEventHandleException { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (!taskInstance.taskCanRetry()) { return; } @@ -537,11 +497,11 @@ public class WorkflowExecuteRunnable implements Callable { if (!taskInstance.retryTaskIntervalOverTime()) { log.info( "Failure task will be submitted, process id: {}, task instance code: {}, state: {}, retry times: {} / {}, interval: {}", - processInstance.getId(), newTaskInstance.getTaskCode(), + workflowInstance.getId(), newTaskInstance.getTaskCode(), newTaskInstance.getState(), newTaskInstance.getRetryTimes(), newTaskInstance.getMaxRetryTimes(), newTaskInstance.getRetryInterval()); - stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, newTaskInstance); - stateWheelExecuteThread.addTask4RetryCheck(processInstance, newTaskInstance); + stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, newTaskInstance); + stateWheelExecuteThread.addTask4RetryCheck(workflowInstance, newTaskInstance); } else { addTaskToStandByList(newTaskInstance); submitStandByTask(); @@ -549,18 +509,23 @@ public class WorkflowExecuteRunnable implements Callable { } } - /** - * update process instance - */ + // todo: remove this method, it's not a good practice to expose method to reload the workflow instance from db. + // all the update method should use RPC public void refreshProcessInstance(int processInstanceId) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProcessDefinition workflowDefinition = workflowExecuteContext.getWorkflowDefinition(); + log.info("process instance update: {}", processInstanceId); ProcessInstance newProcessInstance = processService.findProcessInstanceById(processInstanceId); // just update the processInstance field(this is soft copy) - BeanUtils.copyProperties(newProcessInstance, processInstance); + BeanUtils.copyProperties(newProcessInstance, workflowInstance); + + ProcessDefinition newWorkflowDefinition = processService.findProcessDefinition( + workflowInstance.getProcessDefinitionCode(), workflowInstance.getProcessDefinitionVersion()); + workflowInstance.setProcessDefinition(workflowDefinition); - processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion()); - processInstance.setProcessDefinition(processDefinition); + // just update the processInstance field(this is soft copy) + BeanUtils.copyProperties(newWorkflowDefinition, workflowDefinition); } /** @@ -573,7 +538,7 @@ public class WorkflowExecuteRunnable implements Callable { log.error("can not find task instance, id:{}", taskInstanceId); return; } - processService.packageTaskInstance(taskInstance, processInstance); + processService.packageTaskInstance(taskInstance, workflowExecuteContext.getWorkflowInstance()); taskInstanceMap.put(taskInstance.getId(), taskInstance); taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance); @@ -587,7 +552,7 @@ public class WorkflowExecuteRunnable implements Callable { * check process instance by state event */ public void checkProcessInstance(StateEvent stateEvent) throws StateEventHandleError { - if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) { + if (workflowExecuteContext.getWorkflowInstance().getId() != stateEvent.getProcessInstanceId()) { throw new StateEventHandleError("The event doesn't contains process instance id"); } } @@ -642,44 +607,47 @@ public class WorkflowExecuteRunnable implements Callable { } public void processBlock() { - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendProcessBlockingAlert(processInstance, projectUser); - log.info("processInstance {} block alert send successful!", processInstance.getId()); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + processAlertManager.sendProcessBlockingAlert(workflowInstance, projectUser); + log.info("processInstance {} block alert send successful!", workflowInstance.getId()); } public boolean processComplementData() { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (!needComplementProcess()) { return false; } // when the serial complement is executed, the next complement instance is created, // and this method does not need to be executed when the parallel complement is used. - if (processInstance.getState().isReadyStop() || !processInstance.getState().isFinished()) { + if (workflowInstance.getState().isReadyStop() || !workflowInstance.getState().isFinished()) { return false; } - Date scheduleDate = processInstance.getScheduleTime(); + Date scheduleDate = workflowInstance.getScheduleTime(); if (scheduleDate == null) { if (CollectionUtils.isEmpty(complementListDate)) { - log.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId()); + log.info("complementListDate is empty, process complement end. process id:{}", + workflowInstance.getId()); return true; } scheduleDate = complementListDate.get(0); - } else if (processInstance.getState().isFinished()) { + } else if (workflowInstance.getState().isFinished()) { endProcess(); if (complementListDate.isEmpty()) { - log.info("process complement end. process id:{}", processInstance.getId()); + log.info("process complement end. process id:{}", workflowInstance.getId()); return true; } int index = complementListDate.indexOf(scheduleDate); - if (index >= complementListDate.size() - 1 || !processInstance.getState().isSuccess()) { - log.info("process complement end. process id:{}", processInstance.getId()); + if (index >= complementListDate.size() - 1 || !workflowInstance.getState().isSuccess()) { + log.info("process complement end. process id:{}", workflowInstance.getId()); // complement data ends || no success return true; } log.info("process complement continue. process id:{}, schedule time:{} complementListDate:{}", - processInstance.getId(), processInstance.getScheduleTime(), complementListDate); + workflowInstance.getId(), workflowInstance.getScheduleTime(), complementListDate); scheduleDate = complementListDate.get(index + 1); } // the next process complement @@ -691,11 +659,13 @@ public class WorkflowExecuteRunnable implements Callable { } private int createComplementDataCommand(Date scheduleDate) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + Command command = new Command(); command.setScheduleTime(scheduleDate); command.setCommandType(CommandType.COMPLEMENT_DATA); - command.setProcessDefinitionCode(processInstance.getProcessDefinitionCode()); - Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + command.setProcessDefinitionCode(workflowInstance.getProcessDefinitionCode()); + Map cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam()); if (cmdParam.containsKey(CMD_PARAM_RECOVERY_START_NODE_STRING)) { cmdParam.remove(CMD_PARAM_RECOVERY_START_NODE_STRING); } @@ -711,48 +681,45 @@ public class WorkflowExecuteRunnable implements Callable { DateUtils.format(scheduleDate, YYYY_MM_DD_HH_MM_SS, null)); } command.setCommandParam(JSONUtils.toJsonString(cmdParam)); - command.setTaskDependType(processInstance.getTaskDependType()); - command.setFailureStrategy(processInstance.getFailureStrategy()); - command.setWarningType(processInstance.getWarningType()); - command.setWarningGroupId(processInstance.getWarningGroupId()); + command.setTaskDependType(workflowInstance.getTaskDependType()); + command.setFailureStrategy(workflowInstance.getFailureStrategy()); + command.setWarningType(workflowInstance.getWarningType()); + command.setWarningGroupId(workflowInstance.getWarningGroupId()); command.setStartTime(new Date()); - command.setExecutorId(processInstance.getExecutorId()); + command.setExecutorId(workflowInstance.getExecutorId()); command.setUpdateTime(new Date()); - command.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - command.setWorkerGroup(processInstance.getWorkerGroup()); - command.setEnvironmentCode(processInstance.getEnvironmentCode()); - command.setDryRun(processInstance.getDryRun()); + command.setProcessInstancePriority(workflowInstance.getProcessInstancePriority()); + command.setWorkerGroup(workflowInstance.getWorkerGroup()); + command.setEnvironmentCode(workflowInstance.getEnvironmentCode()); + command.setDryRun(workflowInstance.getDryRun()); command.setProcessInstanceId(0); - command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion()); - command.setTestFlag(processInstance.getTestFlag()); + command.setProcessDefinitionVersion(workflowInstance.getProcessDefinitionVersion()); + command.setTestFlag(workflowInstance.getTestFlag()); int create = commandService.createCommand(command); - processService.saveCommandTrigger(command.getId(), processInstance.getId()); + processService.saveCommandTrigger(command.getId(), workflowInstance.getId()); return create; } private boolean needComplementProcess() { - return processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess(); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + return workflowInstance.isComplementData() && Flag.NO == workflowInstance.getIsSubProcess(); } /** * ProcessInstance start entrypoint. */ @Override - public WorkflowSubmitStatus call() { + public WorkflowStartStatus startWorkflow() { try { - LogUtils.setWorkflowInstanceIdMDC(processInstance.getId()); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstance.getId()); if (isStart()) { // This case should not been happened log.warn("The workflow has already been started, current state: {}", workflowRunnableStatus); - return WorkflowSubmitStatus.DUPLICATED_SUBMITTED; + return WorkflowStartStatus.DUPLICATED_SUBMITTED; } if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) { - buildFlowDag(); - workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG; - log.info("workflowStatue changed to :{}", workflowRunnableStatus); - } - if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) { initTaskQueue(); workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE; log.info("workflowStatue changed to :{}", workflowRunnableStatus); @@ -762,10 +729,10 @@ public class WorkflowExecuteRunnable implements Callable { workflowRunnableStatus = WorkflowRunnableStatus.STARTED; log.info("workflowStatue changed to :{}", workflowRunnableStatus); } - return WorkflowSubmitStatus.SUCCESS; + return WorkflowStartStatus.SUCCESS; } catch (Exception e) { log.error("Start workflow error", e); - return WorkflowSubmitStatus.FAILED; + return WorkflowStartStatus.FAILED; } finally { LogUtils.removeWorkflowInstanceIdMDC(); } @@ -776,33 +743,37 @@ public class WorkflowExecuteRunnable implements Callable { */ public void endProcess() { this.stateEvents.clear(); - if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType() + ProcessDefinition workflowDefinition = workflowExecuteContext.getWorkflowDefinition(); + if (workflowDefinition.getExecutionType().typeIsSerialWait() || workflowDefinition.getExecutionType() .typeIsSerialPriority()) { - checkSerialProcess(processDefinition); + checkSerialProcess(workflowDefinition); } - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); - if (processInstance.getState().isSuccess()) { - processAlertManager.closeAlert(processInstance); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); + processAlertManager.sendAlertProcessInstance(workflowInstance, getValidTaskList(), projectUser); + if (workflowInstance.getState().isSuccess()) { + processAlertManager.closeAlert(workflowInstance); } if (checkTaskQueue()) { // release task group - processService.releaseAllTaskGroup(processInstance.getId()); + processService.releaseAllTaskGroup(workflowInstance.getId()); } } public void checkSerialProcess(ProcessDefinition processDefinition) { - int nextInstanceId = processInstance.getNextProcessInstanceId(); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + + int nextInstanceId = workflowInstance.getNextProcessInstanceId(); if (nextInstanceId == 0) { ProcessInstance nextProcessInstance = - this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), - WorkflowExecutionStatus.SERIAL_WAIT.getCode(), processInstance.getId()); + this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(), + WorkflowExecutionStatus.SERIAL_WAIT.getCode(), workflowInstance.getId()); if (nextProcessInstance == null) { return; } ProcessInstance nextReadyStopProcessInstance = - this.processService.loadNextProcess4Serial(processInstance.getProcessDefinition().getCode(), - WorkflowExecutionStatus.READY_STOP.getCode(), processInstance.getId()); + this.processService.loadNextProcess4Serial(workflowInstance.getProcessDefinition().getCode(), + WorkflowExecutionStatus.READY_STOP.getCode(), workflowInstance.getId()); if (processDefinition.getExecutionType().typeIsSerialPriority() && nextReadyStopProcessInstance != null) { return; } @@ -823,43 +794,6 @@ public class WorkflowExecuteRunnable implements Callable { commandService.createCommand(command); } - // todo: move the initialize code to constructor - private void buildFlowDag() throws Exception { - - List recoverNodeList = getRecoverTaskInstanceList(processInstance.getCommandParam()); - - List processTaskRelations = - processService.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); - List taskDefinitionLogs = - taskDefinitionLogDao.queryTaskDefineLogList(processTaskRelations); - List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); - forbiddenTaskMap.clear(); - - taskNodeList.forEach(taskNode -> { - if (taskNode.isForbidden()) { - forbiddenTaskMap.put(taskNode.getCode(), taskNode); - } - }); - - taskNodesMap = taskNodeList.stream().collect(Collectors.toMap(TaskNode::getCode, taskNode -> taskNode)); - - // generate process to get DAG info - List recoveryNodeCodeList = getRecoveryNodeCodeList(recoverNodeList); - List startNodeNameList = parseStartNodeName(processInstance.getCommandParam()); - ProcessDag processDag = generateFlowDag(taskNodeList, startNodeNameList, recoveryNodeCodeList, - processInstance.getTaskDependType()); - if (processDag == null) { - log.error("ProcessDag is null"); - return; - } - // generate process dag - dag = DagHelper.buildDagGraph(processDag); - log.info("Build dag success, dag: {}", dag); - } - - /** - * init task queue - */ private void initTaskQueue() throws StateEventHandleException, CronParseException { taskFailedSubmit = false; @@ -869,13 +803,16 @@ public class WorkflowExecuteRunnable implements Callable { completeTaskSet.clear(); errorTaskMap.clear(); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + ProcessDefinition workflowDefinition = workflowExecuteContext.getWorkflowDefinition(); + if (!isNewProcessInstance()) { log.info("The workflowInstance is not a newly running instance, runtimes: {}, recover flag: {}", - processInstance.getRunTimes(), - processInstance.getRecovery()); + workflowInstance.getRunTimes(), + workflowInstance.getRecovery()); List validTaskInstanceList = - taskInstanceDao.queryValidTaskListByWorkflowInstanceId(processInstance.getId(), - processInstance.getTestFlag()); + taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId(), + workflowInstance.getTestFlag()); for (TaskInstance task : validTaskInstanceList) { try ( final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = @@ -906,8 +843,8 @@ public class WorkflowExecuteRunnable implements Callable { completeTaskSet.add(task.getTaskCode()); continue; } - if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), - dag)) { + if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getTaskCode(), + workflowExecuteContext.getWorkflowGraph().getDag())) { continue; } if (task.taskCanRetry()) { @@ -935,11 +872,11 @@ public class WorkflowExecuteRunnable implements Callable { log.info("The current workflowInstance is a newly running workflowInstance"); } - if (processInstance.isComplementData() && complementListDate.isEmpty()) { - Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + if (workflowInstance.isComplementData() && complementListDate.isEmpty()) { + Map cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam()); if (cmdParam != null) { // reset global params while there are start parameters - setGlobalParamIfCommanded(processDefinition, cmdParam); + setGlobalParamIfCommanded(workflowDefinition, cmdParam); Date start = null; Date end = null; @@ -951,25 +888,25 @@ public class WorkflowExecuteRunnable implements Callable { if (complementListDate.isEmpty() && needComplementProcess()) { if (start != null && end != null) { List schedules = processService.queryReleaseSchedulerListByProcessDefinitionCode( - processInstance.getProcessDefinitionCode()); + workflowInstance.getProcessDefinitionCode()); complementListDate = CronUtils.getSelfFireDateList(start, end, schedules); } if (cmdParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) { complementListDate = CronUtils.getSelfScheduleDateList(cmdParam); } log.info(" process definition code:{} complement data: {}", - processInstance.getProcessDefinitionCode(), complementListDate); + workflowInstance.getProcessDefinitionCode(), complementListDate); - if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) { - processInstance.setScheduleTime(complementListDate.get(0)); - String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(), - processDefinition.getGlobalParamMap(), - processDefinition.getGlobalParamList(), + if (!complementListDate.isEmpty() && Flag.NO == workflowInstance.getIsSubProcess()) { + workflowInstance.setScheduleTime(complementListDate.get(0)); + String globalParams = curingParamsService.curingGlobalParams(workflowInstance.getId(), + workflowDefinition.getGlobalParamMap(), + workflowDefinition.getGlobalParamList(), CommandType.COMPLEMENT_DATA, - processInstance.getScheduleTime(), + workflowInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE)); - processInstance.setGlobalParams(globalParams); - processInstanceDao.updateById(processInstance); + workflowInstance.setGlobalParams(globalParams); + processInstanceDao.updateById(workflowInstance); } } } @@ -982,12 +919,13 @@ public class WorkflowExecuteRunnable implements Callable { private boolean executeTask(TaskInstance taskInstance) { try { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); // package task instance before submit, inject the process instance to task instance // todo: we need to use task execute context rather than packege a lot of pojo into task instance // 1. submit the task instance to db - processService.packageTaskInstance(taskInstance, processInstance); + processService.packageTaskInstance(taskInstance, workflowInstance); // todo: remove this method - if (!processService.submitTask(processInstance, taskInstance)) { + if (!processService.submitTask(workflowInstance, taskInstance)) { log.error("Submit standby task: {} failed", taskInstance.getName()); return true; } @@ -1036,7 +974,7 @@ public class WorkflowExecuteRunnable implements Callable { // 4. submit to dispatch queue taskExecuteRunnable.dispatch(); - stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); + stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, taskInstance); return true; } } catch (Exception e) { @@ -1085,16 +1023,13 @@ public class WorkflowExecuteRunnable implements Callable { * @return taskInstance */ public TaskInstance cloneRetryTaskInstance(TaskInstance taskInstance) { - TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); - if (taskNode == null) { - log.error("Clone retry taskInstance error because taskNode is null, taskCode:{}", - taskInstance.getTaskCode()); - return null; - } - TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + TaskNode taskNode = + workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()); + TaskInstance newTaskInstance = newTaskInstance(workflowInstance, taskNode); newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); - newTaskInstance.setProcessInstance(processInstance); + newTaskInstance.setProcessInstance(workflowInstance); newTaskInstance.setRetryTimes(taskInstance.getRetryTimes() + 1); // todo relative function: TaskInstance.retryTaskIntervalOverTime newTaskInstance.setState(taskInstance.getState()); @@ -1114,16 +1049,13 @@ public class WorkflowExecuteRunnable implements Callable { * @return taskInstance */ public TaskInstance cloneTolerantTaskInstance(TaskInstance taskInstance) { - TaskNode taskNode = dag.getNode(Long.toString(taskInstance.getTaskCode())); - if (taskNode == null) { - log.error("Clone tolerant taskInstance error because taskNode is null, taskCode:{}", - taskInstance.getTaskCode()); - return null; - } - TaskInstance newTaskInstance = newTaskInstance(processInstance, taskNode); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + TaskNode taskNode = + workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()); + TaskInstance newTaskInstance = newTaskInstance(workflowInstance, taskNode); newTaskInstance.setTaskDefine(taskInstance.getTaskDefine()); newTaskInstance.setProcessDefine(taskInstance.getProcessDefine()); - newTaskInstance.setProcessInstance(processInstance); + newTaskInstance.setProcessInstance(workflowInstance); newTaskInstance.setRetryTimes(taskInstance.getRetryTimes()); newTaskInstance.setState(taskInstance.getState()); newTaskInstance.setAppLink(taskInstance.getAppLink()); @@ -1226,12 +1158,13 @@ public class WorkflowExecuteRunnable implements Callable { return taskInstance; } - public void getPreVarPool(TaskInstance taskInstance, Set preTask) { + public void getPreVarPool(TaskInstance taskInstance, Set preTask) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); Map allProperty = new HashMap<>(); Map allTaskInstance = new HashMap<>(); if (CollectionUtils.isNotEmpty(preTask)) { - for (String preTaskCode : preTask) { - Optional existTaskInstanceOptional = getTaskInstance(Long.parseLong(preTaskCode)); + for (Long preTaskCode : preTask) { + Optional existTaskInstanceOptional = getTaskInstance(preTaskCode); if (!existTaskInstanceOptional.isPresent()) { continue; } @@ -1256,8 +1189,8 @@ public class WorkflowExecuteRunnable implements Callable { taskInstance.setVarPool(JSONUtils.toJsonString(allProperty.values())); } } else { - if (StringUtils.isNotEmpty(processInstance.getVarPool())) { - taskInstance.setVarPool(processInstance.getVarPool()); + if (StringUtils.isNotEmpty(workflowInstance.getVarPool())) { + taskInstance.setVarPool(workflowInstance.getVarPool()); } } } @@ -1266,7 +1199,8 @@ public class WorkflowExecuteRunnable implements Callable { return taskInstanceMap.values(); } - private void setVarPoolValue(Map allProperty, Map allTaskInstance, + private void setVarPoolValue(Map allProperty, + Map allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) { // for this taskInstance all the param in this part is IN. thisProperty.setDirect(Direct.IN); @@ -1301,14 +1235,14 @@ public class WorkflowExecuteRunnable implements Callable { /** * get complete task instance map, taskCode as key */ - private Map getCompleteTaskInstanceMap() { - Map completeTaskInstanceMap = new HashMap<>(); + private Map getCompleteTaskInstanceMap() { + Map completeTaskInstanceMap = new HashMap<>(); completeTaskSet.forEach(taskCode -> { Optional existTaskInstanceOptional = getTaskInstance(taskCode); if (existTaskInstanceOptional.isPresent()) { TaskInstance taskInstance = existTaskInstanceOptional.get(); - completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance); + completeTaskInstanceMap.put(taskCode, taskInstance); } else { // This case will happen when we submit to db failed, then the taskInstanceId is 0 log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}", @@ -1330,11 +1264,14 @@ public class WorkflowExecuteRunnable implements Callable { return validTaskInstanceList; } - private void submitPostNode(String parentNodeCode) throws StateEventHandleException { - Set submitTaskNodeList = + private void submitPostNode(Long parentNodeCode) throws StateEventHandleException { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + DAG dag = workflowExecuteContext.getWorkflowGraph().getDag(); + + Set submitTaskNodeList = DagHelper.parsePostNodes(parentNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); List taskInstances = new ArrayList<>(); - for (String taskNode : submitTaskNodeList) { + for (Long taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); Optional existTaskInstanceOptional = getTaskInstance(taskNodeObject.getCode()); if (existTaskInstanceOptional.isPresent()) { @@ -1358,25 +1295,25 @@ public class WorkflowExecuteRunnable implements Callable { } taskInstances.add(existTaskInstance); } else { - taskInstances.add(createTaskInstance(processInstance, taskNodeObject)); + taskInstances.add(createTaskInstance(workflowInstance, taskNodeObject)); } } // the end node of the branch of the dag - if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) { - Optional existTaskInstanceOptional = getTaskInstance(NumberUtils.toLong(parentNodeCode)); + if (parentNodeCode != null && dag.getEndNode().contains(parentNodeCode)) { + Optional existTaskInstanceOptional = getTaskInstance(parentNodeCode); if (existTaskInstanceOptional.isPresent()) { TaskInstance endTaskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); String taskInstanceVarPool = endTaskInstance.getVarPool(); if (StringUtils.isNotEmpty(taskInstanceVarPool)) { Set taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class)); - String processInstanceVarPool = processInstance.getVarPool(); + String processInstanceVarPool = workflowInstance.getVarPool(); if (StringUtils.isNotEmpty(processInstanceVarPool)) { Set properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class)); properties.addAll(taskProperties); - processInstance.setVarPool(JSONUtils.toJsonString(properties)); + workflowInstance.setVarPool(JSONUtils.toJsonString(properties)); } else { - processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); + workflowInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); } } } @@ -1406,6 +1343,7 @@ public class WorkflowExecuteRunnable implements Callable { } private boolean tryToTakeOverTaskInstance(TaskInstance taskInstance) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (TaskUtils.isMasterTask(taskInstance.getTaskType())) { return false; } @@ -1435,8 +1373,8 @@ public class WorkflowExecuteRunnable implements Callable { taskInstanceMap.put(taskInstance.getId(), taskInstance); taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance); - stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); - stateWheelExecuteThread.addTask4RetryCheck(processInstance, taskInstance); + stateWheelExecuteThread.addTask4TimeoutCheck(workflowInstance, taskInstance); + stateWheelExecuteThread.addTask4RetryCheck(workflowInstance, taskInstance); return true; } catch (RemotingException | InterruptedException | TaskExecuteRunnableCreateException e) { log.error( @@ -1451,25 +1389,25 @@ public class WorkflowExecuteRunnable implements Callable { * * @return DependResult */ - private DependResult isTaskDepsComplete(String taskCode) { + private DependResult isTaskDepsComplete(Long taskCode) { + DAG dag = workflowExecuteContext.getWorkflowGraph().getDag(); - Collection startNodes = dag.getBeginNode(); + Collection startNodes = dag.getBeginNode(); // if vertex,returns true directly if (startNodes.contains(taskCode)) { return DependResult.SUCCESS; } TaskNode taskNode = dag.getNode(taskCode); - List indirectDepCodeList = new ArrayList<>(); + List indirectDepCodeList = new ArrayList<>(); setIndirectDepList(taskCode, indirectDepCodeList); - for (String depsNode : indirectDepCodeList) { + for (Long depsNode : indirectDepCodeList) { if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) { // dependencies must be fully completed - long despNodeTaskCode = Long.parseLong(depsNode); - if (!completeTaskSet.contains(despNodeTaskCode)) { + if (!completeTaskSet.contains(depsNode)) { return DependResult.WAITING; } - Optional existTaskInstanceOptional = getTaskInstance(despNodeTaskCode); + Optional existTaskInstanceOptional = getTaskInstance(depsNode); if (!existTaskInstanceOptional.isPresent()) { return DependResult.NON_EXEC; } @@ -1506,14 +1444,17 @@ public class WorkflowExecuteRunnable implements Callable { * @param taskCode taskCode * @param indirectDepCodeList All indirectly dependent nodes */ - private void setIndirectDepList(String taskCode, List indirectDepCodeList) { + private void setIndirectDepList(Long taskCode, List indirectDepCodeList) { + IWorkflowGraph workflowGraph = workflowExecuteContext.getWorkflowGraph(); + DAG dag = workflowGraph.getDag(); TaskNode taskNode = dag.getNode(taskCode); // If workflow start with startNode or recoveryNode, taskNode may be null if (taskNode == null) { return; } - for (String depsNode : taskNode.getDepList()) { - if (forbiddenTaskMap.containsKey(Long.parseLong(depsNode))) { + + for (Long depsNode : taskNode.getDepList()) { + if (workflowGraph.isForbiddenTask(depsNode)) { setIndirectDepList(depsNode, indirectDepCodeList); } else { indirectDepCodeList.add(depsNode); @@ -1524,11 +1465,12 @@ public class WorkflowExecuteRunnable implements Callable { /** * depend node is completed, but here need check the condition task branch is the next node */ - private boolean dependTaskSuccess(String dependNodeCode, String nextNodeCode) { + private boolean dependTaskSuccess(Long dependNodeCode, Long nextNodeCode) { + DAG dag = workflowExecuteContext.getWorkflowGraph().getDag(); TaskNode dependentNode = dag.getNode(dependNodeCode); if (dependentNode.isConditionsTask()) { // condition task need check the branch to run - List nextTaskList = + List nextTaskList = DagHelper.parseConditionTask(dependNodeCode, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); if (!nextTaskList.contains(nextNodeCode)) { log.info( @@ -1545,8 +1487,7 @@ public class WorkflowExecuteRunnable implements Callable { return switchParameters.getDependTaskList().get(switchParameters.getResultConditionLocation()).getNextNode() .contains(nextNodeCode); } - long taskCode = Long.parseLong(dependNodeCode); - Optional existTaskInstanceOptional = getTaskInstance(taskCode); + Optional existTaskInstanceOptional = getTaskInstance(dependNodeCode); if (!existTaskInstanceOptional.isPresent()) { return false; } @@ -1616,12 +1557,13 @@ public class WorkflowExecuteRunnable implements Callable { * @return Boolean whether process instance failed */ private boolean processFailed() { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (hasFailedTask()) { log.info("The current process has failed task, the current process failed"); - if (processInstance.getFailureStrategy() == FailureStrategy.END) { + if (workflowInstance.getFailureStrategy() == FailureStrategy.END) { return true; } - if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { + if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0 && waitToRetryTaskInstanceMap.size() == 0; } @@ -1638,12 +1580,13 @@ public class WorkflowExecuteRunnable implements Callable { * @return ExecutionStatus */ private WorkflowExecutionStatus processReadyPause() { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); if (hasRetryTaskInStandBy()) { return WorkflowExecutionStatus.FAILURE; } List pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE); - if (CollectionUtils.isNotEmpty(pauseList) || processInstance.isBlocked() || !isComplementEnd() + if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd() || readyToSubmitTaskQueue.size() > 0) { return WorkflowExecutionStatus.PAUSE; } else { @@ -1750,13 +1693,14 @@ public class WorkflowExecuteRunnable implements Callable { * @return Boolean whether is complement end */ private boolean isComplementEnd() { - if (!processInstance.isComplementData()) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + if (!workflowInstance.isComplementData()) { return true; } - Map cmdParam = JSONUtils.toMap(processInstance.getCommandParam()); + Map cmdParam = JSONUtils.toMap(workflowInstance.getCommandParam()); Date endTime = DateUtils.stringToDate(cmdParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE)); - return processInstance.getScheduleTime().equals(endTime); + return workflowInstance.getScheduleTime().equals(endTime); } /** @@ -1764,23 +1708,24 @@ public class WorkflowExecuteRunnable implements Callable { * after each batch of tasks is executed, the status of the process instance is updated */ private void updateProcessInstanceState() throws StateEventHandleException { - WorkflowExecutionStatus state = getProcessInstanceState(processInstance); - if (processInstance.getState() != state) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + WorkflowExecutionStatus state = getProcessInstanceState(workflowInstance); + if (workflowInstance.getState() != state) { log.info("Update workflowInstance states, origin state: {}, target state: {}", - processInstance.getState(), + workflowInstance.getState(), state); updateWorkflowInstanceStatesToDB(state); WorkflowStateEvent stateEvent = WorkflowStateEvent.builder() - .processInstanceId(processInstance.getId()) - .status(processInstance.getState()) + .processInstanceId(workflowInstance.getId()) + .status(workflowInstance.getState()) .type(StateEventType.PROCESS_STATE_CHANGE) .build(); // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks this.stateEvents.add(stateEvent); } else { log.info("There is no need to update the workflow instance state, origin state: {}, target state: {}", - processInstance.getState(), + workflowInstance.getState(), state); } } @@ -1794,22 +1739,23 @@ public class WorkflowExecuteRunnable implements Callable { } private void updateWorkflowInstanceStatesToDB(WorkflowExecutionStatus newStates) throws StateEventHandleException { - WorkflowExecutionStatus originStates = processInstance.getState(); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + WorkflowExecutionStatus originStates = workflowInstance.getState(); if (originStates != newStates) { log.info("Begin to update workflow instance state , state will change from {} to {}", originStates, newStates); - processInstance.setStateWithDesc(newStates, "update by workflow executor"); + workflowInstance.setStateWithDesc(newStates, "update by workflow executor"); if (newStates.isFinished()) { - processInstance.setEndTime(new Date()); + workflowInstance.setEndTime(new Date()); } try { - processInstanceDao.updateById(processInstance); + processInstanceDao.updateById(workflowInstance); } catch (Exception ex) { // recover the status - processInstance.setStateWithDesc(originStates, "recover state by DB error"); - processInstance.setEndTime(null); + workflowInstance.setStateWithDesc(originStates, "recover state by DB error"); + workflowInstance.setEndTime(null); throw new StateEventHandleException("Update process instance status to DB error", ex); } } @@ -1822,7 +1768,7 @@ public class WorkflowExecuteRunnable implements Callable { * @return DependResult */ private DependResult getDependResultForTask(TaskInstance taskInstance) { - return isTaskDepsComplete(Long.toString(taskInstance.getTaskCode())); + return isTaskDepsComplete(taskInstance.getTaskCode()); } /** @@ -1871,8 +1817,9 @@ public class WorkflowExecuteRunnable implements Callable { * close the on going tasks */ public void killAllTasks() { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); log.info("kill called on process instance id: {}, num: {}", - processInstance.getId(), + workflowInstance.getId(), taskExecuteRunnableMap.size()); if (readyToSubmitTaskQueue.size() > 0) { @@ -1886,7 +1833,7 @@ public class WorkflowExecuteRunnable implements Callable { } try ( final LogUtils.MDCAutoClosableContext mdcAutoClosableContext = - LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId)) { + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstance.getId(), taskInstanceId)) { TaskInstance taskInstance = taskInstanceDao.queryById(taskInstanceId); if (taskInstance == null || taskInstance.getState().isFinished()) { continue; @@ -1895,7 +1842,7 @@ public class WorkflowExecuteRunnable implements Callable { defaultTaskExecuteRunnable.kill(); if (defaultTaskExecuteRunnable.getTaskInstance().getState().isFinished()) { TaskStateEvent taskStateEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) + .processInstanceId(workflowInstance.getId()) .taskInstanceId(taskInstance.getId()) .status(defaultTaskExecuteRunnable.getTaskInstance().getState()) .type(StateEventType.TASK_STATE_CHANGE) @@ -1907,13 +1854,14 @@ public class WorkflowExecuteRunnable implements Callable { } public boolean workFlowFinish() { - return this.processInstance.getState().isFinished(); + return workflowExecuteContext.getWorkflowInstance().getState().isFinished(); } /** * handling the list of tasks to be submitted */ public void submitStandByTask() throws StateEventHandleException { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); TaskInstance task; while ((task = readyToSubmitTaskQueue.peek()) != null) { // stop tasks which is retrying if forced success happens @@ -1928,7 +1876,7 @@ public class WorkflowExecuteRunnable implements Callable { completeTaskSet.add(task.getTaskCode()); taskInstanceMap.put(task.getId(), task); taskCodeInstanceMap.put(task.getTaskCode(), task); - submitPostNode(Long.toString(task.getTaskCode())); + submitPostNode(task.getTaskCode()); continue; } } @@ -1936,8 +1884,9 @@ public class WorkflowExecuteRunnable implements Callable { if (task.isFirstRun()) { // get pre task ,get all the task varPool to this task // Do not use dag.getPreviousNodes because of the dag may be miss the upstream node - String preTasks = taskNodesMap.get(task.getTaskCode()).getPreTasks(); - Set preTaskList = new HashSet<>(JSONUtils.toList(preTasks, String.class)); + String preTasks = workflowExecuteContext.getWorkflowGraph() + .getTaskNodeByCode(task.getTaskCode()).getPreTasks(); + Set preTaskList = new HashSet<>(JSONUtils.toList(preTasks, Long.class)); getPreVarPool(task, preTaskList); } DependResult dependResult = getDependResultForTask(task); @@ -1949,7 +1898,7 @@ public class WorkflowExecuteRunnable implements Callable { if (!removeTaskFromStandbyList(task)) { log.error( "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}", - processInstance.getId(), + workflowInstance.getId(), task.getTaskCode()); } completeTaskSet.add(task.getTaskCode()); @@ -2038,26 +1987,6 @@ public class WorkflowExecuteRunnable implements Callable { return recoveryNodeCodeList; } - /** - * generate flow dag - * - * @param totalTaskNodeList total task node list - * @param startNodeNameList start node name list - * @param recoveryNodeCodeList recovery node code list - * @param depNodeType depend node type - * @return ProcessDag process dag - * @throws Exception exception - */ - public ProcessDag generateFlowDag(List totalTaskNodeList, - List startNodeNameList, - List recoveryNodeCodeList, - TaskDependType depNodeType) throws Exception { - return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); - } - - /** - * check task queue - */ private boolean checkTaskQueue() { AtomicBoolean result = new AtomicBoolean(false); taskInstanceMap.forEach((id, taskInstance) -> { @@ -2068,23 +1997,21 @@ public class WorkflowExecuteRunnable implements Callable { return result.get(); } - /** - * is new process instance - */ private boolean isNewProcessInstance() { - if (Flag.YES.equals(processInstance.getRecovery())) { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + if (Flag.YES.equals(workflowInstance.getRecovery())) { log.info("This workInstance will be recover by this execution"); return false; } - if (WorkflowExecutionStatus.RUNNING_EXECUTION == processInstance.getState() - && processInstance.getRunTimes() == 1) { + if (WorkflowExecutionStatus.RUNNING_EXECUTION == workflowInstance.getState() + && workflowInstance.getRunTimes() == 1) { return true; } log.info( "The workflowInstance has been executed before, this execution is to reRun, processInstance status: {}, runTimes: {}", - processInstance.getState(), - processInstance.getRunTimes()); + workflowInstance.getState(), + workflowInstance.getRunTimes()); return false; } @@ -2144,23 +2071,24 @@ public class WorkflowExecuteRunnable implements Callable { * @return task instance */ protected void clearDataIfExecuteTask() { + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); // only clear data if command is EXECUTE_TASK - if (!processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) { + if (!workflowInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) { return; } // Records the key of varPool data to be removed - Set taskCodesString = dag.getAllNodesList(); + DAG dag = workflowExecuteContext.getWorkflowGraph().getDag(); + Set allNodesList = dag.getAllNodesList(); List removeTaskInstances = new ArrayList<>(); - for (String taskCodeString : taskCodesString) { - long taskCode = Long.parseLong(taskCodeString); + for (Long taskCode : allNodesList) { TaskInstance taskInstance; if (validTaskMap.containsKey(taskCode)) { taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode)); } else { - taskInstance = taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), taskCode); + taskInstance = taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(workflowInstance.getId(), taskCode); } if (taskInstance == null) { continue; @@ -2189,25 +2117,25 @@ public class WorkflowExecuteRunnable implements Callable { // remove varPool data and update process instance // TODO: we can remove this snippet if : we get varPool from pre taskInstance instead of process instance when // task can not get pre task from incomplete dag - List processProperties = JSONUtils.toList(processInstance.getVarPool(), Property.class); + List processProperties = JSONUtils.toList(workflowInstance.getVarPool(), Property.class); processProperties = processProperties.stream() .filter(property -> !(property.getDirect().equals(Direct.IN) && removeSet.contains(String.format("%s_%s", property.getProp(), property.getType())))) .collect(Collectors.toList()); - processInstance.setVarPool(JSONUtils.toJsonString(processProperties)); - processInstanceDao.updateById(processInstance); + workflowInstance.setVarPool(JSONUtils.toJsonString(processProperties)); + processInstanceDao.updateById(workflowInstance); // remove task instance from taskInstanceMap, completeTaskSet, validTaskMap, errorTaskMap // completeTaskSet remove dependency taskInstanceMap, so the sort can't change completeTaskSet.removeIf(taskCode -> { Optional existTaskInstanceOptional = getTaskInstance(taskCode); return existTaskInstanceOptional - .filter(taskInstance -> dag.containsNode(Long.toString(taskInstance.getTaskCode()))).isPresent(); + .filter(taskInstance -> dag.containsNode(taskInstance.getTaskCode())).isPresent(); }); - taskInstanceMap.entrySet().removeIf(entry -> dag.containsNode(Long.toString(entry.getValue().getTaskCode()))); - validTaskMap.entrySet().removeIf(entry -> dag.containsNode(Long.toString(entry.getKey()))); - errorTaskMap.entrySet().removeIf(entry -> dag.containsNode(Long.toString(entry.getKey()))); + taskInstanceMap.entrySet().removeIf(entry -> dag.containsNode(entry.getValue().getTaskCode())); + validTaskMap.entrySet().removeIf(entry -> dag.containsNode(entry.getKey())); + errorTaskMap.entrySet().removeIf(entry -> dag.containsNode(entry.getKey())); } private void saveCacheTaskInstance(TaskInstance taskInstance) { @@ -2224,7 +2152,7 @@ public class WorkflowExecuteRunnable implements Callable { } private enum WorkflowRunnableStatus { - CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED, + CREATED, INITIALIZE_QUEUE, STARTED, ; } @@ -2241,9 +2169,10 @@ public class WorkflowExecuteRunnable implements Callable { if (StringUtils.isEmpty(taskVarPoolJson)) { return; } - String processVarPoolJson = processInstance.getVarPool(); + ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); + String processVarPoolJson = workflowInstance.getVarPool(); if (StringUtils.isEmpty(processVarPoolJson)) { - processInstance.setVarPool(taskVarPoolJson); + workflowInstance.setVarPool(taskVarPoolJson); return; } List processVarPool = new ArrayList<>(JSONUtils.toList(processVarPoolJson, Property.class)); @@ -2254,6 +2183,6 @@ public class WorkflowExecuteRunnable implements Callable { processVarPool.addAll(taskVarPool); - processInstance.setVarPool(JSONUtils.toJsonString(processVarPool)); + workflowInstance.setVarPool(JSONUtils.toJsonString(processVarPool)); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java index 00510756b7..da086718e2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableFactory.java @@ -17,16 +17,12 @@ package org.apache.dolphinscheduler.server.master.runner; -import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.dao.entity.Command; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; -import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -43,9 +39,6 @@ import org.springframework.stereotype.Component; @Component public class WorkflowExecuteRunnableFactory { - @Autowired - private ServerNodeManager serverNodeManager; - @Autowired private CommandService commandService; @@ -79,10 +72,15 @@ public class WorkflowExecuteRunnableFactory { @Autowired private DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory; + @Autowired + private WorkflowExecuteContextFactory workflowExecuteContextFactory; + public WorkflowExecuteRunnable createWorkflowExecuteRunnable(Command command) throws WorkflowCreateException { try { - ProcessInstance workflowInstance = createWorkflowInstance(command); - return new WorkflowExecuteRunnable(workflowInstance, + IWorkflowExecuteContext workflowExecuteRunnableContext = + workflowExecuteContextFactory.createWorkflowExecuteRunnableContext(command); + return new WorkflowExecuteRunnable( + workflowExecuteRunnableContext, commandService, processService, processInstanceDao, @@ -92,43 +90,10 @@ public class WorkflowExecuteRunnableFactory { stateWheelExecuteThread, curingGlobalParamsService, taskInstanceDao, - taskDefinitionLogDao, defaultTaskExecuteRunnableFactory); } catch (Exception ex) { throw new WorkflowCreateException("Create workflow execute runnable failed", ex); } } - private ProcessInstance createWorkflowInstance(Command command) throws Exception { - long commandTransformStartTime = System.currentTimeMillis(); - // Note: this check is not safe, the slot may change after command transform. - // We use the database transaction in `handleCommand` so that we can guarantee the command will - // always be executed - // by only one master - SlotCheckState slotCheckState = slotCheck(command); - if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) { - log.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState); - throw new RuntimeException("Slot check failed the current state: " + slotCheckState); - } - ProcessInstance processInstance = processService.handleCommand(masterConfig.getMasterAddress(), command); - log.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId()); - ProcessInstanceMetrics - .recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime); - return processInstance; - } - - private SlotCheckState slotCheck(Command command) { - int slot = serverNodeManager.getSlot(); - int masterSize = serverNodeManager.getMasterSize(); - SlotCheckState state; - if (masterSize <= 0) { - state = SlotCheckState.CHANGE; - } else if (command.getId() % masterSize == slot) { - state = SlotCheckState.PASS; - } else { - state = SlotCheckState.INJECT; - } - return state; - } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 6a62be507b..fb59d3dd29 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -74,7 +74,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { /** * multi-thread filter, avoid handling workflow at the same time */ - private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap<>(); @PostConstruct private void init() { @@ -106,22 +106,26 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { return; } - if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { + IWorkflowExecuteContext workflowExecuteRunnableContext = + workflowExecuteThread.getWorkflowExecuteContext(); + Integer workflowInstanceId = workflowExecuteRunnableContext.getWorkflowInstance().getId(); + + if (multiThreadFilterMap.containsKey(workflowInstanceId)) { log.debug("The workflow has been executed by another thread"); return; } - multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); - int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); + multiThreadFilterMap.put(workflowInstanceId, workflowExecuteThread); ListenableFuture future = this.submitListenable(workflowExecuteThread::handleEvents); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { - LogUtils.setWorkflowInstanceIdMDC(processInstanceId); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); try { log.error("Workflow instance events handle failed", ex); - notifyProcessChanged(workflowExecuteThread.getProcessInstance()); - multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + notifyProcessChanged( + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance()); + multiThreadFilterMap.remove(workflowInstanceId); } finally { LogUtils.removeWorkflowInstanceIdMDC(); } @@ -130,19 +134,22 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { @Override public void onSuccess(Object result) { try { - LogUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId()); + LogUtils.setWorkflowInstanceIdMDC( + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance().getId()); if (workflowExecuteThread.workFlowFinish() && workflowExecuteThread.eventSize() == 0) { stateWheelExecuteThread - .removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId()); - processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); - notifyProcessChanged(workflowExecuteThread.getProcessInstance()); + .removeProcess4TimeoutCheck(workflowExecuteThread.getWorkflowExecuteContext() + .getWorkflowInstance().getId()); + processInstanceExecCacheManager.removeByProcessInstanceId(workflowInstanceId); + notifyProcessChanged( + workflowExecuteThread.getWorkflowExecuteContext().getWorkflowInstance()); log.info("Workflow instance is finished."); } } catch (Exception e) { log.error("Workflow instance is finished, but notify changed error", e); } finally { // make sure the process has been removed from multiThreadFilterMap - multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + multiThreadFilterMap.remove(workflowInstanceId); LogUtils.removeWorkflowInstanceIdMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatus.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowStartStatus.java similarity index 96% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatus.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowStartStatus.java index b965079bc8..dd4d9030b4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowSubmitStatus.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowStartStatus.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner; -public enum WorkflowSubmitStatus { +public enum WorkflowStartStatus { /** * Submit success */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java index d5c53d0c35..7c447ba02b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java @@ -48,7 +48,7 @@ public class DefaultTaskExecuteRunnableFactory implements TaskExecuteRunnableFac processInstanceExecCacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()); try { return new DefaultTaskExecuteRunnable( - workflowExecuteRunnable.getProcessInstance(), + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(), taskInstance, taskExecutionContextFactory.createTaskExecutionContext(taskInstance), taskOperatorManager); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java index 845a78101c..acc05aaf2d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/blocking/BlockingLogicTask.java @@ -80,7 +80,8 @@ public class BlockingLogicTask extends BaseSyncLogicTask { boolean isBlocked = (expected == conditionResult); log.info("blocking opportunity: expected-->{}, actual-->{}", expected, conditionResult); ProcessInstance workflowInstance = processInstanceExecCacheManager - .getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()).getProcessInstance(); + .getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()).getWorkflowExecuteContext() + .getWorkflowInstance(); workflowInstance.setBlocked(isBlocked); if (isBlocked) { workflowInstance.setStateWithDesc(WorkflowExecutionStatus.READY_BLOCK, "ready block"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java index c6e20950e0..294f880c77 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java @@ -66,7 +66,7 @@ public class SwitchLogicTask extends BaseSyncLogicTask { .getSwitchDependency()); WorkflowExecuteRunnable workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()); - this.processInstance = workflowExecuteRunnable.getProcessInstance(); + this.processInstance = workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(); this.taskInstance = workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId()) .orElseThrow(() -> new LogicTaskInitializeException( "Cannot find the task instance in workflow execute runnable")); @@ -168,8 +168,8 @@ public class SwitchLogicTask extends BaseSyncLogicTask { if (CollectionUtils.isEmpty(switchResult.getNextNode())) { return false; } - for (String nextNode : switchResult.getNextNode()) { - if (StringUtils.isEmpty(nextNode)) { + for (Long nextNode : switchResult.getNextNode()) { + if (nextNode == null) { return false; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java index 91b8edb127..80c9984054 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/ExecutingService.java @@ -55,7 +55,8 @@ public class ExecutingService { } try { WorkflowExecuteDto workflowExecuteDto = new WorkflowExecuteDto(); - BeanUtils.copyProperties(workflowExecuteDto, workflowExecuteRunnable.getProcessInstance()); + BeanUtils.copyProperties(workflowExecuteDto, + workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance()); List taskInstanceList = Lists.newArrayList(); if (CollectionUtils.isNotEmpty(workflowExecuteRunnable.getAllTaskInstances())) { for (TaskInstance taskInstance : workflowExecuteRunnable.getAllTaskInstances()) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index 8b8160dba4..25d9527e75 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -131,7 +131,8 @@ public class WorkerFailoverService { if (workflowExecuteRunnable == null) { return null; } - return workflowExecuteRunnable.getProcessInstance(); + return workflowExecuteRunnable.getWorkflowExecuteContext() + .getWorkflowInstance(); }); if (!checkTaskInstanceNeedFailover(needFailoverWorkerStartTime, processInstance, taskInstance)) { log.info("Worker[{}] the current taskInstance doesn't need to failover", workerHost); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java index 55b54fd75a..05b8f7aa13 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -44,14 +43,13 @@ public class ProcessInstanceExecCacheManagerImplTest { @BeforeEach public void before() { - Mockito.when(workflowExecuteThread.getKey()).thenReturn("workflowExecuteThread1"); processInstanceExecCacheManager.cache(1, workflowExecuteThread); } @Test public void testGetByProcessInstanceId() { WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1); - Assertions.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey()); + Assertions.assertNotNull(workflowExecuteThread); } @Test @@ -61,9 +59,7 @@ public class ProcessInstanceExecCacheManagerImplTest { @Test public void testCacheNull() { - Assertions.assertThrows(NullPointerException.class, () -> { - processInstanceExecCacheManager.cache(2, null); - }); + Assertions.assertThrows(NullPointerException.class, () -> processInstanceExecCacheManager.cache(2, null)); WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2); Assertions.assertNull(workflowExecuteThread); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 332a2617d7..3aff6d65e8 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -98,6 +99,8 @@ public class WorkflowExecuteRunnableTest { private DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory; + private WorkflowExecuteContextFactory workflowExecuteContextFactory; + @BeforeEach public void init() throws Exception { applicationContext = Mockito.mock(ApplicationContext.class); @@ -112,6 +115,8 @@ public class WorkflowExecuteRunnableTest { taskInstanceDao = Mockito.mock(TaskInstanceDao.class); taskDefinitionLogDao = Mockito.mock(TaskDefinitionLogDao.class); defaultTaskExecuteRunnableFactory = Mockito.mock(DefaultTaskExecuteRunnableFactory.class); + workflowExecuteContextFactory = Mockito.mock(WorkflowExecuteContextFactory.class); + Map cmdParam = new HashMap<>(); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE, "2020-01-01 00:00:00"); cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, "2020-01-20 23:00:00"); @@ -124,14 +129,25 @@ public class WorkflowExecuteRunnableTest { curingGlobalParamsService = Mockito.mock(CuringParamsService.class); MasterRpcClient masterRpcClient = Mockito.mock(MasterRpcClient.class); ProcessAlertManager processAlertManager = Mockito.mock(ProcessAlertManager.class); + WorkflowExecuteContext workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class); + Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance); + IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class); + Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); + Mockito.when(workflowGraph.getDag()).thenReturn(new DAG<>()); + workflowExecuteThread = Mockito.spy( - new WorkflowExecuteRunnable(processInstance, commandService, processService, processInstanceDao, + new WorkflowExecuteRunnable( + workflowExecuteContext, + commandService, + processService, + processInstanceDao, masterRpcClient, - processAlertManager, config, stateWheelExecuteThread, curingGlobalParamsService, - taskInstanceDao, taskDefinitionLogDao, defaultTaskExecuteRunnableFactory)); - Field dag = WorkflowExecuteRunnable.class.getDeclaredField("dag"); - dag.setAccessible(true); - dag.set(workflowExecuteThread, new DAG()); + processAlertManager, + config, + stateWheelExecuteThread, + curingGlobalParamsService, + taskInstanceDao, + defaultTaskExecuteRunnableFactory)); } @Test @@ -187,9 +203,9 @@ public class WorkflowExecuteRunnableTest { @Test public void testGetPreVarPool() { try { - Set preTaskName = new HashSet<>(); - preTaskName.add(Long.toString(1)); - preTaskName.add(Long.toString(2)); + Set preTaskName = new HashSet<>(); + preTaskName.add(1L); + preTaskName.add(2L); TaskInstance taskInstance = new TaskInstance(); @@ -271,7 +287,7 @@ public class WorkflowExecuteRunnableTest { Mockito.when(processService.findProcessInstanceById(222)).thenReturn(processInstance9); workflowExecuteThread.checkSerialProcess(processDefinition1); } catch (Exception e) { - Assertions.fail(); + Assertions.fail(e); } } @@ -314,17 +330,23 @@ public class WorkflowExecuteRunnableTest { Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK); Mockito.when(processInstance.getId()).thenReturn(123); - DAG dag = Mockito.mock(DAG.class); - Set taskCodesString = new HashSet<>(); - taskCodesString.add("1"); - taskCodesString.add("2"); + DAG dag = Mockito.mock(DAG.class); + Set taskCodesString = new HashSet<>(); + taskCodesString.add(1L); + taskCodesString.add(2L); Mockito.when(dag.getAllNodesList()).thenReturn(taskCodesString); - Mockito.when(dag.containsNode("1")).thenReturn(true); - Mockito.when(dag.containsNode("2")).thenReturn(false); + Mockito.when(dag.containsNode(1L)).thenReturn(true); + Mockito.when(dag.containsNode(2L)).thenReturn(false); + + WorkflowExecuteContext workflowExecuteContext = Mockito.mock(WorkflowExecuteContext.class); + Mockito.when(workflowExecuteContext.getWorkflowInstance()).thenReturn(processInstance); + IWorkflowGraph workflowGraph = Mockito.mock(IWorkflowGraph.class); + Mockito.when(workflowExecuteContext.getWorkflowGraph()).thenReturn(workflowGraph); + Mockito.when(workflowGraph.getDag()).thenReturn(dag); - Field dagField = masterExecThreadClass.getDeclaredField("dag"); + Field dagField = masterExecThreadClass.getDeclaredField("workflowExecuteContext"); dagField.setAccessible(true); - dagField.set(workflowExecuteThread, dag); + dagField.set(workflowExecuteThread, workflowExecuteContext); Mockito.when(taskInstanceDao.queryByWorkflowInstanceIdAndTaskCode(processInstance.getId(), taskInstance1.getTaskCode())) diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 142932b0cd..b84b227712 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.event.StateEvent; +import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -199,7 +200,10 @@ public class FailoverServiceTest { workerTaskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class); Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance)); - Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance); + + IWorkflowExecuteContext workflowExecuteRunnableContext = Mockito.mock(IWorkflowExecuteContext.class); + Mockito.when(workflowExecuteRunnable.getWorkflowExecuteContext()).thenReturn(workflowExecuteRunnableContext); + Mockito.when(workflowExecuteRunnableContext.getWorkflowInstance()).thenReturn(processInstance); Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable)); Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java index 1931e9e2e9..954003ec41 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/model/TaskNode.java @@ -133,7 +133,7 @@ public class TaskNode { /** * node dependency list */ - private List depList; + private List depList; /** * outer dependency information @@ -242,7 +242,7 @@ public class TaskNode { public void setPreTasks(String preTasks) { this.preTasks = preTasks; - this.depList = JSONUtils.toList(preTasks, String.class); + this.depList = JSONUtils.toList(preTasks, Long.class); } public String getExtras() { @@ -253,11 +253,11 @@ public class TaskNode { this.extras = extras; } - public List getDepList() { + public List getDepList() { return depList; } - public void setDepList(List depList) { + public void setDepList(List depList) { if (depList != null) { this.depList = depList; this.preTasks = JSONUtils.toJsonString(depList); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 4015c1c1ff..8d755fe325 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -171,7 +171,7 @@ public interface ProcessService { boolean isTaskOnline(long taskCode); - DAG genDagGraph(ProcessDefinition processDefinition); + DAG genDagGraph(ProcessDefinition processDefinition); DagData genDagData(ProcessDefinition processDefinition); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 3c6fd9f7c5..1d5e1f960b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -2125,7 +2125,7 @@ public class ProcessServiceImpl implements ProcessService { * @return dag graph */ @Override - public DAG genDagGraph(ProcessDefinition processDefinition) { + public DAG genDagGraph(ProcessDefinition processDefinition) { List taskRelations = this.findRelationByCode(processDefinition.getCode(), processDefinition.getVersion()); List taskNodeList = transformTask(taskRelations, Lists.newArrayList()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java index f8f858b18e..b966d93997 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java @@ -61,11 +61,11 @@ public class DagHelper { List nodeRelationList = new ArrayList<>(); for (TaskNode taskNode : taskNodeList) { String preTasks = taskNode.getPreTasks(); - List preTaskList = JSONUtils.toList(preTasks, String.class); + List preTaskList = JSONUtils.toList(preTasks, Long.class); if (preTaskList != null) { - for (String depNodeCode : preTaskList) { + for (Long depNodeCode : preTaskList) { if (null != findNodeByCode(taskNodeList, depNodeCode)) { - nodeRelationList.add(new TaskNodeRelation(depNodeCode, Long.toString(taskNode.getCode()))); + nodeRelationList.add(new TaskNodeRelation(depNodeCode, taskNode.getCode())); } } } @@ -83,11 +83,11 @@ public class DagHelper { * @return task node list */ public static List generateFlowNodeListByStartNode(List taskNodeList, - List startNodeNameList, - List recoveryNodeCodeList, + List startNodeNameList, + List recoveryNodeCodeList, TaskDependType taskDependType) { List destFlowNodeList = new ArrayList<>(); - List startNodeList = startNodeNameList; + List startNodeList = startNodeNameList; if (taskDependType != TaskDependType.TASK_POST && CollectionUtils.isEmpty(startNodeList)) { log.error("start node list is empty! cannot continue run the process "); @@ -106,7 +106,7 @@ public class DagHelper { tmpTaskNodeList = taskNodeList; } else { // specified start nodes or resume execution - for (String startNodeCode : startNodeList) { + for (Long startNodeCode : startNodeList) { TaskNode startNode = findNodeByCode(taskNodeList, startNodeCode); List childNodeList = new ArrayList<>(); if (startNode == null) { @@ -115,10 +115,10 @@ public class DagHelper { taskNodeList); continue; } else if (TaskDependType.TASK_POST == taskDependType) { - List visitedNodeCodeList = new ArrayList<>(); + List visitedNodeCodeList = new ArrayList<>(); childNodeList = getFlowNodeListPost(startNode, taskNodeList, visitedNodeCodeList); } else if (TaskDependType.TASK_PRE == taskDependType) { - List visitedNodeCodeList = new ArrayList<>(); + List visitedNodeCodeList = new ArrayList<>(); childNodeList = getFlowNodeListPre(startNode, recoveryNodeCodeList, taskNodeList, visitedNodeCodeList); } else { @@ -129,7 +129,7 @@ public class DagHelper { } for (TaskNode taskNode : tmpTaskNodeList) { - if (null == findNodeByCode(destTaskNodeList, Long.toString(taskNode.getCode()))) { + if (null == findNodeByCode(destTaskNodeList, taskNode.getCode())) { destTaskNodeList.add(taskNode); } } @@ -143,19 +143,20 @@ public class DagHelper { * @param taskNodeList taskNodeList * @return task node list */ - private static List getFlowNodeListPost(TaskNode startNode, List taskNodeList, - List visitedNodeCodeList) { + private static List getFlowNodeListPost(TaskNode startNode, + List taskNodeList, + List visitedNodeCodeList) { List resultList = new ArrayList<>(); for (TaskNode taskNode : taskNodeList) { - List depList = taskNode.getDepList(); - if (null != depList && null != startNode && depList.contains(Long.toString(startNode.getCode())) - && !visitedNodeCodeList.contains(Long.toString(taskNode.getCode()))) { + List depList = taskNode.getDepList(); + if (null != depList && null != startNode && depList.contains(startNode.getCode()) + && !visitedNodeCodeList.contains(taskNode.getCode())) { resultList.addAll(getFlowNodeListPost(taskNode, taskNodeList, visitedNodeCodeList)); } } // why add (startNode != null) condition? for SonarCloud Quality Gate passed if (null != startNode) { - visitedNodeCodeList.add(Long.toString(startNode.getCode())); + visitedNodeCodeList.add(startNode.getCode()); } resultList.add(startNode); @@ -170,12 +171,14 @@ public class DagHelper { * @param taskNodeList taskNodeList * @return task node list */ - private static List getFlowNodeListPre(TaskNode startNode, List recoveryNodeCodeList, - List taskNodeList, List visitedNodeCodeList) { + private static List getFlowNodeListPre(TaskNode startNode, + List recoveryNodeCodeList, + List taskNodeList, + List visitedNodeCodeList) { List resultList = new ArrayList<>(); - List depList = new ArrayList<>(); + List depList = new ArrayList<>(); if (null != startNode) { depList = startNode.getDepList(); resultList.add(startNode); @@ -183,7 +186,7 @@ public class DagHelper { if (CollectionUtils.isEmpty(depList)) { return resultList; } - for (String depNodeCode : depList) { + for (Long depNodeCode : depList) { TaskNode start = findNodeByCode(taskNodeList, depNodeCode); if (recoveryNodeCodeList.contains(depNodeCode)) { resultList.add(start); @@ -193,7 +196,7 @@ public class DagHelper { } // why add (startNode != null) condition? for SonarCloud Quality Gate passed if (null != startNode) { - visitedNodeCodeList.add(Long.toString(startNode.getCode())); + visitedNodeCodeList.add(startNode.getCode()); } return resultList; } @@ -209,8 +212,8 @@ public class DagHelper { * @throws Exception if error throws Exception */ public static ProcessDag generateFlowDag(List totalTaskNodeList, - List startNodeNameList, - List recoveryNodeCodeList, + List startNodeNameList, + List recoveryNodeCodeList, TaskDependType depNodeType) throws Exception { List destTaskNodeList = generateFlowNodeListByStartNode(totalTaskNodeList, startNodeNameList, @@ -248,9 +251,9 @@ public class DagHelper { * @param nodeCode nodeCode * @return task node */ - public static TaskNode findNodeByCode(List nodeDetails, String nodeCode) { + public static TaskNode findNodeByCode(List nodeDetails, Long nodeCode) { for (TaskNode taskNode : nodeDetails) { - if (Long.toString(taskNode.getCode()).equals(nodeCode)) { + if (taskNode.getCode() == nodeCode) { return taskNode; } } @@ -266,14 +269,14 @@ public class DagHelper { * @return can submit */ public static boolean allDependsForbiddenOrEnd(TaskNode taskNode, - DAG dag, - Map skipTaskNodeList, - Map completeTaskList) { - List dependList = taskNode.getDepList(); + DAG dag, + Map skipTaskNodeList, + Map completeTaskList) { + List dependList = taskNode.getDepList(); if (dependList == null) { return true; } - for (String dependNodeCode : dependList) { + for (Long dependNodeCode : dependList) { TaskNode dependNode = dag.getNode(dependNodeCode); if (dependNode == null || completeTaskList.containsKey(dependNodeCode) || dependNode.isForbidden() @@ -293,25 +296,25 @@ public class DagHelper { * * @return successor nodes */ - public static Set parsePostNodes(String preNodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - Set postNodeList = new HashSet<>(); - Collection startVertexes = new ArrayList<>(); + public static Set parsePostNodes(Long preNodeCode, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList) { + Set postNodeList = new HashSet<>(); + Collection startVertexes = new ArrayList<>(); if (preNodeCode == null) { startVertexes = dag.getBeginNode(); } else if (dag.getNode(preNodeCode).isConditionsTask()) { - List conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); + List conditionTaskList = parseConditionTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); startVertexes.addAll(conditionTaskList); } else if (dag.getNode(preNodeCode).isSwitchTask()) { - List conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); + List conditionTaskList = parseSwitchTask(preNodeCode, skipTaskNodeList, dag, completeTaskList); startVertexes.addAll(conditionTaskList); } else { startVertexes = dag.getSubsequentNodes(preNodeCode); } - for (String subsequent : startVertexes) { + for (Long subsequent : startVertexes) { TaskNode taskNode = dag.getNode(subsequent); if (taskNode == null) { log.error("taskNode {} is null, please check dag", subsequent); @@ -337,11 +340,11 @@ public class DagHelper { * if all of the task dependence are skipped, skip it too. */ private static boolean isTaskNodeNeedSkip(TaskNode taskNode, - Map skipTaskNodeList) { + Map skipTaskNodeList) { if (CollectionUtils.isEmpty(taskNode.getDepList())) { return false; } - for (String depNode : taskNode.getDepList()) { + for (Long depNode : taskNode.getDepList()) { if (!skipTaskNodeList.containsKey(depNode)) { return false; } @@ -353,11 +356,11 @@ public class DagHelper { * parse condition task find the branch process * set skip flag for another one. */ - public static List parseConditionTask(String nodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - List conditionTaskList = new ArrayList<>(); + public static List parseConditionTask(Long nodeCode, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList) { + List conditionTaskList = new ArrayList<>(); TaskNode taskNode = dag.getNode(nodeCode); if (!taskNode.isConditionsTask()) { return conditionTaskList; @@ -368,7 +371,7 @@ public class DagHelper { TaskInstance taskInstance = completeTaskList.get(nodeCode); ConditionsParameters conditionsParameters = JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); - List skipNodeList = new ArrayList<>(); + List skipNodeList = new ArrayList<>(); if (taskInstance.getState().isSuccess()) { conditionTaskList = conditionsParameters.getSuccessNode(); skipNodeList = conditionsParameters.getFailedNode(); @@ -380,7 +383,7 @@ public class DagHelper { } // the skipNodeList maybe null if no next task skipNodeList = Optional.ofNullable(skipNodeList).orElse(new ArrayList<>()); - for (String failedNode : skipNodeList) { + for (Long failedNode : skipNodeList) { setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList); } // the conditionTaskList maybe null if no next task @@ -395,11 +398,11 @@ public class DagHelper { * @param nodeCode * @return */ - public static List parseSwitchTask(String nodeCode, - Map skipTaskNodeList, - DAG dag, - Map completeTaskList) { - List conditionTaskList = new ArrayList<>(); + public static List parseSwitchTask(Long nodeCode, + Map skipTaskNodeList, + DAG dag, + Map completeTaskList) { + List conditionTaskList = new ArrayList<>(); TaskNode taskNode = dag.getNode(nodeCode); if (!taskNode.isSwitchTask()) { return conditionTaskList; @@ -411,15 +414,16 @@ public class DagHelper { return conditionTaskList; } - private static List skipTaskNode4Switch(TaskNode taskNode, Map skipTaskNodeList, - Map completeTaskList, - DAG dag) { + private static List skipTaskNode4Switch(TaskNode taskNode, + Map skipTaskNodeList, + Map completeTaskList, + DAG dag) { SwitchParameters switchParameters = - completeTaskList.get(Long.toString(taskNode.getCode())).getSwitchDependency(); + completeTaskList.get(taskNode.getCode()).getSwitchDependency(); int resultConditionLocation = switchParameters.getResultConditionLocation(); List conditionResultVoList = switchParameters.getDependTaskList(); - List switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); + List switchTaskList = conditionResultVoList.get(resultConditionLocation).getNextNode(); if (CollectionUtils.isEmpty(switchTaskList)) { switchTaskList = new ArrayList<>(); } @@ -436,16 +440,16 @@ public class DagHelper { /** * set task node and the post nodes skip flag */ - private static void setTaskNodeSkip(String skipNodeCode, - DAG dag, - Map completeTaskList, - Map skipTaskNodeList) { + private static void setTaskNodeSkip(Long skipNodeCode, + DAG dag, + Map completeTaskList, + Map skipTaskNodeList) { if (!dag.containsNode(skipNodeCode)) { return; } skipTaskNodeList.putIfAbsent(skipNodeCode, dag.getNode(skipNodeCode)); - Collection postNodeList = dag.getSubsequentNodes(skipNodeCode); - for (String post : postNodeList) { + Collection postNodeList = dag.getSubsequentNodes(skipNodeCode); + for (Long post : postNodeList) { TaskNode postNode = dag.getNode(post); if (isTaskNodeNeedSkip(postNode, skipTaskNodeList)) { setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList); @@ -458,14 +462,14 @@ public class DagHelper { * @param processDag processDag * @return dag */ - public static DAG buildDagGraph(ProcessDag processDag) { + public static DAG buildDagGraph(ProcessDag processDag) { - DAG dag = new DAG<>(); + DAG dag = new DAG<>(); // add vertex if (CollectionUtils.isNotEmpty(processDag.getNodes())) { for (TaskNode node : processDag.getNodes()) { - dag.addNode(Long.toString(node.getCode()), node); + dag.addNode(node.getCode(), node); } } @@ -490,12 +494,12 @@ public class DagHelper { // Traverse node information and build relationships for (TaskNode taskNode : taskNodeList) { String preTasks = taskNode.getPreTasks(); - List preTasksList = JSONUtils.toList(preTasks, String.class); + List preTasksList = JSONUtils.toList(preTasks, Long.class); // If the dependency is not empty if (preTasksList != null) { - for (String depNode : preTasksList) { - taskNodeRelations.add(new TaskNodeRelation(depNode, Long.toString(taskNode.getCode()))); + for (Long depNode : preTasksList) { + taskNodeRelations.add(new TaskNodeRelation(depNode, taskNode.getCode())); } } } @@ -530,7 +534,7 @@ public class DagHelper { TaskNode preNode = taskNodeMap.get(preTaskCode); TaskNode postNode = taskNodeMap.get(postTaskCode); taskNodeRelations - .add(new TaskNodeRelation(Long.toString(preNode.getCode()), Long.toString(postNode.getCode()))); + .add(new TaskNodeRelation(preNode.getCode(), postNode.getCode())); } } ProcessDag processDag = new ProcessDag(); @@ -542,20 +546,20 @@ public class DagHelper { /** * is there have conditions after the parent node */ - public static boolean haveConditionsAfterNode(String parentNodeCode, - DAG dag) { + public static boolean haveConditionsAfterNode(Long parentNodeCode, + DAG dag) { return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_CONDITIONS); } /** * is there have conditions after the parent node */ - public static boolean haveConditionsAfterNode(String parentNodeCode, List taskNodes) { + public static boolean haveConditionsAfterNode(Long parentNodeCode, List taskNodes) { if (CollectionUtils.isEmpty(taskNodes)) { return false; } for (TaskNode taskNode : taskNodes) { - List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class); + List preTasksList = JSONUtils.toList(taskNode.getPreTasks(), Long.class); if (preTasksList.contains(parentNodeCode) && taskNode.isConditionsTask()) { return true; } @@ -566,32 +570,32 @@ public class DagHelper { /** * is there have blocking node after the parent node */ - public static boolean haveBlockingAfterNode(String parentNodeCode, - DAG dag) { + public static boolean haveBlockingAfterNode(Long parentNodeCode, + DAG dag) { return haveSubAfterNode(parentNodeCode, dag, TaskConstants.TASK_TYPE_BLOCKING); } /** * is there have all node after the parent node */ - public static boolean haveAllNodeAfterNode(String parentNodeCode, - DAG dag) { + public static boolean haveAllNodeAfterNode(Long parentNodeCode, + DAG dag) { return haveSubAfterNode(parentNodeCode, dag, null); } /** * Whether there is a specified type of child node after the parent node */ - public static boolean haveSubAfterNode(String parentNodeCode, - DAG dag, String filterNodeType) { - Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); + public static boolean haveSubAfterNode(Long parentNodeCode, + DAG dag, String filterNodeType) { + Set subsequentNodes = dag.getSubsequentNodes(parentNodeCode); if (CollectionUtils.isEmpty(subsequentNodes)) { return false; } if (StringUtils.isBlank(filterNodeType)) { return true; } - for (String nodeName : subsequentNodes) { + for (Long nodeName : subsequentNodes) { TaskNode taskNode = dag.getNode(nodeName); if (taskNode.getType().equalsIgnoreCase(filterNodeType)) { return true; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 83a29a9769..f326a04a73 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -704,7 +704,7 @@ public class ProcessServiceTest { Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt())) .thenReturn(list); - DAG stringTaskNodeTaskNodeRelationDAG = + DAG stringTaskNodeTaskNodeRelationDAG = processService.genDagGraph(processDefinition); Assertions.assertEquals(1, stringTaskNodeTaskNodeRelationDAG.getNodesCount()); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java index 42eabc6416..85c9296248 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java @@ -51,16 +51,16 @@ public class DagHelperTest { @Test public void testHaveSubAfterNode() { - String parentNodeCode = "5293789969856"; + Long parentNodeCode = 5293789969856L; List taskNodeRelations = new ArrayList<>(); TaskNodeRelation relation = new TaskNodeRelation(); - relation.setStartNode("5293789969856"); - relation.setEndNode("5293789969857"); + relation.setStartNode(5293789969856L); + relation.setEndNode(5293789969857L); taskNodeRelations.add(relation); TaskNodeRelation relationNext = new TaskNodeRelation(); - relationNext.setStartNode("5293789969856"); - relationNext.setEndNode("5293789969858"); + relationNext.setStartNode(5293789969856L); + relationNext.setEndNode(5293789969858L); taskNodeRelations.add(relationNext); List taskNodes = new ArrayList<>(); @@ -85,7 +85,7 @@ public class DagHelperTest { ProcessDag processDag = new ProcessDag(); processDag.setEdges(taskNodeRelations); processDag.setNodes(taskNodes); - DAG dag = DagHelper.buildDagGraph(processDag); + DAG dag = DagHelper.buildDagGraph(processDag); boolean canSubmit = DagHelper.haveAllNodeAfterNode(parentNodeCode, dag); Assertions.assertTrue(canSubmit); @@ -108,34 +108,34 @@ public class DagHelperTest { public void testTaskNodeCanSubmit() throws IOException { // 1->2->3->5->7 // 4->3->6 - DAG dag = generateDag(); - TaskNode taskNode3 = dag.getNode("3"); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - completeTaskList.putIfAbsent("1", new TaskInstance()); + DAG dag = generateDag(); + TaskNode taskNode3 = dag.getNode(3L); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + completeTaskList.putIfAbsent(1L, new TaskInstance()); Boolean canSubmit = false; // 2/4 are forbidden submit 3 - TaskNode node2 = dag.getNode("2"); + TaskNode node2 = dag.getNode(2L); node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - TaskNode nodex = dag.getNode("4"); + TaskNode nodex = dag.getNode(4L); nodex.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); Assertions.assertEquals(canSubmit, true); // 2forbidden, 3 cannot be submit - completeTaskList.putIfAbsent("2", new TaskInstance()); - TaskNode nodey = dag.getNode("4"); + completeTaskList.putIfAbsent(2L, new TaskInstance()); + TaskNode nodey = dag.getNode(4L); nodey.setRunFlag(""); canSubmit = DagHelper.allDependsForbiddenOrEnd(taskNode3, dag, skipNodeList, completeTaskList); Assertions.assertEquals(canSubmit, false); // 2/3 forbidden submit 5 - TaskNode node3 = dag.getNode("3"); + TaskNode node3 = dag.getNode(3L); node3.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - TaskNode node8 = dag.getNode("8"); + TaskNode node8 = dag.getNode(8L); node8.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); - TaskNode node5 = dag.getNode("5"); + TaskNode node5 = dag.getNode(5L); canSubmit = DagHelper.allDependsForbiddenOrEnd(node5, dag, skipNodeList, completeTaskList); Assertions.assertEquals(canSubmit, true); } @@ -145,64 +145,64 @@ public class DagHelperTest { */ @Test public void testParsePostNodeList() throws IOException { - DAG dag = generateDag(); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); - Set postNodes = null; + Set postNodes = null; // complete : null // expect post: 1/4 postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("1")); - Assertions.assertTrue(postNodes.contains("4")); + Assertions.assertTrue(postNodes.contains(1L)); + Assertions.assertTrue(postNodes.contains(4L)); // complete : 1 // expect post: 2/4 - completeTaskList.put("1", new TaskInstance()); + completeTaskList.put(1L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("2")); - Assertions.assertTrue(postNodes.contains("4")); + Assertions.assertTrue(postNodes.contains(2L)); + Assertions.assertTrue(postNodes.contains(4L)); // complete : 1/2 // expect post: 4 - completeTaskList.put("2", new TaskInstance()); + completeTaskList.put(2L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("4")); - Assertions.assertTrue(postNodes.contains("8")); + Assertions.assertTrue(postNodes.contains(4L)); + Assertions.assertTrue(postNodes.contains(8L)); // complete : 1/2/4 // expect post: 3 - completeTaskList.put("4", new TaskInstance()); + completeTaskList.put(4L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("3")); - Assertions.assertTrue(postNodes.contains("8")); + Assertions.assertTrue(postNodes.contains(3L)); + Assertions.assertTrue(postNodes.contains(8L)); // complete : 1/2/4/3 // expect post: 8/6 - completeTaskList.put("3", new TaskInstance()); + completeTaskList.put(3L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("8")); - Assertions.assertTrue(postNodes.contains("6")); + Assertions.assertTrue(postNodes.contains(8L)); + Assertions.assertTrue(postNodes.contains(6L)); // complete : 1/2/4/3/8 // expect post: 6/5 - completeTaskList.put("8", new TaskInstance()); + completeTaskList.put(8L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("5")); - Assertions.assertTrue(postNodes.contains("6")); + Assertions.assertTrue(postNodes.contains(5L)); + Assertions.assertTrue(postNodes.contains(6L)); // complete : 1/2/4/3/5/6/8 // expect post: 7 - completeTaskList.put("6", new TaskInstance()); - completeTaskList.put("5", new TaskInstance()); + completeTaskList.put(6L, new TaskInstance()); + completeTaskList.put(5L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains("7")); + Assertions.assertTrue(postNodes.contains(7L)); } /** @@ -212,35 +212,35 @@ public class DagHelperTest { */ @Test public void testForbiddenPostNode() throws IOException { - DAG dag = generateDag(); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - Set postNodes = null; + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + Set postNodes = null; // dag: 1-2-3-5-7 4-3-6 2-8-5-7 // forbid:2 complete:1 post:4/8 - completeTaskList.put("1", new TaskInstance()); - TaskNode node2 = dag.getNode("2"); + completeTaskList.put(1L, new TaskInstance()); + TaskNode node2 = dag.getNode(2L); node2.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("4")); - Assertions.assertTrue(postNodes.contains("8")); + Assertions.assertTrue(postNodes.contains(4L)); + Assertions.assertTrue(postNodes.contains(8L)); // forbid:2/4 complete:1 post:3/8 - TaskNode node4 = dag.getNode("4"); + TaskNode node4 = dag.getNode(4L); node4.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(2, postNodes.size()); - Assertions.assertTrue(postNodes.contains("3")); - Assertions.assertTrue(postNodes.contains("8")); + Assertions.assertTrue(postNodes.contains(3L)); + Assertions.assertTrue(postNodes.contains(8L)); // forbid:2/4/5 complete:1/8 post:3 - completeTaskList.put("8", new TaskInstance()); - TaskNode node5 = dag.getNode("5"); + completeTaskList.put(8L, new TaskInstance()); + TaskNode node5 = dag.getNode(5L); node5.setRunFlag(Constants.FLOWNODE_RUN_FLAG_FORBIDDEN); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains("3")); + Assertions.assertTrue(postNodes.contains(3L)); } /** @@ -250,16 +250,16 @@ public class DagHelperTest { */ @Test public void testConditionPostNode() throws IOException { - DAG dag = generateDag(); - Map completeTaskList = new HashMap<>(); - Map skipNodeList = new HashMap<>(); - Set postNodes = null; + DAG dag = generateDag(); + Map completeTaskList = new HashMap<>(); + Map skipNodeList = new HashMap<>(); + Set postNodes = null; // dag: 1-2-3-5-7 4-3-6 2-8-5-7 // 3-if - completeTaskList.put("1", new TaskInstance()); - completeTaskList.put("2", new TaskInstance()); - completeTaskList.put("4", new TaskInstance()); - TaskNode node3 = dag.getNode("3"); + completeTaskList.put(1L, new TaskInstance()); + completeTaskList.put(2L, new TaskInstance()); + completeTaskList.put(4L, new TaskInstance()); + TaskNode node3 = dag.getNode(3L); node3.setType(TASK_TYPE_CONDITIONS); node3.setConditionResult("{\n" + @@ -272,51 +272,51 @@ public class DagHelperTest { " ]\n" + " }"); - completeTaskList.remove("3"); + completeTaskList.remove(3L); TaskInstance taskInstance = new TaskInstance(); taskInstance.setState(TaskExecutionStatus.SUCCESS); // complete 1/2/3/4 expect:8 - completeTaskList.put("3", taskInstance); + completeTaskList.put(3L, taskInstance); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains("8")); + Assertions.assertTrue(postNodes.contains(8L)); // 2.complete 1/2/3/4/8 expect:5 skip:6 - completeTaskList.put("8", new TaskInstance()); + completeTaskList.put(8L, new TaskInstance()); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertTrue(postNodes.contains("5")); + Assertions.assertTrue(postNodes.contains(5L)); Assertions.assertEquals(1, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey("6")); + Assertions.assertTrue(skipNodeList.containsKey(6L)); // 3.complete 1/2/3/4/5/8 expect post:7 skip:6 skipNodeList.clear(); TaskInstance taskInstance1 = new TaskInstance(); taskInstance.setState(TaskExecutionStatus.SUCCESS); - completeTaskList.put("5", taskInstance1); + completeTaskList.put(5L, taskInstance1); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains("7")); + Assertions.assertTrue(postNodes.contains(7L)); Assertions.assertEquals(1, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey("6")); + Assertions.assertTrue(skipNodeList.containsKey(6L)); // dag: 1-2-3-5-7 4-3-6 // 3-if , complete:1/2/3/4 // 1.failure:3 expect post:6 skip:5/7 skipNodeList.clear(); - completeTaskList.remove("3"); + completeTaskList.remove(3L); taskInstance = new TaskInstance(); Map taskParamsMap = new HashMap<>(); taskParamsMap.put(Constants.SWITCH_RESULT, ""); taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); taskInstance.setState(TaskExecutionStatus.FAILURE); - completeTaskList.put("3", taskInstance); + completeTaskList.put(3L, taskInstance); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains("6")); + Assertions.assertTrue(postNodes.contains(6L)); Assertions.assertEquals(2, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey("5")); - Assertions.assertTrue(skipNodeList.containsKey("7")); + Assertions.assertTrue(skipNodeList.containsKey(5L)); + Assertions.assertTrue(skipNodeList.containsKey(7L)); // dag: 1-2-3-5-7 4-3-6 // 3-if , complete:1/2/3/4 @@ -325,8 +325,8 @@ public class DagHelperTest { skipNodeList.clear(); completeTaskList.clear(); taskInstance.setSwitchDependency(getSwitchNode()); - completeTaskList.put("1", taskInstance); - postNodes = DagHelper.parsePostNodes("1", skipNodeList, dag, completeTaskList); + completeTaskList.put(1L, taskInstance); + postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); } @@ -345,7 +345,7 @@ public class DagHelperTest { * @return dag * @throws JsonProcessingException if error throws JsonProcessingException */ - private DAG generateDag() throws IOException { + private DAG generateDag() throws IOException { List taskNodeList = new ArrayList<>(); TaskNode node1 = new TaskNode(); node1.setId("1"); @@ -423,8 +423,8 @@ public class DagHelperTest { node8.setPreTasks(JSONUtils.toJsonString(dep8)); taskNodeList.add(node8); - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); + List startNodes = new ArrayList<>(); + List recoveryNodes = new ArrayList<>(); List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, startNodes, recoveryNodes, TaskDependType.TASK_POST); List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); @@ -445,7 +445,7 @@ public class DagHelperTest { * @return dag * @throws JsonProcessingException if error throws JsonProcessingException */ - private DAG generateDag2() throws IOException { + private DAG generateDag2() throws IOException { List taskNodeList = new ArrayList<>(); TaskNode node = new TaskNode(); @@ -488,13 +488,13 @@ public class DagHelperTest { node5.setName("4"); node5.setCode(4); node5.setType("SHELL"); - List dep5 = new ArrayList<>(); - dep5.add("1"); + List dep5 = new ArrayList<>(); + dep5.add(1L); node5.setPreTasks(JSONUtils.toJsonString(dep5)); taskNodeList.add(node5); - List startNodes = new ArrayList<>(); - List recoveryNodes = new ArrayList<>(); + List startNodes = new ArrayList<>(); + List recoveryNodes = new ArrayList<>(); List destTaskNodeList = DagHelper.generateFlowNodeListByStartNode(taskNodeList, startNodes, recoveryNodes, TaskDependType.TASK_POST); List taskNodeRelations = DagHelper.generateRelationListByFlowNodes(destTaskNodeList); @@ -508,15 +508,15 @@ public class DagHelperTest { SwitchParameters conditionsParameters = new SwitchParameters(); SwitchResultVo switchResultVo1 = new SwitchResultVo(); switchResultVo1.setCondition(" 2 == 1"); - switchResultVo1.setNextNode("2"); + switchResultVo1.setNextNode(2L); SwitchResultVo switchResultVo2 = new SwitchResultVo(); switchResultVo2.setCondition(" 2 == 2"); - switchResultVo2.setNextNode("4"); + switchResultVo2.setNextNode(4L); List list = new ArrayList<>(); list.add(switchResultVo1); list.add(switchResultVo2); conditionsParameters.setDependTaskList(list); - conditionsParameters.setNextNode("5"); + conditionsParameters.setNextNode(5L); conditionsParameters.setRelation("AND"); // in: AND(AND(1 is SUCCESS)) @@ -540,7 +540,7 @@ public class DagHelperTest { assert processData != null; List taskNodeList = processData.getTasks(); ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - DAG dag = DagHelper.buildDagGraph(processDag); + DAG dag = DagHelper.buildDagGraph(processDag); Assertions.assertNotNull(dag); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/SwitchResultVo.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/SwitchResultVo.java index 411f6e1014..3af8fad195 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/SwitchResultVo.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/SwitchResultVo.java @@ -30,16 +30,12 @@ import lombok.NoArgsConstructor; public class SwitchResultVo { private String condition; - private List nextNode; + private List nextNode; public void setNextNode(Object nextNode) { - if (nextNode instanceof String) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(String.valueOf(nextNode)); - this.nextNode = nextNodeList; - } else if (nextNode instanceof Number) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(nextNode.toString()); + if (nextNode instanceof Long) { + List nextNodeList = new ArrayList<>(); + nextNodeList.add((Long) nextNode); this.nextNode = nextNodeList; } else { this.nextNode = (ArrayList) nextNode; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java index 922c344aae..15141937b0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java @@ -24,6 +24,9 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import java.util.ArrayList; import java.util.List; +import lombok.Data; + +@Data public class ConditionsParameters extends AbstractParameters { // depend node list and state, only need task name @@ -31,10 +34,10 @@ public class ConditionsParameters extends AbstractParameters { private DependentRelation dependRelation; // node list to run when success - private List successNode; + private List successNode; // node list to run when failed - private List failedNode; + private List failedNode; @Override public boolean checkParameters() { @@ -46,38 +49,6 @@ public class ConditionsParameters extends AbstractParameters { return new ArrayList<>(); } - public List getDependTaskList() { - return dependTaskList; - } - - public void setDependTaskList(List dependTaskList) { - this.dependTaskList = dependTaskList; - } - - public DependentRelation getDependRelation() { - return dependRelation; - } - - public void setDependRelation(DependentRelation dependRelation) { - this.dependRelation = dependRelation; - } - - public List getSuccessNode() { - return successNode; - } - - public void setSuccessNode(List successNode) { - this.successNode = successNode; - } - - public List getFailedNode() { - return failedNode; - } - - public void setFailedNode(List failedNode) { - this.failedNode = failedNode; - } - public String getConditionResult() { return "{" + "\"successNode\": [\"" + successNode.get(0) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SwitchParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SwitchParameters.java index efdcba62ee..a1928bbe16 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SwitchParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SwitchParameters.java @@ -27,7 +27,7 @@ public class SwitchParameters extends AbstractParameters { private DependentRelation dependRelation; private String relation; - private List nextNode; + private List nextNode; @Override public boolean checkParameters() { @@ -69,18 +69,14 @@ public class SwitchParameters extends AbstractParameters { this.dependTaskList = dependTaskList; } - public List getNextNode() { + public List getNextNode() { return nextNode; } public void setNextNode(Object nextNode) { - if (nextNode instanceof String) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(String.valueOf(nextNode)); - this.nextNode = nextNodeList; - } else if (nextNode instanceof Number) { - List nextNodeList = new ArrayList<>(); - nextNodeList.add(nextNode.toString()); + if (nextNode instanceof Long) { + List nextNodeList = new ArrayList<>(); + nextNodeList.add((Long) nextNode); this.nextNode = nextNodeList; } else { this.nextNode = (ArrayList) nextNode;