From a0eb45b9e4e3beb6a3ff77e72d3a1ca806a956bd Mon Sep 17 00:00:00 2001 From: insist777 <84278047+insist777@users.noreply.github.com> Date: Mon, 28 Nov 2022 12:58:46 +0800 Subject: [PATCH] [Feature][API] New restful API for workflowInstance (#12990) --- .../WorkflowInstanceV2Controller.java | 154 +++++++++++++++++ .../WorkflowInstanceQueryRequest.java | 65 +++++++ .../api/service/ExecutorService.java | 10 ++ .../api/service/ProcessInstanceService.java | 31 ++++ .../api/service/impl/ExecutorServiceImpl.java | 22 +++ .../impl/ProcessInstanceServiceImpl.java | 162 ++++++++++++++---- .../WorkflowInstanceV2ControllerTest.java | 121 +++++++++++++ .../dao/mapper/ProcessInstanceMapper.java | 29 +++- .../dao/mapper/ProcessInstanceMapper.xml | 47 ++++- 9 files changed, 594 insertions(+), 47 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java create mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java new file mode 100644 index 0000000000..09d39e79ab --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.api.enums.Status.*; + +import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.service.ExecutorService; +import org.apache.dolphinscheduler.api.service.ProcessInstanceService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.Parameters; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.tags.Tag; + +/** + * workflow instance controller + */ +@Tag(name = "WORKFLOW_INSTANCE_TAG_V2") +@RestController +@RequestMapping("/v2/workflow-instances") +public class WorkflowInstanceV2Controller extends BaseController { + + @Autowired + private ProcessInstanceService processInstanceService; + + @Autowired + private ExecutorService execService; + + /** + * query workflow instance list paging + * @param loginUser login user + * @param workflowInstanceQueryRequest workflowInstanceQueryRequest + * @return workflow instance list + */ + @Operation(summary = "queryWorkflowInstanceListPaging", description = "QUERY_PROCESS_INSTANCE_LIST_NOTES") + @GetMapping(consumes = {"application/json"}) + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryWorkflowInstanceListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestBody WorkflowInstanceQueryRequest workflowInstanceQueryRequest) { + Result result = + checkPageParams(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize()); + if (!result.checkResult()) { + return result; + } + + result = processInstanceService.queryProcessInstanceList(loginUser, workflowInstanceQueryRequest); + return result; + } + + /** + * Query workflowInstance by id + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @return Result result object query + */ + @Operation(summary = "queryWorkflowInstanceById", description = "QUERY_WORKFLOW_INSTANCE_BY_ID") + @Parameters({ + @Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", schema = @Schema(implementation = Integer.class, example = "123456", required = true)) + }) + @GetMapping(value = "/{workflowInstanceId}") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_PROCESS_INSTANCE_BY_ID_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryWorkflowInstanceById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("workflowInstanceId") Integer workflowInstanceId) { + Map result = processInstanceService.queryProcessInstanceById(loginUser, workflowInstanceId); + return returnDataList(result); + } + + /** + * Delete workflowInstance by id + * + * @param loginUser login user + * @param workflowInstanceId workflow instance code + * @return Result result object delete + */ + @Operation(summary = "delete", description = "DELETE_WORKFLOWS_INSTANCE_NOTES") + @Parameters({ + @Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", schema = @Schema(implementation = Integer.class, example = "123456", required = true)) + }) + @DeleteMapping(value = "/{workflowInstanceId}") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result deleteWorkflowInstance(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("workflowInstanceId") Integer workflowInstanceId) { + processInstanceService.deleteProcessInstanceById(loginUser, workflowInstanceId); + return Result.success(); + } + + /** + * do action to workflow instance: pause, stop, repeat, recover from pause, recover from stop + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @param executeType execute type + * @return execute result code + */ + @Operation(summary = "execute", description = "EXECUTE_ACTION_TO_WORKFLOW_INSTANCE_NOTES") + @Parameters({ + @Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")), + @Parameter(name = "executeType", description = "EXECUTE_TYPE", required = true, schema = @Schema(implementation = ExecuteType.class)) + }) + @PostMapping(value = "/{workflowInstanceId}/execute/{executeType}") + @ResponseStatus(HttpStatus.OK) + @ApiException(EXECUTE_PROCESS_INSTANCE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result execute(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @PathVariable("workflowInstanceId") Integer workflowInstanceId, + @PathVariable("executeType") ExecuteType executeType) { + Map result = execService.execute(loginUser, workflowInstanceId, executeType); + return returnDataList(result); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java new file mode 100644 index 0000000000..c909919b55 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.dto.workflowInstance; + +import org.apache.dolphinscheduler.api.dto.PageQueryDto; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; + +import lombok.Data; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import io.swagger.v3.oas.annotations.media.Schema; + +/** + * workflow instance request + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +@Data +public class WorkflowInstanceQueryRequest extends PageQueryDto { + + @Schema(name = "projectName", example = "PROJECT-NAME") + String projectName; + + @Schema(name = "workflowName", example = "WORKFLOW-NAME") + String workflowName; + + @Schema(name = "host", example = "HOST") + String host; + + @Schema(name = "startDate", example = "START-TIME") + String startTime; + + @Schema(name = "endDate", example = "END-DATE") + String endTime; + + @Schema(name = "state", example = "STATE") + Integer state; + + public ProcessInstance convert2ProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + if (this.workflowName != null) { + processInstance.setName(this.workflowName); + } + if (this.host != null) { + processInstance.setHost(this.host); + } + return processInstance; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 6f19a29caf..4c6b930f54 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -92,6 +92,16 @@ public interface ExecutorService { */ Map execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType); + /** + * do action to process instance:pause, stop, repeat, recover from pause, recover from stop + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @param executeType execute type + * @return execute result code + */ + Map execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType); + /** * check if sub processes are offline before starting process definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index d84ca5d3cb..7ff2aa9091 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -55,6 +56,16 @@ public interface ProcessInstanceService { long projectCode, Integer processId); + /** + * query process instance by id + * + * @param loginUser login user + * @param processId process instance id + * @return process instance detail + */ + Map queryProcessInstanceById(User loginUser, + Integer processId); + /** * paging query process instance list, filtering according to project, process definition, time range, keyword, process status * @@ -84,6 +95,16 @@ public interface ProcessInstanceService { Integer pageNo, Integer pageSize); + /** + * paging query process instance list, filtering according to project, process definition, time range, keyword, process status + * + * @param loginUser login user + * @param workflowInstanceQueryRequest workflowInstanceQueryRequest + * @return process instance list + */ + Result queryProcessInstanceList(User loginUser, + WorkflowInstanceQueryRequest workflowInstanceQueryRequest); + /** * query task list by process instance id * @@ -163,6 +184,16 @@ public interface ProcessInstanceService { long projectCode, Integer processInstanceId); + /** + * delete process instance by id, at the same time,delete task instance and their mapping relation data + * + * @param loginUser login user + * @param workflowInstanceId work instance id + * @return delete result code + */ + Map deleteProcessInstanceById(User loginUser, + Integer workflowInstanceId); + /** * view process instance variables * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index bc885f1b82..f344a3a226 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -124,6 +124,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private ProcessDefinitionMapper processDefinitionMapper; + @Autowired + ProcessDefinitionMapper processDefineMapper; + @Autowired private MonitorService monitorService; @@ -465,6 +468,25 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } + /** + * do action to workflow instance:pause, stop, repeat, recover from pause, recover from stop,rerun failed task + + + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @param executeType execute type + * @return execute result code + */ + @Override + public Map execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) { + ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId); + ProcessDefinition processDefinition = + processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + + return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType); + } + @Override public Map forceStartTaskInstance(User loginUser, int queueId) { Map result = new HashMap<>(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 4c3fceefe3..21b50ebec7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -17,9 +17,7 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST; import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; @@ -33,6 +31,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ExecutorService; @@ -225,9 +224,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * query process instance by id * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param processId process instance id + * @param processId process instance id * @return process instance detail */ @Override @@ -263,20 +262,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + /** + * query workflow instance by id + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @return workflow instance detail + */ + @Override + public Map queryProcessInstanceById(User loginUser, Integer workflowInstanceId) { + ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId); + ProcessDefinition processDefinition = + processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + + return queryProcessInstanceById(loginUser, processDefinition.getProjectCode(), workflowInstanceId); + } + /** * paging query process instance list, filtering according to project, process definition, time range, keyword, process status * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processDefineCode process definition code - * @param pageNo page number - * @param pageSize page size - * @param searchVal search value - * @param stateType state type - * @param host host - * @param startDate start time - * @param endDate end time - * @param otherParamsJson otherParamsJson handle other params + * @param pageNo page number + * @param pageSize page size + * @param searchVal search value + * @param stateType state type + * @param host host + * @param startDate start time + * @param endDate end time + * @param otherParamsJson otherParamsJson handle other params * @return process instance list */ @Override @@ -344,12 +359,68 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + /** + * paging query process instance list, filtering according to project, process definition, time range, keyword, process status + * + * @param loginUser login user + * @param workflowInstanceQueryRequest workflowInstanceQueryRequest + * @return process instance list + */ + @Override + public Result queryProcessInstanceList(User loginUser, WorkflowInstanceQueryRequest workflowInstanceQueryRequest) { + Result result = new Result(); + ProcessInstance processInstance = workflowInstanceQueryRequest.convert2ProcessInstance(); + String projectName = workflowInstanceQueryRequest.getProjectName(); + if (!StringUtils.isBlank(projectName)) { + Project project = projectMapper.queryByName(projectName); + projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION); + ProcessDefinition processDefinition = + processDefineMapper.queryByDefineName(project.getCode(), processInstance.getName()); + processInstance.setProcessDefinitionCode(processDefinition.getCode()); + } + + Page page = + new Page<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize()); + PageInfo pageInfo = + new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize()); + + IPage processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging(page, + processInstance.getProcessDefinitionCode(), processInstance.getName(), + workflowInstanceQueryRequest.getStartTime(), workflowInstanceQueryRequest.getEndTime(), + workflowInstanceQueryRequest.getState(), processInstance.getHost()); + + List processInstances = processInstanceList.getRecords(); + List userIds = Collections.emptyList(); + if (CollectionUtils.isNotEmpty(processInstances)) { + userIds = processInstances.stream().map(ProcessInstance::getExecutorId).collect(Collectors.toList()); + } + List users = usersService.queryUser(userIds); + Map idToUserMap = Collections.emptyMap(); + if (CollectionUtils.isNotEmpty(users)) { + idToUserMap = users.stream().collect(Collectors.toMap(User::getId, Function.identity())); + } + + for (ProcessInstance Instance : processInstances) { + Instance.setDuration(WorkflowUtils.getWorkflowInstanceDuration(Instance)); + User executor = idToUserMap.get(Instance.getExecutorId()); + if (null != executor) { + Instance.setExecutorName(executor.getUserName()); + } + } + + pageInfo.setTotal((int) processInstanceList.getTotal()); + pageInfo.setTotalList(processInstances); + result.setData(pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + /** * query task list by process instance id * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param processId process instance id + * @param processId process instance id * @return task list for the process instance * @throws IOException io exception */ @@ -437,9 +508,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * query sub process instance detail info by task id * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param taskId task id + * @param taskId task id * @return sub process instance detail */ @Override @@ -492,17 +563,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * update process instance * - * @param loginUser login user - * @param projectCode project code - * @param taskRelationJson process task relation json + * @param loginUser login user + * @param projectCode project code + * @param taskRelationJson process task relation json * @param taskDefinitionJson taskDefinitionJson - * @param processInstanceId process instance id - * @param scheduleTime schedule time - * @param syncDefine sync define - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode + * @param processInstanceId process instance id + * @param scheduleTime schedule time + * @param syncDefine sync define + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode * @return update result code */ @Transactional @@ -659,9 +730,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * query parent process instance detail info by sub process instance id * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param subId sub process id + * @param subId sub process id * @return parent instance detail */ @Override @@ -701,8 +772,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * delete process instance by id, at the same time,delete task instance and their mapping relation data * - * @param loginUser login user - * @param projectCode project code + * @param loginUser login user + * @param projectCode project code * @param processInstanceId process instance id * @return delete result code */ @@ -766,10 +837,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + /** + * delete workflow instance by id, at the same time,delete task instance and their mapping relation data + * + * @param loginUser login user + * @param workflowInstanceId workflow instance id + * @return delete result code + */ + @Override + public Map deleteProcessInstanceById(User loginUser, Integer workflowInstanceId) { + ProcessInstance processInstance = processService.findProcessInstanceDetailById(workflowInstanceId) + .orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId)); + ProcessDefinition processDefinition = + processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + + return deleteProcessInstanceById(loginUser, processDefinition.getProjectCode(), workflowInstanceId); + } + /** * view process instance variables * - * @param projectCode project code + * @param projectCode project code * @param processInstanceId process instance id * @return variables data */ @@ -863,7 +951,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * encapsulation gantt structure * - * @param projectCode project code + * @param projectCode project code * @param processInstanceId process instance id * @return gantt tree data * @throws Exception exception when json parse @@ -938,7 +1026,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce * query process instance by processDefinitionCode and stateArray * * @param processDefinitionCode processDefinitionCode - * @param states states array + * @param states states array * @return process instance list */ @Override @@ -950,7 +1038,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce * query process instance by processDefinitionCode * * @param processDefinitionCode processDefinitionCode - * @param size size + * @param size size * @return process instance list */ @Override diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java new file mode 100644 index 0000000000..be73b31ff5 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.controller; + +import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; + +import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; +import org.apache.dolphinscheduler.api.enums.ExecuteType; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ExecutorService; +import org.apache.dolphinscheduler.api.service.ProcessInstanceService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; + +public class WorkflowInstanceV2ControllerTest extends AbstractControllerTest { + + @InjectMocks + private WorkflowInstanceV2Controller workflowInstanceV2Controller; + + @Mock + private ProcessInstanceService processInstanceService; + + @Mock + private ExecutorService execService; + + @Test + public void testQueryWorkFlowInstanceListPaging() { + User loginUser = getLoginUser(); + + WorkflowInstanceQueryRequest workflowInstanceQueryRequest = new WorkflowInstanceQueryRequest(); + workflowInstanceQueryRequest.setProjectName("test"); + workflowInstanceQueryRequest.setWorkflowName("shell"); + workflowInstanceQueryRequest.setPageNo(1); + workflowInstanceQueryRequest.setPageSize(10); + + Result result = new Result(); + PageInfo pageInfo = + new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize()); + pageInfo.setTotalList(Collections.singletonList(new ProcessInstance())); + result.setData(pageInfo); + putMsg(result, Status.SUCCESS); + + Mockito.when(processInstanceService.queryProcessInstanceList(any(), + any(WorkflowInstanceQueryRequest.class))).thenReturn(result); + + Result result1 = + workflowInstanceV2Controller.queryWorkflowInstanceListPaging(loginUser, workflowInstanceQueryRequest); + Assertions.assertTrue(result1.isSuccess()); + } + + @Test + public void testQueryWorkflowInstanceById() { + User loginUser = getLoginUser(); + + Map result = new HashMap<>(); + result.put(DATA_LIST, new ProcessInstance()); + putMsg(result, Status.SUCCESS); + + Mockito.when(processInstanceService.queryProcessInstanceById(any(), eq(1))).thenReturn(result); + Result result1 = workflowInstanceV2Controller.queryWorkflowInstanceById(loginUser, 1); + Assertions.assertTrue(result1.isSuccess()); + } + + @Test + public void testDeleteWorkflowInstanceById() { + User loginUser = getLoginUser(); + + Mockito.when(processInstanceService.deleteProcessInstanceById(any(), eq(1))).thenReturn(null); + Result result = workflowInstanceV2Controller.deleteWorkflowInstance(loginUser, 1); + Assertions.assertTrue(result.isSuccess()); + } + + @Test + public void testExecuteWorkflowInstance() { + User loginUser = getLoginUser(); + + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS); + + Mockito.when(execService.execute(any(), eq(1), any(ExecuteType.class))).thenReturn(result); + + Result result1 = workflowInstanceV2Controller.execute(loginUser, 1, ExecuteType.STOP); + Assertions.assertTrue(result1.isSuccess()); + } + + private User getLoginUser() { + User user = new User(); + user.setId(1); + user.setUserName("admin"); + return user; + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index eb1758c949..c362eb8bf8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -55,6 +55,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance host by stateArray + * * @param stateArray * @return */ @@ -228,10 +229,10 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query top n process instance order by running duration * - * @param size size - * @param startTime start time - * @param startTime end time - * @param status process instance status + * @param size size + * @param startTime start time + * @param startTime end time + * @param status process instance status * @param projectCode project code * @return ProcessInstance list */ @@ -266,4 +267,24 @@ public interface ProcessInstanceMapper extends BaseMapper { ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode, @Param("state") int state, @Param("id") int id); + + /** + * Filter process instance + * + * @param page page + * @param processDefinitionCode processDefinitionCode + * @param name name + * @param host host + * @param startTime startTime + * @param endTime endTime + * @return process instance IPage + */ + IPage queryProcessInstanceListV2Paging(Page page, + @Param("processDefinitionCode") Long processDefinitionCode, + @Param("name") String name, + @Param("startTime") String startTime, + @Param("endTime") String endTime, + @Param("state") Integer state, + @Param("host") String host); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index d5a9136b40..ad907b7958 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -19,7 +19,8 @@ - id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, + id + , name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, @@ -110,7 +111,8 @@ + - select from t_ds_process_instance where process_definition_code=#{processDefinitionCode} - and process_definition_version = #{processDefinitionVersion} + and process_definition_version = #{processDefinitionVersion} and state in @@ -274,6 +278,36 @@ and id ]]> #{id} order by id asc limit 1 + update t_ds_process_instance set global_params = #{globalParams} @@ -282,6 +316,7 @@ update t_ds_process_instance set next_process_instance_id = #{thisInstanceId} - where id = #{runningInstanceId} and next_process_instance_id=0 + where id = #{runningInstanceId} + and next_process_instance_id = 0