Browse Source

[Feature][API] New restful API for workflowInstance (#12990)

3.2.0-release
insist777 2 years ago committed by GitHub
parent
commit
a0eb45b9e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 154
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java
  2. 65
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java
  3. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  4. 31
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  5. 22
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  6. 162
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  7. 121
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java
  8. 29
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  9. 47
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

154
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2Controller.java

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.*;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
/**
* workflow instance controller
*/
@Tag(name = "WORKFLOW_INSTANCE_TAG_V2")
@RestController
@RequestMapping("/v2/workflow-instances")
public class WorkflowInstanceV2Controller extends BaseController {
@Autowired
private ProcessInstanceService processInstanceService;
@Autowired
private ExecutorService execService;
/**
* query workflow instance list paging
* @param loginUser login user
* @param workflowInstanceQueryRequest workflowInstanceQueryRequest
* @return workflow instance list
*/
@Operation(summary = "queryWorkflowInstanceListPaging", description = "QUERY_PROCESS_INSTANCE_LIST_NOTES")
@GetMapping(consumes = {"application/json"})
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryWorkflowInstanceListPaging(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody WorkflowInstanceQueryRequest workflowInstanceQueryRequest) {
Result result =
checkPageParams(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize());
if (!result.checkResult()) {
return result;
}
result = processInstanceService.queryProcessInstanceList(loginUser, workflowInstanceQueryRequest);
return result;
}
/**
* Query workflowInstance by id
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return Result result object query
*/
@Operation(summary = "queryWorkflowInstanceById", description = "QUERY_WORKFLOW_INSTANCE_BY_ID")
@Parameters({
@Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", schema = @Schema(implementation = Integer.class, example = "123456", required = true))
})
@GetMapping(value = "/{workflowInstanceId}")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_INSTANCE_BY_ID_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryWorkflowInstanceById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("workflowInstanceId") Integer workflowInstanceId) {
Map<String, Object> result = processInstanceService.queryProcessInstanceById(loginUser, workflowInstanceId);
return returnDataList(result);
}
/**
* Delete workflowInstance by id
*
* @param loginUser login user
* @param workflowInstanceId workflow instance code
* @return Result result object delete
*/
@Operation(summary = "delete", description = "DELETE_WORKFLOWS_INSTANCE_NOTES")
@Parameters({
@Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", schema = @Schema(implementation = Integer.class, example = "123456", required = true))
})
@DeleteMapping(value = "/{workflowInstanceId}")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteWorkflowInstance(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("workflowInstanceId") Integer workflowInstanceId) {
processInstanceService.deleteProcessInstanceById(loginUser, workflowInstanceId);
return Result.success();
}
/**
* do action to workflow instance: pause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @param executeType execute type
* @return execute result code
*/
@Operation(summary = "execute", description = "EXECUTE_ACTION_TO_WORKFLOW_INSTANCE_NOTES")
@Parameters({
@Parameter(name = "workflowInstanceId", description = "WORKFLOW_INSTANCE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")),
@Parameter(name = "executeType", description = "EXECUTE_TYPE", required = true, schema = @Schema(implementation = ExecuteType.class))
})
@PostMapping(value = "/{workflowInstanceId}/execute/{executeType}")
@ResponseStatus(HttpStatus.OK)
@ApiException(EXECUTE_PROCESS_INSTANCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result execute(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("workflowInstanceId") Integer workflowInstanceId,
@PathVariable("executeType") ExecuteType executeType) {
Map<String, Object> result = execService.execute(loginUser, workflowInstanceId, executeType);
return returnDataList(result);
}
}

