Browse Source

#1310 add executor user to the process instance & task instance pages (#1973)

* add executor name in processDefinitionInstance & taskInstance

* modify processInstanceTest

* modify processInstanceTest

* #1310 add executor user to the process instance & task instance pages

* add processInstanceUT & taskInstanceUT

* add processInstanceUT & taskInstanceUT

* modify processInstanceTest & taskInstanceTest

* Remove duplicate code

* add userServiceUT
pull/2/head
Yelli 5 years ago committed by GitHub
parent
commit
619fb30c50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dockerfile/conf/dolphinscheduler/conf/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  2. 3
      dockerfile/conf/dolphinscheduler/conf/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java
  5. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  6. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  7. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java
  8. 169
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  9. 154
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  10. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  11. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  12. 51
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  13. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  14. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  15. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  16. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  17. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  18. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  19. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  20. 7
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue
  21. 7
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
  22. 4
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue
  23. 7
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue
  24. 4
      dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/index.vue
  25. 1
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  26. 1
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  27. 2
      pom.xml
  28. 1
      sql/dolphinscheduler-postgre.sql
  29. 1
      sql/dolphinscheduler_mysql.sql
  30. 20
      sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql
  31. 17
      sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

3
dockerfile/conf/dolphinscheduler/conf/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -90,6 +90,9 @@
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
</if>
<if test="executorId != 0">
and instance.executorId = #{executorId}
</if>
order by instance.start_time desc
</select>
<update id="setFailoverByHostAndStateArray">

3
dockerfile/conf/dolphinscheduler/conf/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -124,6 +124,9 @@
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
</if>
order by instance.start_time desc
</select>
</mapper>

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -75,6 +75,7 @@ public class ProcessInstanceController extends BaseController{
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type ="String"),
@ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type ="String"),
@ApiImplicitParam(name = "stateType", value = "EXECUTION_STATUS", type ="ExecutionStatus"),
@ApiImplicitParam(name = "host", value = "HOST", type ="String"),
@ApiImplicitParam(name = "startDate", value = "START_DATE", type ="String"),
@ -88,6 +89,7 @@ public class ProcessInstanceController extends BaseController{
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "processDefinitionId", required = false, defaultValue = "0") Integer processDefinitionId,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "executorName", required = false) String executorName,
@RequestParam(value = "stateType", required = false) ExecutionStatus stateType,
@RequestParam(value = "host", required = false) String host,
@RequestParam(value = "startDate", required = false) String startTime,
@ -96,12 +98,12 @@ public class ProcessInstanceController extends BaseController{
@RequestParam("pageSize") Integer pageSize){
try{
logger.info("query all process instance list, login user:{},project name:{}, define id:{}," +
"search value:{},state type:{},host:{},start time:{}, end time:{},page number:{}, page size:{}",
loginUser.getUserName(), projectName, processDefinitionId, searchVal, stateType,host,
"search value:{},executor name:{},state type:{},host:{},start time:{}, end time:{},page number:{}, page size:{}",
loginUser.getUserName(), projectName, processDefinitionId, searchVal, executorName,stateType,host,
startTime, endTime, pageNo, pageSize);
searchVal = ParameterUtils.handleEscapes(searchVal);
Map<String, Object> result = processInstanceService.queryProcessInstanceList(
loginUser, projectName, processDefinitionId, startTime, endTime, searchVal, stateType, host, pageNo, pageSize);
loginUser, projectName, processDefinitionId, startTime, endTime, searchVal, executorName, stateType, host, pageNo, pageSize);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR.getMsg(),e);

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java

