From e4b9b67255528d1401f031905e8db07bb51aa580 Mon Sep 17 00:00:00 2001
From: JieguangZhou
Date: Tue, 13 Dec 2022 16:43:44 +0800
Subject: [PATCH] Allow execute task in workflow instance (#13103)

---
 .../api/controller/ExecutorController.java    |  32 +++++
 .../WorkflowExecuteResponse.java              |  30 +++++
 .../api/enums/ExecuteType.java                |   2 +-
 .../dolphinscheduler/api/enums/Status.java    |   6 +
 .../api/service/ExecutorService.java          |  15 +++
 .../api/service/impl/ExecutorServiceImpl.java | 115 ++++++++++++++++++
 .../api/service/ExecutorServiceTest.java      |  48 ++++++++
 .../common/enums/CommandType.java             |   3 +
 .../dolphinscheduler/common/graph/DAG.java    |   9 ++
 .../dao/repository/TaskInstanceDao.java       |   8 ++
 .../repository/impl/TaskInstanceDaoImpl.java  |   5 +
 .../runner/WorkflowExecuteRunnable.java       |  72 +++++++++++
 .../runner/WorkflowExecuteRunnableTest.java   |  58 +++++++++
 .../service/process/ProcessServiceImpl.java   |  20 ++-
 dolphinscheduler-ui/src/common/common.ts      |   4 +
 .../src/locales/en_US/project.ts              |   4 +
 .../src/locales/zh_CN/project.ts              |   4 +
 .../src/service/modules/executors/index.ts    |   9 ++
 .../src/service/modules/executors/types.ts    |   7 ++
 .../components/dag/dag-context-menu.tsx       |  61 ++++++++--
 .../workflow/components/dag/index.tsx         |  25 ++++
 .../workflow/instance/detail/index.tsx        |   5 +-
 22 files changed, 528 insertions(+), 14 deletions(-)
 create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 712b80ed2b..597b0d50ba 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -450,4 +450,36 @@ public class ExecutorController extends BaseController {
                 warningGroupId, workerGroup, environmentCode, startParamMap, dryRun);
         return returnDataList(result);
     }
+
+    /**
+     * execute the specified task node in an existing process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processInstanceId process instance id
+     * @param startNodeList start node list
+     * @param taskDependType task depend type
+     * @return execute result code
+     */
+    @Operation(summary = "execute-task", description = "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES")
+    @Parameters({
+            @Parameter(name = "processInstanceId", description = "PROCESS_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
+            @Parameter(name = "startNodeList", description = "START_NODE_LIST", required = true, schema = @Schema(implementation = String.class)),
+            @Parameter(name = "taskDependType", description = "TASK_DEPEND_TYPE", required = true, schema = @Schema(implementation = TaskDependType.class))
+    })
+    @PostMapping(value = "/execute-task")
+    @ResponseStatus(HttpStatus.OK)
+    @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
+    @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+    public Result executeTask(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
+                              @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+                              @RequestParam("processInstanceId") Integer processInstanceId,
+                              @RequestParam("startNodeList") String startNodeList,
+                              @RequestParam("taskDependType") TaskDependType taskDependType) {
+        logger.info("Start to execute task in process instance, projectCode:{}, processInstanceId:{}.",
+                projectCode,
+                processInstanceId);
+        return execService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType);
+    }
+
 }

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java
new file mode 100644
index 0000000000..941aeffe8a
--- /dev/null
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowExecuteResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.api.dto.workflowInstance;
+
+import org.apache.dolphinscheduler.api.utils.Result;
+
+import lombok.Data;
+
+/**
+ * workflow execute response
+ */
+@Data
+public class WorkflowExecuteResponse extends Result {
+
+}

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
index 755daa1dd1..fea69bf22d 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/ExecuteType.java
@@ -30,7 +30,7 @@ public enum ExecuteType {
      * 4 stop
      * 5 pause
      */
-    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
+    NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE, EXECUTE_TASK;
 
     public static ExecuteType getEnum(int value) {
         for (ExecuteType e : ExecuteType.values()) {

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index 566ffac9c5..d82732d620 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -261,6 +261,9 @@
     SCHEDULE_ALREADY_EXISTS(10204, "workflow {0} schedule {1} already exist, please update or delete it",
             "工作流 {0} 的定时 {1} 已经存在,请更新或删除"),
+    EXECUTE_NOT_DEFINE_TASK(10204, "please save and try again",
+            "请先保存后再执行"),
+
     UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
     UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
     RESOURCE_NOT_EXIST(20004, "resource not exist", "资源不存在"),
@@ -380,6 +383,9 @@
PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error", "批量修改工作流任务关系错误"), + WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not finished, can not do this operation", + "工作流实例未结束,不能执行此操作"), + HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"), S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 4c6b930f54..61f1b51623 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; @@ -92,6 +93,20 @@ public interface ExecutorService { */ Map execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType); + /** + * do action to execute task in process instance + * + * @param loginUser login user + * @param projectCode project code + * @param processInstanceId process instance id + * @param startNodeList start node list + * @param taskDependType task depend type + * @return execute result code + */ + WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer processInstanceId, + String startNodeList, + TaskDependType taskDependType); + /** * do action to process instance:pause, stop, repeat, recover from pause, recover from stop * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 99a4d90eb5..f03a97881f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.MAX_TASK_TI import static org.apache.dolphinscheduler.common.constants.Constants.SCHEDULE_TIME_MAX_LENGTH; import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; @@ -68,6 +69,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; @@ -142,6 +144,9 @@ public 
class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorService {
 
     @Autowired
     private ProcessInstanceDao processInstanceDao;
 
+    @Autowired
+    private TaskDefinitionLogMapper taskDefinitionLogMapper;
+
     @Autowired
     private StateEventCallbackService stateEventCallbackService;
 
@@ -487,6 +492,116 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorService {
         return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType);
     }
 
+    /**
+     * do action to execute task in process instance
+     *
+     * @param loginUser login user
+     * @param projectCode project code
+     * @param processInstanceId process instance id
+     * @param startNodeList start node list
+     * @param taskDependType task depend type
+     * @return execute result code
+     */
+    @Override
+    public WorkflowExecuteResponse executeTask(User loginUser, long projectCode, Integer processInstanceId,
+                                               String startNodeList, TaskDependType taskDependType) {
+
+        WorkflowExecuteResponse response = new WorkflowExecuteResponse();
+
+        Project project = projectMapper.queryByCode(projectCode);
+        // check user access for project
+        projectService.checkProjectAndAuthThrowException(loginUser, project,
+                ApiFuncIdentificationConstant.map.get(ExecuteType.EXECUTE_TASK));
+
+        ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId)
+                .orElseThrow(() -> new ServiceException(Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId));
+
+        if (!processInstance.getState().isFinished()) {
+            logger.error("Can not execute task for process instance which is not finished, processInstanceId:{}.",
+                    processInstanceId);
+            putMsg(response, Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED);
+            return response;
+        }
+
+        ProcessDefinition processDefinition =
+                processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
+                        processInstance.getProcessDefinitionVersion());
+        processDefinition.setReleaseState(ReleaseState.ONLINE);
+        this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
+                processInstance.getProcessDefinitionVersion());
+
+        if (!checkTenantSuitable(processDefinition)) {
+            logger.error(
+                    "There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionName:{}.",
+                    processDefinition.getId(), processDefinition.getName());
+            putMsg(response, Status.TENANT_NOT_SUITABLE);
+            return response;
+        }
+
+        // the start node list must be the task code of a saved task definition
+        long startNodeListLong;
+        try {
+            startNodeListLong = Long.parseLong(startNodeList);
+        } catch (NumberFormatException e) {
+            logger.error("startNodeList is not a number");
+            putMsg(response, Status.REQUEST_PARAMS_NOT_VALID_ERROR, startNodeList);
+            return response;
+        }
+
+        if (taskDefinitionLogMapper.queryMaxVersionForDefinition(startNodeListLong) == null) {
+            putMsg(response, Status.EXECUTE_NOT_DEFINE_TASK);
+            return response;
+        }
+
+        // build the command parameters: recover from this process instance and only start the given nodes
+        Map<String, Object> cmdParam = new HashMap<>();
+        cmdParam.put(CMD_PARAM_RECOVER_PROCESS_ID_STRING, processInstanceId);
+        // Add StartNodeList
+        cmdParam.put(CMD_PARAM_START_NODES, startNodeList);
+
+        Command command = new Command();
+        command.setCommandType(CommandType.EXECUTE_TASK);
+        command.setProcessDefinitionCode(processDefinition.getCode());
+        command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+        command.setExecutorId(loginUser.getId());
+        command.setProcessDefinitionVersion(processDefinition.getVersion());
+
command.setProcessInstanceId(processInstanceId); + command.setTestFlag(processInstance.getTestFlag()); + + // Add taskDependType + command.setTaskDependType(taskDependType); + + if (!commandService.verifyIsNeedCreateCommand(command)) { + logger.warn( + "Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", + processDefinition.getCode(), processDefinition.getVersion(), processInstanceId); + putMsg(response, Status.PROCESS_INSTANCE_EXECUTING_COMMAND, + String.valueOf(processDefinition.getCode())); + return response; + } + + logger.info("Creating command, commandInfo:{}.", command); + int create = commandService.createCommand(command); + + if (create > 0) { + logger.info("Create {} command complete, processDefinitionCode:{}, processDefinitionVersion:{}.", + command.getCommandType().getDescp(), command.getProcessDefinitionCode(), + processDefinition.getVersion()); + putMsg(response, Status.SUCCESS); + } else { + logger.error( + "Execute process instance failed because create {} command error, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", + command.getCommandType().getDescp(), command.getProcessDefinitionCode(), + processDefinition.getVersion(), + processInstanceId); + putMsg(response, Status.EXECUTE_PROCESS_INSTANCE_ERROR); + } + + return response; + } + @Override public Map forceStartTaskInstance(User loginUser, int queueId) { Map result = new HashMap<>(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 7b01b488c8..975ec1238e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -21,10 +21,13 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_START; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteResponse; import org.apache.dolphinscheduler.api.enums.ExecuteType; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; @@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskDependType; import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; @@ -55,6 +59,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import 
org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.service.command.CommandService; @@ -118,6 +123,9 @@ public class ExecutorServiceTest { @Mock private TaskDefinitionMapper taskDefinitionMapper; + @Mock + private TaskDefinitionLogMapper taskDefinitionLogMapper; + @Mock private ProjectMapper projectMapper; @@ -537,4 +545,44 @@ public class ExecutorServiceTest { Assertions.assertEquals("4,4", result.get(2)); } + @Test + public void testExecuteTask() { + String startNodeList = "1234567870"; + TaskDependType taskDependType = TaskDependType.TASK_ONLY; + + ProcessInstance processInstanceMock = Mockito.mock(ProcessInstance.class, RETURNS_DEEP_STUBS); + Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)) + .thenReturn(Optional.ofNullable(processInstanceMock)); + + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setProjectCode(projectCode); + Mockito.when(processService.findProcessDefinition(Mockito.anyLong(), Mockito.anyInt())) + .thenReturn(processDefinition); + + Mockito.when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(new Tenant()); + + when(processInstanceMock.getState().isFinished()).thenReturn(false); + WorkflowExecuteResponse responseInstanceIsNotFinished = + executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType); + Assertions.assertEquals(Status.WORKFLOW_INSTANCE_IS_NOT_FINISHED.getCode(), + responseInstanceIsNotFinished.getCode()); + + when(processInstanceMock.getState().isFinished()).thenReturn(true); + WorkflowExecuteResponse responseStartNodeListError = + executorService.executeTask(loginUser, projectCode, processInstanceId, "1234567870,", taskDependType); + Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), responseStartNodeListError.getCode()); + + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(Mockito.anyLong())).thenReturn(null); + WorkflowExecuteResponse responseNotDefineTask = + executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType); + Assertions.assertEquals(Status.EXECUTE_NOT_DEFINE_TASK.getCode(), responseNotDefineTask.getCode()); + + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(Mockito.anyLong())).thenReturn(1); + Mockito.when(commandService.verifyIsNeedCreateCommand(any())).thenReturn(true); + WorkflowExecuteResponse responseSuccess = + executorService.executeTask(loginUser, projectCode, processInstanceId, startNodeList, taskDependType); + Assertions.assertEquals(Status.SUCCESS.getCode(), responseSuccess.getCode()); + + } + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java index d87bc39b79..7aa2b881b3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java @@ -40,6 +40,8 @@ public enum CommandType { * 8 pause a process * 9 stop a process * 10 recover waiting thread + * 11 recover serial wait + * 12 start a task node in a process instance */ START_PROCESS(0, "start a new 
process"), START_CURRENT_TASK_PROCESS(1, "start a new process from current nodes"), @@ -53,6 +55,7 @@ public enum CommandType { STOP(9, "stop a process"), RECOVER_WAITING_THREAD(10, "recover waiting thread"), RECOVER_SERIAL_WAIT(11, "recover serial wait"), + EXECUTE_TASK(12, "start a task node in a process instance"), ; CommandType(int code, String descp) { diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java index fb5a564cf6..7b3df72e9d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/graph/DAG.java @@ -498,6 +498,15 @@ public class DAG { return new AbstractMap.SimpleEntry<>(notZeroIndegreeNodeMap.size() == 0, topoResultList); } + /** + * Get all the nodes that are in the graph + * + * @return all nodes in the graph + */ + public Set getAllNodesList() { + return nodesMap.keySet(); + } + @Override public String toString() { return "DAG{" diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java index 4c16a56322..11b53bbeeb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/TaskInstanceDao.java @@ -66,6 +66,14 @@ public interface TaskInstanceDao { */ List findValidTaskListByProcessId(Integer processInstanceId, int testFlag); + /** + * Query list of task instance by process instance id and task code + * @param processInstanceId processInstanceId + * @param taskCode task code + * @return list of valid task instance + */ + TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode); + /** * find previous task list by work process id * @param processInstanceId processInstanceId diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index cb25ff6886..a68a0953fd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -147,6 +147,11 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag); } + @Override + public TaskInstance findTaskByInstanceIdAndCode(Integer processInstanceId, Long taskCode) { + return taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId, taskCode); + } + @Override public List findPreviousTaskListByWorkProcessId(Integer processInstanceId) { ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index d1e12966f8..827e618ec2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ 
@@ -881,6 +881,7 @@ public class WorkflowExecuteRunnable implements Callable {
                     LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
                 }
             }
+            clearDataIfExecuteTask();
         } else {
             logger.info("The current workflowInstance is a newly running workflowInstance");
         }
@@ -2045,6 +2046,77 @@
         }
     }
 
+    /**
+     * clear related data if command of process instance is EXECUTE_TASK
+     * 1. find all task codes from the sub dag (only contains related tasks)
+     * 2. set the flag of those task instances to Flag.NO
+     * 3. clear the varPool data produced by the re-executed task instances from the process instance
+     * 4. remove related task instances from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
+     */
+    protected void clearDataIfExecuteTask() {
+        // only clear data if command is EXECUTE_TASK
+        if (!processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) {
+            return;
+        }
+
+        // task codes of the sub dag that will be re-executed
+        Set<String> taskCodesString = dag.getAllNodesList();
+
+        List<TaskInstance> removeTaskInstances = new ArrayList<>();
+
+        for (String taskCodeString : taskCodesString) {
+            long taskCode = Long.parseLong(taskCodeString);
+            TaskInstance taskInstance;
+            if (validTaskMap.containsKey(taskCode)) {
+                taskInstance = taskInstanceMap.get(validTaskMap.get(taskCode));
+            } else {
+                taskInstance = taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskCode);
+            }
+            if (taskInstance == null) {
+                continue;
+            }
+            removeTaskInstances.add(taskInstance);
+        }
+
+        for (TaskInstance taskInstance : removeTaskInstances) {
+            taskInstance.setFlag(Flag.NO);
+            taskInstanceDao.updateTaskInstance(taskInstance);
+        }
+
+        // records the keys of varPool data to be removed
+        Set<String> removeSet = new HashSet<>();
+        for (TaskInstance taskInstance : removeTaskInstances) {
+            String taskVarPool = taskInstance.getVarPool();
+            if (StringUtils.isNotEmpty(taskVarPool)) {
+                List<Property> properties = JSONUtils.toList(taskVarPool, Property.class);
+                List<String> keys = properties.stream()
+                        .filter(property -> property.getDirect().equals(Direct.OUT))
+                        .map(property -> String.format("%s_%s", property.getProp(), property.getType()))
+                        .collect(Collectors.toList());
+                removeSet.addAll(keys);
+            }
+        }
+
+        // remove varPool data and update process instance
+        // TODO: we can remove this snippet if : we get varPool from pre taskInstance instead of process instance when
+        // task can not get pre task from incomplete dag
+        List<Property> processProperties = JSONUtils.toList(processInstance.getVarPool(), Property.class);
+        processProperties = processProperties.stream()
+                .filter(property -> !(property.getDirect().equals(Direct.IN)
+                        && removeSet.contains(String.format("%s_%s", property.getProp(), property.getType()))))
+                .collect(Collectors.toList());
+
+        processInstance.setVarPool(JSONUtils.toJsonString(processProperties));
+        processInstanceDao.updateProcessInstance(processInstance);
+
+        // remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap
+        taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode())));
+        completeTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+        validTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+        errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey())));
+    }
+
     private enum WorkflowRunnableStatus {
         CREATED, INITIALIZE_DAG, INITIALIZE_QUEUE, STARTED,
         ;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index f76109325c..13b6c48004 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -22,9 +22,11 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING; import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -39,6 +41,7 @@ import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.expand.CuringParamsService; +import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessService; import java.lang.reflect.Field; @@ -258,6 +261,61 @@ public class WorkflowExecuteRunnableTest { } } + @Test + public void testClearDataIfExecuteTask() throws NoSuchFieldException, IllegalAccessException { + TaskInstance taskInstance1 = new TaskInstance(); + taskInstance1.setId(1); + taskInstance1.setTaskCode(1); + + TaskInstance taskInstance2 = new TaskInstance(); + taskInstance2.setId(2); + taskInstance2.setTaskCode(2); + + Map taskInstanceMap = new ConcurrentHashMap<>(); + taskInstanceMap.put(taskInstance1.getId(), taskInstance1); + taskInstanceMap.put(taskInstance2.getId(), taskInstance2); + + Map completeTaskList = new ConcurrentHashMap<>(); + completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId()); + completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); + + Class masterExecThreadClass = WorkflowExecuteRunnable.class; + + Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap"); + completeTaskMapField.setAccessible(true); + completeTaskMapField.set(workflowExecuteThread, completeTaskList); + + Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap"); + taskInstanceMapField.setAccessible(true); + taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); + + Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK); + Mockito.when(processInstance.getId()).thenReturn(123); + + DAG dag = Mockito.mock(DAG.class); + Set taskCodesString = new HashSet<>(); + taskCodesString.add("1"); + taskCodesString.add("2"); + Mockito.when(dag.getAllNodesList()).thenReturn(taskCodesString); + Mockito.when(dag.containsNode("1")).thenReturn(true); + Mockito.when(dag.containsNode("2")).thenReturn(false); + + Field dagField 
= masterExecThreadClass.getDeclaredField("dag"); + dagField.setAccessible(true); + dagField.set(workflowExecuteThread, dag); + + Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance1.getTaskCode())) + .thenReturn(taskInstance1); + Mockito.when(taskInstanceDao.findTaskByInstanceIdAndCode(processInstance.getId(), taskInstance2.getTaskCode())) + .thenReturn(null); + + workflowExecuteThread.clearDataIfExecuteTask(); + + Assertions.assertEquals(1, taskInstanceMap.size()); + Assertions.assertEquals(1, completeTaskList.size()); + + } + private List zeroSchedulerList() { return Collections.emptyList(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 8c171185d2..a79381e47c 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -889,6 +889,12 @@ public class ProcessServiceImpl implements ProcessService { cmdParam.remove(CommandKeyConstants.CMD_PARAM_RECOVERY_START_NODE_STRING); processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); } + // delete the StartNodeList from command parameter if last execution is only execute specified tasks + if (processInstance.getCommandType().equals(CommandType.EXECUTE_TASK)) { + cmdParam.remove(CommandKeyConstants.CMD_PARAM_START_NODES); + processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); + processInstance.setTaskDependType(command.getTaskDependType()); + } // delete all the valid tasks when repeat running List validTaskList = taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), @@ -905,6 +911,11 @@ public class ProcessServiceImpl implements ProcessService { break; case SCHEDULER: break; + case EXECUTE_TASK: + processInstance.setRunTimes(runTime + 1); + processInstance.setTaskDependType(command.getTaskDependType()); + processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam)); + break; default: break; } @@ -2035,8 +2046,13 @@ public class ProcessServiceImpl implements ProcessService { // and update the origin one if exist int updateResult = 0; int insertResult = 0; - if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs)) { - insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); + + // only insert new task definitions if they not in updateTaskDefinitionLogs + List newInsertTaskDefinitionLogs = newTaskDefinitionLogs.stream() + .filter(taskDefinitionLog -> !updateTaskDefinitionLogs.contains(taskDefinitionLog)) + .collect(Collectors.toList()); + if (CollectionUtils.isNotEmpty(newInsertTaskDefinitionLogs)) { + insertResult = taskDefinitionLogMapper.batchInsert(newInsertTaskDefinitionLogs); } if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { insertResult += taskDefinitionLogMapper.batchInsert(updateTaskDefinitionLogs); diff --git a/dolphinscheduler-ui/src/common/common.ts b/dolphinscheduler-ui/src/common/common.ts index 109c57965d..6858f3fa62 100644 --- a/dolphinscheduler-ui/src/common/common.ts +++ b/dolphinscheduler-ui/src/common/common.ts @@ -121,6 +121,10 @@ export const runningType = (t: any) => [ { desc: `${t('project.workflow.recover_serial_wait')}`, code: 'RECOVER_SERIAL_WAIT' + }, + { + desc: `${t('project.workflow.execute_task')}`, + code: 'EXECUTE_TASK' } ] diff --git 
a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index e671415ad7..1f9f49d50c 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -103,6 +103,9 @@ export default { backward_execution: 'Backward execution', forward_execution: 'Forward execution', current_node_execution: 'Execute only the current node', + backward_execution_task: 'Run backwards', + forward_execution_task: 'Run forwards', + current_node_execution_task: 'Run', notification_strategy: 'Notification Strategy', workflow_priority: 'Workflow Priority', worker_group: 'Worker Group', @@ -158,6 +161,7 @@ export default { pause: 'Pause', recovery_waiting_thread: 'Recovery waiting thread', recover_serial_wait: 'Recover serial wait', + execute_task: 'Execute the specified task', recovery_suspend: 'Recovery Suspend', recovery_failed: 'Recovery Failed', gantt: 'Gantt', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 2c53550a94..ce8589630e 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -105,6 +105,9 @@ export default { backward_execution: '向后执行', forward_execution: '向前执行', current_node_execution: '仅执行当前节点', + backward_execution_task: '向后运行', + forward_execution_task: '向前运行', + current_node_execution_task: '运行', notification_strategy: '通知策略', workflow_priority: '流程优先级', worker_group: 'Worker分组', @@ -160,6 +163,7 @@ export default { pause: '暂停', recovery_waiting_thread: '恢复等待线程', recover_serial_wait: '串行恢复', + execute_task: '执行指定任务', recovery_suspend: '恢复运行', recovery_failed: '重跑失败任务', gantt: '甘特图', diff --git a/dolphinscheduler-ui/src/service/modules/executors/index.ts b/dolphinscheduler-ui/src/service/modules/executors/index.ts index d7356407de..d304e6385f 100644 --- a/dolphinscheduler-ui/src/service/modules/executors/index.ts +++ b/dolphinscheduler-ui/src/service/modules/executors/index.ts @@ -18,6 +18,7 @@ import { axios } from '@/service/service' import { ExecuteReq, + ExecuteTaskReq, ProjectCodeReq, ProcessDefinitionCodeReq, ProcessInstanceReq @@ -31,6 +32,14 @@ export function execute(data: ExecuteReq, code: number): any { }) } +export function executeTask(data: ExecuteTaskReq, code: number): any { + return axios({ + url: `/projects/${code}/executors/execute-task`, + method: 'post', + data + }) +} + export function startCheckProcessDefinition( data: ProcessDefinitionCodeReq, code: ProjectCodeReq diff --git a/dolphinscheduler-ui/src/service/modules/executors/types.ts b/dolphinscheduler-ui/src/service/modules/executors/types.ts index 39f17096b2..70ecd97870 100644 --- a/dolphinscheduler-ui/src/service/modules/executors/types.ts +++ b/dolphinscheduler-ui/src/service/modules/executors/types.ts @@ -41,6 +41,12 @@ interface ExecuteReq { processInstanceId: number } +interface ExecuteTaskReq { + processInstanceId: number + startNodeList: number + taskDependType: string +} + interface ProjectCodeReq { projectCode: number } @@ -69,6 +75,7 @@ interface ProcessInstanceReq extends ProcessDefinitionCodeReq { export { ExecuteReq, + ExecuteTaskReq, ProjectCodeReq, ProcessDefinitionCodeReq, ProcessInstanceReq diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx index 7347a6b3ea..87c5db3b28 100644 --- 
a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag-context-menu.tsx @@ -30,6 +30,10 @@ const props = { type: Boolean as PropType, default: false }, + executeTaskDisplay: { + type: Boolean as PropType, + default: false + }, menuDisplay: { type: Boolean as PropType, default: false @@ -59,7 +63,7 @@ const props = { export default defineComponent({ name: 'dag-context-menu', props, - emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks'], + emits: ['hide', 'start', 'edit', 'viewLog', 'copyTask', 'removeTasks', 'executeTask'], setup(props, ctx) { const graph = inject('graph', ref()) const route = useRoute() @@ -83,6 +87,22 @@ export default defineComponent({ } } + const handleExecuteTaskOnly = () => { + ctx.emit('executeTask', Number(props.cell?.id), 'TASK_ONLY') + } + + const handleExecuteTaskPOST = () => { + if (props.taskInstance) { + ctx.emit('executeTask', Number(props.cell?.id), 'TASK_POST') + } + } + + const handleExecuteTaskPRE = () => { + if (props.taskInstance) { + ctx.emit('executeTask', Number(props.cell?.id), 'TASK_PRE') + } + } + const handleCopy = () => { const genNums = 1 const type = props.cell?.data.taskType @@ -115,7 +135,10 @@ export default defineComponent({ handleEdit, handleCopy, handleDelete, - handleViewLog + handleViewLog, + handleExecuteTaskOnly, + handleExecuteTaskPOST, + handleExecuteTaskPRE } }, render() { @@ -158,12 +181,34 @@ export default defineComponent({ )} {this.taskInstance && ( - - {t('project.node.view_log')} - + + {t('project.node.view_log')} + + )} + {this.executeTaskDisplay && ( + <> + + {t('project.workflow.current_node_execution_task')} + + + {t('project.workflow.backward_execution_task')} + + + {t('project.workflow.forward_execution_task')} + + )} ) diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx index e0a6f62c66..98e388220b 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/index.tsx @@ -55,6 +55,7 @@ import { queryLog } from '@/service/modules/log' import { useAsyncState } from '@vueuse/core' import utils from '@/utils' import { useUISettingStore } from '@/store/ui-setting/ui-setting' +import { executeTask } from '@/service/modules/executors' const props = { // If this prop is passed, it means from definition detail @@ -135,6 +136,13 @@ export default defineComponent({ } }) + // execute task buttons in the dag node menu + const executeTaskDisplay = computed(() => { + return ( + route.name === 'workflow-instance-detail' + ) + }) + // other button in the dag node menu const menuDisplay = computed(() => { if (props.instance) { @@ -283,6 +291,20 @@ export default defineComponent({ getLogs(logTimer) } + const handleExecuteTask = (startNodeList: number, taskDependType: string) => { + executeTask({ + processInstanceId: Number(route.params.id), + startNodeList: startNodeList, + taskDependType: taskDependType, + }, + props.projectCode).then(() => { + window.$message.success(t('project.workflow.success')) + setTimeout(() => { + window.location.reload(); + }, 1000); + }) + } + const downloadLogs = () => { utils.downloadFile('log/download-log', { taskInstanceId: nodeVariables.logTaskId @@ -382,6 +404,7 @@ export default defineComponent({ /> {!!props.definition && ( { window.$message.success(t('project.dag.success')) - 
router.push({ path: `/projects/${projectCode}/workflow/instances` })
+        window.location.reload()
       })
     }
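
Note: a minimal usage sketch of the new endpoint from the UI side, based on the `executeTask` service and `ExecuteTaskReq` type added in this patch; the concrete ids, the task code and the surrounding component context are illustrative assumptions, not part of the patch itself.

    import { executeTask } from '@/service/modules/executors'

    // Re-run a single task node inside a finished workflow instance.
    // taskDependType selects the sub dag: 'TASK_ONLY' runs just the node,
    // 'TASK_PRE' / 'TASK_POST' also include the nodes before / after it.
    executeTask(
      {
        processInstanceId: 101,       // id of the finished workflow instance (example value)
        startNodeList: 1234567890,    // task code of the node to execute (example value)
        taskDependType: 'TASK_ONLY'
      },
      projectCode                     // project code, assumed to be in scope
    ).then(() => {
      // the backend answers with a WorkflowExecuteResponse; reload or re-fetch the instance afterwards
    })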