65
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/workflowInstance/WorkflowInstanceQueryRequest.java

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.dto.workflowInstance;
import org.apache.dolphinscheduler.api.dto.PageQueryDto;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import io.swagger.v3.oas.annotations.media.Schema;
/**
* workflow instance request
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
public class WorkflowInstanceQueryRequest extends PageQueryDto {
@Schema(name = "projectName", example = "PROJECT-NAME")
String projectName;
@Schema(name = "workflowName", example = "WORKFLOW-NAME")
String workflowName;
@Schema(name = "host", example = "HOST")
String host;
@Schema(name = "startDate", example = "START-TIME")
String startTime;
@Schema(name = "endDate", example = "END-DATE")
String endTime;
@Schema(name = "state", example = "STATE")
Integer state;
public ProcessInstance convert2ProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
if (this.workflowName != null) {
processInstance.setName(this.workflowName);
}
if (this.host != null) {
processInstance.setHost(this.host);
}
return processInstance;
}
}

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -92,6 +92,16 @@ public interface ExecutorService {
*/
Map<String, Object> execute(User loginUser, long projectCode, Integer processInstanceId, ExecuteType executeType);
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @param executeType execute type
* @return execute result code
*/
Map<String, Object> execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType);
/**
* check if sub processes are offline before starting process definition
*

31
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -55,6 +56,16 @@ public interface ProcessInstanceService {
long projectCode,
Integer processId);
/**
* query process instance by id
*
* @param loginUser login user
* @param processId process instance id
* @return process instance detail
*/
Map<String, Object> queryProcessInstanceById(User loginUser,
Integer processId);
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
*
@ -84,6 +95,16 @@ public interface ProcessInstanceService {
Integer pageNo,
Integer pageSize);
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
*
* @param loginUser login user
* @param workflowInstanceQueryRequest workflowInstanceQueryRequest
* @return process instance list
*/
Result queryProcessInstanceList(User loginUser,
WorkflowInstanceQueryRequest workflowInstanceQueryRequest);
/**
* query task list by process instance id
*
@ -163,6 +184,16 @@ public interface ProcessInstanceService {
long projectCode,
Integer processInstanceId);
/**
* delete process instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser login user
* @param workflowInstanceId work instance id
* @return delete result code
*/
Map<String, Object> deleteProcessInstanceById(User loginUser,
Integer workflowInstanceId);
/**
* view process instance variables
*

22
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -124,6 +124,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
ProcessDefinitionMapper processDefineMapper;
@Autowired
private MonitorService monitorService;
@ -465,6 +468,25 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
/**
* do action to workflow instancepause, stop, repeat, recover from pause, recover from stoprerun failed task
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @param executeType execute type
* @return execute result code
*/
@Override
public Map<String, Object> execute(User loginUser, Integer workflowInstanceId, ExecuteType executeType) {
ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId);
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
return execute(loginUser, processDefinition.getProjectCode(), workflowInstanceId, executeType);
}
@Override
public Map<String, Object> forceStartTaskInstance(User loginUser, int queueId) {
Map<String, Object> result = new HashMap<>();

162
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_NOT_EXIST;
import static org.apache.dolphinscheduler.api.enums.Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
@ -33,6 +31,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
@ -225,9 +224,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* query process instance by id
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param processId process instance id
* @param processId process instance id
* @return process instance detail
*/
@Override
@ -263,20 +262,36 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
/**
* query workflow instance by id
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return workflow instance detail
*/
@Override
public Map<String, Object> queryProcessInstanceById(User loginUser, Integer workflowInstanceId) {
ProcessInstance processInstance = processInstanceMapper.selectById(workflowInstanceId);
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
return queryProcessInstanceById(loginUser, processDefinition.getProjectCode(), workflowInstanceId);
}
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
*
* @param loginUser login user
* @param projectCode project code
* @param loginUser login user
* @param projectCode project code
* @param processDefineCode process definition code
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @param stateType state type
* @param host host
* @param startDate start time
* @param endDate end time
* @param otherParamsJson otherParamsJson handle other params
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @param stateType state type
* @param host host
* @param startDate start time
* @param endDate end time
* @param otherParamsJson otherParamsJson handle other params
* @return process instance list
*/
@Override
@ -344,12 +359,68 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
/**
* paging query process instance list, filtering according to project, process definition, time range, keyword, process status
*
* @param loginUser login user
* @param workflowInstanceQueryRequest workflowInstanceQueryRequest
* @return process instance list
*/
@Override
public Result queryProcessInstanceList(User loginUser, WorkflowInstanceQueryRequest workflowInstanceQueryRequest) {
Result result = new Result();
ProcessInstance processInstance = workflowInstanceQueryRequest.convert2ProcessInstance();
String projectName = workflowInstanceQueryRequest.getProjectName();
if (!StringUtils.isBlank(projectName)) {
Project project = projectMapper.queryByName(projectName);
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
ProcessDefinition processDefinition =
processDefineMapper.queryByDefineName(project.getCode(), processInstance.getName());
processInstance.setProcessDefinitionCode(processDefinition.getCode());
}
Page<ProcessInstance> page =
new Page<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize());
PageInfo<ProcessInstance> pageInfo =
new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize());
IPage<ProcessInstance> processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging(page,
processInstance.getProcessDefinitionCode(), processInstance.getName(),
workflowInstanceQueryRequest.getStartTime(), workflowInstanceQueryRequest.getEndTime(),
workflowInstanceQueryRequest.getState(), processInstance.getHost());
List<ProcessInstance> processInstances = processInstanceList.getRecords();
List<Integer> userIds = Collections.emptyList();
if (CollectionUtils.isNotEmpty(processInstances)) {
userIds = processInstances.stream().map(ProcessInstance::getExecutorId).collect(Collectors.toList());
}
List<User> users = usersService.queryUser(userIds);
Map<Integer, User> idToUserMap = Collections.emptyMap();
if (CollectionUtils.isNotEmpty(users)) {
idToUserMap = users.stream().collect(Collectors.toMap(User::getId, Function.identity()));
}
for (ProcessInstance Instance : processInstances) {
Instance.setDuration(WorkflowUtils.getWorkflowInstanceDuration(Instance));
User executor = idToUserMap.get(Instance.getExecutorId());
if (null != executor) {
Instance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotal((int) processInstanceList.getTotal());
pageInfo.setTotalList(processInstances);
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query task list by process instance id
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param processId process instance id
* @param processId process instance id
* @return task list for the process instance
* @throws IOException io exception
*/
@ -437,9 +508,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* query sub process instance detail info by task id
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param taskId task id
* @param taskId task id
* @return sub process instance detail
*/
@Override
@ -492,17 +563,17 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* update process instance
*
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
* @param loginUser login user
* @param projectCode project code
* @param taskRelationJson process task relation json
* @param taskDefinitionJson taskDefinitionJson
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param processInstanceId process instance id
* @param scheduleTime schedule time
* @param syncDefine sync define
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @return update result code
*/
@Transactional
@ -659,9 +730,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* query parent process instance detail info by sub process instance id
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param subId sub process id
* @param subId sub process id
* @return parent instance detail
*/
@Override
@ -701,8 +772,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* delete process instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser login user
* @param projectCode project code
* @param loginUser login user
* @param projectCode project code
* @param processInstanceId process instance id
* @return delete result code
*/
@ -766,10 +837,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
/**
* delete workflow instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser login user
* @param workflowInstanceId workflow instance id
* @return delete result code
*/
@Override
public Map<String, Object> deleteProcessInstanceById(User loginUser, Integer workflowInstanceId) {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(workflowInstanceId)
.orElseThrow(() -> new ServiceException(PROCESS_INSTANCE_NOT_EXIST, workflowInstanceId));
ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
return deleteProcessInstanceById(loginUser, processDefinition.getProjectCode(), workflowInstanceId);
}
/**
* view process instance variables
*
* @param projectCode project code
* @param projectCode project code
* @param processInstanceId process instance id
* @return variables data
*/
@ -863,7 +951,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* encapsulation gantt structure
*
* @param projectCode project code
* @param projectCode project code
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
@ -938,7 +1026,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @param states states array
* @return process instance list
*/
@Override
@ -950,7 +1038,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* query process instance by processDefinitionCode
*
* @param processDefinitionCode processDefinitionCode
* @param size size
* @param size size
* @return process instance list
*/
@Override

121
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkflowInstanceV2ControllerTest.java

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest;
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
public class WorkflowInstanceV2ControllerTest extends AbstractControllerTest {
@InjectMocks
private WorkflowInstanceV2Controller workflowInstanceV2Controller;
@Mock
private ProcessInstanceService processInstanceService;
@Mock
private ExecutorService execService;
@Test
public void testQueryWorkFlowInstanceListPaging() {
User loginUser = getLoginUser();
WorkflowInstanceQueryRequest workflowInstanceQueryRequest = new WorkflowInstanceQueryRequest();
workflowInstanceQueryRequest.setProjectName("test");
workflowInstanceQueryRequest.setWorkflowName("shell");
workflowInstanceQueryRequest.setPageNo(1);
workflowInstanceQueryRequest.setPageSize(10);
Result result = new Result();
PageInfo<ProcessInstance> pageInfo =
new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize());
pageInfo.setTotalList(Collections.singletonList(new ProcessInstance()));
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
Mockito.when(processInstanceService.queryProcessInstanceList(any(),
any(WorkflowInstanceQueryRequest.class))).thenReturn(result);
Result result1 =
workflowInstanceV2Controller.queryWorkflowInstanceListPaging(loginUser, workflowInstanceQueryRequest);
Assertions.assertTrue(result1.isSuccess());
}
@Test
public void testQueryWorkflowInstanceById() {
User loginUser = getLoginUser();
Map<String, Object> result = new HashMap<>();
result.put(DATA_LIST, new ProcessInstance());
putMsg(result, Status.SUCCESS);
Mockito.when(processInstanceService.queryProcessInstanceById(any(), eq(1))).thenReturn(result);
Result result1 = workflowInstanceV2Controller.queryWorkflowInstanceById(loginUser, 1);
Assertions.assertTrue(result1.isSuccess());
}
@Test
public void testDeleteWorkflowInstanceById() {
User loginUser = getLoginUser();
Mockito.when(processInstanceService.deleteProcessInstanceById(any(), eq(1))).thenReturn(null);
Result result = workflowInstanceV2Controller.deleteWorkflowInstance(loginUser, 1);
Assertions.assertTrue(result.isSuccess());
}
@Test
public void testExecuteWorkflowInstance() {
User loginUser = getLoginUser();
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(execService.execute(any(), eq(1), any(ExecuteType.class))).thenReturn(result);
Result result1 = workflowInstanceV2Controller.execute(loginUser, 1, ExecuteType.STOP);
Assertions.assertTrue(result1.isSuccess());
}
private User getLoginUser() {
User user = new User();
user.setId(1);
user.setUserName("admin");
return user;
}
}

29
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -55,6 +55,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance host by stateArray
*
* @param stateArray
* @return
*/
@ -228,10 +229,10 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query top n process instance order by running duration
*
* @param size size
* @param startTime start time
* @param startTime end time
* @param status process instance status
* @param size size
* @param startTime start time
* @param startTime end time
* @param status process instance status
* @param projectCode project code
* @return ProcessInstance list
*/
@ -266,4 +267,24 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
ProcessInstance loadNextProcess4Serial(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("state") int state, @Param("id") int id);
/**
* Filter process instance
*
* @param page page
* @param processDefinitionCode processDefinitionCode
* @param name name
* @param host host
* @param startTime startTime
* @param endTime endTime
* @return process instance IPage
*/
IPage<ProcessInstance> queryProcessInstanceListV2Paging(Page<ProcessInstance> page,
@Param("processDefinitionCode") Long processDefinitionCode,
@Param("name") String name,
@Param("startTime") String startTime,
@Param("endTime") String endTime,
@Param("state") Integer state,
@Param("host") String host);
}