@ -69,6 +69,7 @@ public class TaskInstanceController extends BaseController{
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID",required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", type ="String"),
@ApiImplicitParam(name = "taskName", value = "TASK_NAME", type ="String"),
@ApiImplicitParam(name = "executorName", value = "EXECUTOR_NAME", type ="String"),
@ApiImplicitParam(name = "stateType", value = "EXECUTION_STATUS", type ="ExecutionStatus"),
@ApiImplicitParam(name = "host", value = "HOST", type ="String"),
@ApiImplicitParam(name = "startDate", value = "START_DATE", type ="String"),
@ -83,6 +84,7 @@ public class TaskInstanceController extends BaseController{
@RequestParam(value = "processInstanceId", required = false, defaultValue = "0") Integer processInstanceId,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "taskName", required = false) String taskName,
@RequestParam(value = "executorName", required = false) String executorName,
@RequestParam(value = "stateType", required = false) ExecutionStatus stateType,
@RequestParam(value = "host", required = false) String host,
@RequestParam(value = "startDate", required = false) String startTime,
@ -91,11 +93,11 @@ public class TaskInstanceController extends BaseController{
@RequestParam("pageSize") Integer pageSize){
try{
logger.info("query task instance list, project name:{},process instance:{}, search value:{},task name:{}, state type:{}, host:{}, start:{}, end:{}",
projectName, processInstanceId, searchVal, taskName, stateType, host, startTime, endTime);
logger.info("query task instance list, project name:{},process instance:{}, search value:{},task name:{}, executor name: {},state type:{}, host:{}, start:{}, end:{}",
projectName, processInstanceId, searchVal, taskName, executorName, stateType, host, startTime, endTime);
searchVal = ParameterUtils.handleEscapes(searchVal);
Map<String, Object> result = taskInstanceService.queryTaskListPaging(
loginUser, projectName, processInstanceId, taskName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize);
loginUser, projectName, processInstanceId, taskName, executorName, startTime, endTime, searchVal, stateType, host, pageNo, pageSize);
return returnDataListPaging(result);
}catch (Exception e){
logger.error(Status.QUERY_TASK_LIST_PAGING_ERROR.getMsg(),e);

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -95,6 +95,9 @@ public class ProcessInstanceService extends BaseDAGService {
@Autowired
WorkerGroupMapper workerGroupMapper;
@Autowired
UsersService usersService;
/**
* query process instance by id
*
@ -151,7 +154,7 @@ public class ProcessInstanceService extends BaseDAGService {
*/
public Map<String, Object> queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId,
String startDate, String endDate,
String searchVal, ExecutionStatus stateType, String host,
String searchVal, String executorName,ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5);
@ -182,25 +185,31 @@ public class ProcessInstanceService extends BaseDAGService {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate");
return result;
}
Page<ProcessInstance> page = new Page(pageNo, pageSize);
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<ProcessInstance> processInstanceList =
processInstanceMapper.queryProcessInstanceListPaging(page,
project.getId(), processDefineId, searchVal, statusArray, host, start, end);
project.getId(), processDefineId, searchVal, executorId,statusArray, host, start, end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
for(ProcessInstance processInstance: processInstances){
processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(),processInstance.getEndTime()));
User executor = usersService.queryUser(processInstance.getExecutorId());
if (null != executor) {
processInstance.setExecutorName(executor.getUserName());
}
}
Set<String> exclusionSet = new HashSet<String>();
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("locations");
exclusionSet.add("connects");
exclusionSet.add("processInstanceJson");
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
pageInfo.setTotalCount((int) processInstanceList.getTotal());
pageInfo.setLists(CollectionUtils.getListByExclusion(processInstances, exclusionSet));
result.put(Constants.DATA_LIST, pageInfo);

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
@ -24,14 +26,11 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,6 +60,12 @@ public class TaskInstanceService extends BaseService {
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
ProcessInstanceService processInstanceService;
@Autowired
UsersService usersService;
/**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
@ -79,8 +84,8 @@ public class TaskInstanceService extends BaseService {
* @return task list page
*/
public Map<String,Object> queryTaskListPaging(User loginUser, String projectName,
Integer processInstanceId, String taskName, String startDate, String endDate,
String searchVal, ExecutionStatus stateType,String host,
Integer processInstanceId, String taskName, String executorName, String startDate,
String endDate, String searchVal, ExecutionStatus stateType,String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
@ -112,17 +117,23 @@ public class TaskInstanceService extends BaseService {
}
Page<TaskInstance> page = new Page(pageNo, pageSize);
PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
page, project.getId(), processInstanceId, searchVal, taskName, statusArray, host, start, end
page, project.getId(), processInstanceId, searchVal, taskName, executorId, statusArray, host, start, end
);
PageInfo pageInfo = new PageInfo<ProcessInstance>(pageNo, pageSize);
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("taskJson");
List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
for(TaskInstance taskInstance : taskInstanceList){
taskInstance.setDuration(DateUtils.differSec(taskInstance.getStartTime(),
taskInstance.getEndTime()));
taskInstance.setDuration(DateUtils.differSec(taskInstance.getStartTime(), taskInstance.getEndTime()));
User executor = usersService.queryUser(taskInstance.getExecutorId());
if (null != executor) {
taskInstance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotalCount((int)taskInstanceIPage.getTotal());
pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(),exclusionSet));

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java

@ -191,6 +191,26 @@ public class UsersService extends BaseService {
return userMapper.queryUserByNamePassword(name, md5);
}
/**
* get user id by user name
* @param name user name
* @return if name empty 0, user not exists -1, user exist user id
*/
public int getUserIdByName(String name) {
//executor name query
int executorId = 0;
if (StringUtils.isNotEmpty(name)) {
User executor = queryUser(name);
if (null != executor) {
executorId = executor.getId();
} else {
executorId = -1;
}
}
return executorId;
}
/**
* query user list
*

169
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -16,43 +16,131 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.Map;
import java.text.MessageFormat;
import java.util.*;
@RunWith(SpringRunner.class)
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessInstanceServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceServiceTest.class);
@Autowired
@InjectMocks
ProcessInstanceService processInstanceService;
@Mock
ProjectMapper projectMapper;
@Mock
ProjectService projectService;
@Mock
ProcessService processService;
@Mock
ProcessInstanceMapper processInstanceMapper;
@Mock
ProcessDefinitionMapper processDefineMapper;
@Mock
ProcessDefinitionService processDefinitionService;
@Mock
ExecutorService execService;
@Mock
TaskInstanceMapper taskInstanceMapper;
@Mock
LoggerService loggerService;
@Mock
WorkerGroupMapper workerGroupMapper;
@Mock
UsersService usersService;
@Test
public void viewVariables() {
try {
Map<String, Object> map = processInstanceService.viewVariables(-1);
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}catch (Exception e){
logger.error(e.getMessage(), e);
}
public void testQueryProcessInstanceList() {
String projectName = "project_test1";
User loginUser = getAdminUser();
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
//project auth fail
when(projectMapper.queryByName(projectName)).thenReturn(null);
when(projectService.checkProjectAndAuth(loginUser,null,projectName)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 46, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "test_user", ExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", 1, 10);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
//project auth success
putMsg(result, Status.SUCCESS, projectName);
Project project = getProject(projectName);
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
List<ProcessInstance> processInstanceList = new ArrayList<>();
Page<ProcessInstance> pageReturn = new Page<>(1, 10);
processInstanceList.add(processInstance);
pageReturn.setRecords(processInstanceList);
when(projectMapper.queryByName(projectName)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", 1, 10);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
//executor null
when(usersService.queryUser(loginUser.getId())).thenReturn(null);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(-1);
Map<String, Object> executorExistRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "admin", ExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", 1, 10);
Assert.assertEquals(Status.SUCCESS, executorExistRes.get(Constants.STATUS));
//executor name empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(0), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Map<String, Object> executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "", ExecutionStatus.SUBMITTED_SUCCESS,
"192.168.xx.xx", 1, 10);
Assert.assertEquals(Status.SUCCESS, executorEmptyRes.get(Constants.STATUS));
}
@Test
@ -63,21 +151,58 @@ public class ProcessInstanceServiceTest {
try {
Map<String, DependResult> resultMap =
processInstanceService.parseLogForDependentResult(logString);
Assert.assertEquals(resultMap.size() , 1);
Assert.assertEquals(1 , resultMap.size());
} catch (IOException e) {
}
}
@Test
public void queryProcessInstanceList() throws Exception {
/**
* get Mock Admin User
* @return admin user
*/
private User getAdminUser() {
User loginUser = new User();
loginUser.setId(27);
loginUser.setId(-1);
loginUser.setUserName("admin");
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> map = processInstanceService.queryProcessInstanceList(loginUser, "project_test1", 0, "", "", "", ExecutionStatus.FAILURE, "", 1, 5);
return loginUser;
}
/**
* get mock Project
* @param projectName projectName
* @return Project
*/
private Project getProject(String projectName){
Project project = new Project();
project.setId(1);
project.setName(projectName);
project.setUserId(1);
return project;
}
/**
* get Mock process instance
* @return process instance
*/
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_process_instance");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
return processInstance;
}
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
}