47
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -19,7 +19,8 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
id
, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
@ -110,7 +111,8 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run , instance.test_flag ,instance.next_process_instance_id,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run , instance.test_flag
,instance.next_process_instance_id,
restart_time, instance.state_history
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
@ -171,7 +173,8 @@
where worker_group = #{originWorkerGroupName}
</update>
<select id="countInstanceStateByProjectCodes" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
<select id="countInstanceStateByProjectCodes"
resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count
from t_ds_process_instance t
join t_ds_process_definition d on d.code=t.process_definition_code
@ -247,12 +250,13 @@
</if>
order by id asc
</select>
<select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
and process_definition_version = #{processDefinitionVersion}
and process_definition_version = #{processDefinitionVersion}
<if test="states != null and states.length != 0">
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
@ -274,6 +278,36 @@
and id <![CDATA[ > ]]> #{id}
order by id asc limit 1
</select>
<select id="queryProcessInstanceListV2Paging"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
SELECT
<include refid="baseSql">
<property name="alias" value="instance"/>
</include>
FROM t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
<if test="processDefinitionCode != 0">
and instance.process_definition_code = #{processDefinitionCode}
</if>
<if test="name != null and name != ''">
and instance.name like concat('%', #{name}, '%')
</if>
<if test="startTime != null and startTime != ''">
and instance.start_time <![CDATA[ >= ]]> #{startTime}
</if>
<if test="endTime != null and endTime != ''">
and instance.start_time <![CDATA[ <= ]]> #{endTime}
</if>
<if test="state != null and state != ''">
and instance.state = #{state}
</if>
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
</if>
order by instance.start_time desc,instance.id desc
</select>
<update id="updateGlobalParamsById">
update t_ds_process_instance
set global_params = #{globalParams}
@ -282,6 +316,7 @@
<update id="updateNextProcessIdById">
update t_ds_process_instance
set next_process_instance_id = #{thisInstanceId}
where id = #{runningInstanceId} and next_process_instance_id=0
where id = #{runningInstanceId}
and next_process_instance_id = 0
</update>
</mapper>

Loading…
Cancel
Save