154
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -16,47 +16,177 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Map;
import java.text.MessageFormat;
import java.util.*;
@RunWith(SpringRunner.class)
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class TaskInstanceServiceTest {
private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class);
@Autowired
@InjectMocks
private TaskInstanceService taskInstanceService;
@Mock
ProjectMapper projectMapper;
@Mock
ProjectService projectService;
@Mock
ProcessService processService;
@Mock
TaskInstanceMapper taskInstanceMapper;
@Mock
ProcessInstanceService processInstanceService;
@Mock
UsersService usersService;
@Test
public void queryTaskListPaging(){
String projectName = "project_test1";
User loginUser = getAdminUser();
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
//project auth fail
when(projectMapper.queryByName(projectName)).thenReturn(null);
when(projectService.checkProjectAndAuth(loginUser,null,projectName)).thenReturn(result);
Map<String, Object> proejctAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "",
"test_user", "2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS));
//project
putMsg(result, Status.SUCCESS, projectName);
Project project = getProject(projectName);
Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00");
Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00");
ProcessInstance processInstance = getProcessInstance();
TaskInstance taskInstance = getTaskInstance();
List<TaskInstance> taskInstanceList = new ArrayList<>();
Page<TaskInstance> pageReturn = new Page<>(1, 10);
taskInstanceList.add(taskInstance);
pageReturn.setRecords(taskInstanceList);
when(projectMapper.queryByName(Mockito.anyString())).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())).thenReturn(processInstance);
Map<String, Object> successRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
//executor name empty
when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(""),
eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Map<String, Object> executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
"", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, executorEmptyRes.get(Constants.STATUS));
//executor null
when(usersService.queryUser(loginUser.getId())).thenReturn(null);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(-1);
Map<String, Object> executorNullRes = taskInstanceService.queryTaskListPaging(loginUser, projectName, 1, "",
"test_user", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", ExecutionStatus.SUCCESS, "192.168.xx.xx", 1, 20);
Assert.assertEquals(Status.SUCCESS, executorNullRes.get(Constants.STATUS));
}
/**
* get Mock Admin User
* @return admin user
*/
private User getAdminUser() {
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserName("admin");
loginUser.setUserType(UserType.GENERAL_USER);
return loginUser;
}
Map<String, Object> map = taskInstanceService.queryTaskListPaging(loginUser, "project_test1", 0, "",
"2019-02-26 19:48:00", "2019-02-26 19:48:22", "", null, "", 1, 20);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
PageInfo pageInfo = (PageInfo) map.get("data");
/**
* get mock Project
* @param projectName projectName
* @return Project
*/
private Project getProject(String projectName){
Project project = new Project();
project.setId(1);
project.setName(projectName);
project.setUserId(1);
return project;
}
if(pageInfo != null){
logger.info(pageInfo.getLists().toString());
/**
* get Mock process instance
* @return process instance
*/
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setName("test_process_instance");
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstance.setExecutorId(-1);
return processInstance;
}
/**
* get Mock task instance
* @return task instance
*/
private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setName("test_task_instance");
taskInstance.setStartTime(new Date());
taskInstance.setEndTime(new Date());
taskInstance.setExecutorId(-1);
return taskInstance;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
}

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -148,6 +148,28 @@ public class UsersServiceTest {
Assert.assertTrue(queryUser!=null);
}
@Test
public void testGetUserIdByName() {
User user = new User();
user.setId(1);
user.setUserType(UserType.ADMIN_USER);
user.setUserName("test_user");
//user name null
int userId = usersService.getUserIdByName("");
Assert.assertEquals(0, userId);
//user not exist
when(usersService.queryUser(user.getUserName())).thenReturn(null);
int userNotExistId = usersService.getUserIdByName(user.getUserName());
Assert.assertEquals(-1, userNotExistId);
//user exist
when(usersService.queryUser(user.getUserName())).thenReturn(user);
int userExistId = usersService.getUserIdByName(user.getUserName());
Assert.assertEquals(user.getId(), userExistId);
}
@Test

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -139,6 +139,12 @@ public class ProcessInstance {
*/
private int executorId;
/**
* executor name
*/
@TableField(exist = false)
private String executorName;
/**
* tenant code
*/
@ -472,6 +478,14 @@ public class ProcessInstance {
return historyCmd;
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public void setHistoryCmd(String historyCmd) {
this.historyCmd = historyCmd;
}

51
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -196,6 +196,17 @@ public class TaskInstance {
*/
private int workerGroupId;
/**
* executor id
*/
private int executorId;
/**
* executor name
*/
@TableField(exist = false)
private String executorName;
public void init(String host,Date startTime,String executePath){
@ -415,6 +426,22 @@ public class TaskInstance {
this.retryInterval = retryInterval;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public String getExecutorName() {
return executorName;
}
public void setExecutorName(String executorName) {
this.executorName = executorName;
}
public Boolean isTaskComplete() {
return this.getState().typeIsPause()
@ -466,6 +493,14 @@ public class TaskInstance {
this.workerGroupId = workerGroupId;
}
public String getDependentResult() {
return dependentResult;
}
public void setDependentResult(String dependentResult) {
this.dependentResult = dependentResult;
}
@Override
public String toString() {
return "TaskInstance{" +
@ -485,27 +520,21 @@ public class TaskInstance {
", logPath='" + logPath + '\'' +
", retryTimes=" + retryTimes +
", alertFlag=" + alertFlag +
", flag=" + flag +
", processInstance=" + processInstance +
", processDefine=" + processDefine +
", pid=" + pid +
", appLink='" + appLink + '\'' +
", flag=" + flag +
", dependency=" + dependency +
", dependency='" + dependency + '\'' +
", duration=" + duration +
", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority +
", workGroupId=" + workerGroupId +
", dependentResult='" + dependentResult + '\'' +
", workerGroupId=" + workerGroupId +
", executorId=" + executorId +
", executorName='" + executorName + '\'' +
'}';
}
public String getDependentResult() {
return dependentResult;
}
public void setDependentResult(String dependentResult) {
this.dependentResult = dependentResult;
}
}

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -82,6 +82,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("projectId") int projectId,
@Param("processDefinitionId") Integer processDefinitionId,
@Param("searchVal") String searchVal,
@Param("executorId") Integer executorId,
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("startTime") Date startTime,

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -16,13 +16,12 @@
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
@ -64,6 +63,7 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("processInstanceId") Integer processInstanceId,
@Param("searchVal") String searchVal,
@Param("taskName") String taskName,
@Param("executorId") int executorId,
@Param("states") int[] statusArray,
@Param("host") String host,
@Param("startTime") Date startTime,

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -90,6 +90,9 @@
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
</if>
order by instance.start_time desc
</select>
<update id="setFailoverByHostAndStateArray">

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -124,6 +124,9 @@
<if test="host != null and host != ''">
and instance.host like concat('%', #{host}, '%')
</if>
<if test="executorId != 0">
and instance.executor_id = #{executorId}
</if>
order by instance.start_time desc
</select>
</mapper>

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -165,6 +165,7 @@ public class ProcessInstanceMapperTest {
processDefinition.getProjectId(),
processInstance.getProcessDefinitionId(),
processInstance.getName(),
0,
stateArray,
processInstance.getHost(),
null,

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -282,6 +282,7 @@ public class TaskInstanceMapperTest {
task.getProcessInstanceId(),
"",
"",
0,
new int[0],
"",
null,null

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -950,6 +950,7 @@ public class ProcessService {
}
}
}
taskInstance.setExecutorId(processInstance.getExecutorId());
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
taskInstance.setSubmitTime(new Date());

7
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/_source/instanceConditions/index.vue

@ -54,6 +54,9 @@
<div class="list">
<x-input v-model.trim="searchParams.host" @on-enterkey="_ckQuery" style="width: 140px;" size="small" :placeholder="$t('host')"></x-input>
</div>
<div class="list">
<x-input v-model.trim="searchParams.executorName" @on-enterkey="_ckQuery" style="width: 140px;" size="small" :placeholder="$t('Executor')"></x-input>
</div>
<div class="list">
<x-input v-model.trim="searchParams.searchVal" @on-enterkey="_ckQuery" style="width: 200px;" size="small" :placeholder="$t('name')"></x-input>
</div>
@ -80,7 +83,9 @@
// search value
searchVal: '',
// host
host: ''
host: '',
// executor name
executorName: ''
}
}
},

7
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue

@ -28,6 +28,9 @@
<th scope="col">
<span>{{$t('Process Name')}}</span>
</th>
<th scope="col" width="70">
<span>{{$t('Executor')}}</span>
</th>
<th scope="col" width="70">
<span>{{$t('Run Type')}}</span>
</th>
@ -67,6 +70,10 @@
<td>
<span class="ellipsis" style="padding-left: 4px;"><router-link :to="{ path: '/projects/instance/list/' + item.id}" tag="a" class="links" :title="item.name">{{item.name}}</router-link></span>
</td>
<td>
<span v-if="item.executorName">{{item.executorName}}</span>
<span v-else>-</span>
</td>
<td><span>{{_rtRunningType(item.commandType)}}</span></td>
<td>
<span v-if="item.scheduleTime">{{item.scheduleTime | formatDate}}</span>

4
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/index.vue

@ -71,7 +71,9 @@
// Start Time
startDate: '',
// End Time
endDate: ''
endDate: '',
// Exectuor Name
executorName: ''
}
}
},

7
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue

@ -28,6 +28,9 @@
<th scope="col">
<span>{{$t('Process Instance')}}</span>
</th>
<th scope="col" width="70">
<span>{{$t('Executor')}}</span>
</th>
<th scope="col" width="90">
<span>{{$t('Node Type')}}</span>
</th>
@ -64,6 +67,10 @@
<span class="ellipsis" :title="item.name">{{item.name}}</span>
</td>
<td><a href="javascript:" class="links" @click="_go(item)"><span class="ellipsis">{{item.processInstanceName}}</span></a></td>
<td>
<span v-if="item.executorName">{{item.executorName}}</span>
<span v-else>-</span>
</td>
<td><span>{{item.taskType}}</span></td>
<td><span v-html="_rtState(item.state)" style="cursor: pointer;"></span></td>
<td>

4
dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/taskInstance/index.vue

@ -68,7 +68,9 @@
// start date
startDate: '',
// end date
endDate: ''
endDate: '',
// Exectuor Name
executorName: ''
}
}
},

1
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -240,6 +240,7 @@ export default {
'Service-Master': 'Service-Master',
'Service-Worker': 'Service-Worker',
'Process Name': 'Process Name',
'Executor': 'Executor',
'Run Type': 'Run Type',
'Scheduling Time': 'Scheduling Time',
'Run Times': 'Run Times',

1
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -239,6 +239,7 @@ export default {
'Service-Master': '服务管理-Master',
'Service-Worker': '服务管理-Worker',
'Process Name': '工作流名称',
'Executor': '执行用户',
'Run Type': '运行类型',
'Scheduling Time': '调度时间',
'Run Times': '运行次数',

2
pom.xml

@ -708,6 +708,8 @@
<include>**/api/service/BaseDAGServiceTest.java</include>
<include>**/api/service/LoggerServiceTest.java</include>
<include>**/api/service/DataAnalysisServiceTest.java</include>
<include>**/api/service/ProcessInstanceServiceTest.java</include>
<include>**/api/service/TaskInstanceServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include>
<include>**/alert/utils/JSONUtilsTest.java</include>

1
sql/dolphinscheduler-postgre.sql

@ -574,6 +574,7 @@ CREATE TABLE t_ds_task_instance (
max_retry_times int DEFAULT NULL ,
task_instance_priority int DEFAULT NULL ,
worker_group_id int DEFAULT '-1' ,
executor_id int DEFAULT NULL ,
PRIMARY KEY (id)
) ;

1
sql/dolphinscheduler_mysql.sql

@ -616,6 +616,7 @@ CREATE TABLE `t_ds_task_instance` (
`max_retry_times` int(2) DEFAULT NULL COMMENT 'max retry times',
`task_instance_priority` int(11) DEFAULT NULL COMMENT 'task instance priority:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) DEFAULT '-1' COMMENT 'worker group id',
`executor_id` int(11) DEFAULT NULL COMMENT 'executor id',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,

20
sql/upgrade/1.2.2_schema/mysql/dolphinscheduler_ddl.sql

@ -35,3 +35,23 @@ d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_definition_A_modify_by;
DROP PROCEDURE uc_dolphin_T_t_ds_process_definition_A_modify_by;
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_executor_id;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_executor_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='executor_id')
THEN
ALTER TABLE t_ds_task_instance ADD `executor_id` int(11) DEFAULT NULL COMMENT 'executor id';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_A_executor_id;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_executor_id;

17
sql/upgrade/1.2.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -32,3 +32,20 @@ delimiter ;
SELECT uc_dolphin_T_t_ds_process_definition_A_modify_by();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_definition_A_modify_by();
-- uc_dolphin_T_t_ds_process_definition_A_modify_by
delimiter d//
CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_executor_id() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='executor_id')
THEN
ALTER TABLE t_ds_task_instance ADD COLUMN executor_id int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
SELECT uc_dolphin_T_t_ds_task_instance_A_executor_id();
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_executor_id();

Loading…
Cancel
Save