diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java index af23a4ce0b..8c032b399e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java @@ -119,9 +119,21 @@ public class TaskInstanceController extends BaseController { return result; } searchVal = ParameterUtils.handleEscapes(searchVal); - result = taskInstanceService.queryTaskListPaging(loginUser, projectCode, processInstanceId, processInstanceName, + result = taskInstanceService.queryTaskListPaging( + loginUser, + projectCode, + processInstanceId, + processInstanceName, processDefinitionName, - taskName, executorName, startTime, endTime, searchVal, stateType, host, taskExecuteType, pageNo, + taskName, + executorName, + startTime, + endTime, + searchVal, + stateType, + host, + taskExecuteType, + pageNo, pageSize); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java index 6c86aeb0b2..3d5d3197dc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseService.java @@ -18,10 +18,12 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.dao.entity.User; +import java.util.Date; import java.util.List; import java.util.Map; @@ -107,12 +109,8 @@ public interface BaseService { /** * check and parse date parameters - * - * @param startDateStr start date string - * @param endDateStr end date string - * @return map */ - Map checkAndParseDateParameters(String startDateStr, String endDateStr); + Date checkAndParseDateParameters(String startDateStr) throws ServiceException; /** * check checkDescriptionLength diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index fc60773839..20bea9b600 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowInstanceQueryRequest; +import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -82,18 +83,18 @@ public interface ProcessInstanceService { * @param otherParamsJson otherParamsJson handle other params * @return process instance list */ - Result queryProcessInstanceList(User loginUser, - long projectCode, - long processDefineCode, - String startDate, - String endDate, - String searchVal, - String executorName, - WorkflowExecutionStatus stateType, - String host, - String otherParamsJson, - Integer pageNo, - Integer pageSize); + Result> queryProcessInstanceList(User loginUser, + long projectCode, + long processDefineCode, + String startDate, + String endDate, + String searchVal, + String executorName, + WorkflowExecutionStatus stateType, + String host, + String otherParamsJson, + Integer pageNo, + Integer pageSize); /** * paging query process instance list, filtering according to project, process definition, time range, keyword, process status diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AuditServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AuditServiceImpl.java index 834dede53d..4fdf0a169a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AuditServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AuditServiceImpl.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.AuditService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuditOperationType; import org.apache.dolphinscheduler.common.enums.AuditResourceType; import org.apache.dolphinscheduler.dao.entity.AuditLog; @@ -34,7 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.AuditLogMapper; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; @@ -85,13 +83,6 @@ public class AuditServiceImpl extends BaseServiceImpl implements AuditService { Integer pageNo, Integer pageSize) { Result result = new Result(); - Map checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate); - Status resultEnum = (Status) checkAndParseDateResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - putMsg(result, resultEnum); - return result; - } - int[] resourceArray = null; if (resourceType != null) { resourceArray = new int[]{resourceType.getCode()}; @@ -102,8 +93,8 @@ public class AuditServiceImpl extends BaseServiceImpl implements AuditService { opsArray = new int[]{operationType.getCode()}; } - Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME); - Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME); + Date start = (Date) checkAndParseDateParameters(startDate); + Date end = (Date) checkAndParseDateParameters(endDate); Page page = new Page<>(pageNo, pageSize); IPage logIPage = auditLogMapper.queryAuditLog(page, resourceArray, opsArray, userName, start, end); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java index 8c8a54fe1a..bdb1295bc8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/BaseServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service.impl; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.utils.Result; @@ -31,7 +32,6 @@ import org.apache.commons.lang3.StringUtils; import java.text.MessageFormat; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -186,38 +186,18 @@ public class BaseServiceImpl implements BaseService { /** * check and parse date parameters - * - * @param startDateStr start date string - * @param endDateStr end date string - * @return map */ @Override - public Map checkAndParseDateParameters(String startDateStr, String endDateStr) { - Map result = new HashMap<>(); + public Date checkAndParseDateParameters(String startDateStr) throws ServiceException { Date start = null; if (!StringUtils.isEmpty(startDateStr)) { start = DateUtils.stringToDate(startDateStr); if (Objects.isNull(start)) { logger.warn("Parameter startDateStr is invalid."); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); - return result; - } - } - result.put(Constants.START_TIME, start); - - Date end = null; - if (!StringUtils.isEmpty(endDateStr)) { - end = DateUtils.stringToDate(endDateStr); - if (Objects.isNull(end)) { - logger.warn("Parameter endDateStr is invalid."); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); - return result; + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); } } - result.put(Constants.END_TIME, end); - - putMsg(result, Status.SUCCESS); - return result; + return start; } @Override diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 6dcbcaa5f6..5b8cfb0eeb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -301,22 +301,24 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce * @return process instance list */ @Override - public Result queryProcessInstanceList(User loginUser, long projectCode, long processDefineCode, String startDate, - String endDate, String searchVal, String executorName, - WorkflowExecutionStatus stateType, String host, String otherParamsJson, - Integer pageNo, Integer pageSize) { + public Result> queryProcessInstanceList(User loginUser, + long projectCode, + long processDefineCode, + String startDate, + String endDate, + String searchVal, + String executorName, + WorkflowExecutionStatus stateType, + String host, + String otherParamsJson, + Integer pageNo, + Integer pageSize) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); // check user access for project - Map checkResult = - projectService.checkProjectAndAuth(loginUser, project, projectCode, - ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - putMsg(result, resultEnum); - return result; - } + projectService.checkProjectAndAuthThrowException(loginUser, project, + ApiFuncIdentificationConstant.WORKFLOW_INSTANCE); int[] statusArray = null; // filter by state @@ -324,21 +326,22 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce statusArray = new int[]{stateType.getCode()}; } - Map checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate); - resultEnum = (Status) checkAndParseDateResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - putMsg(result, resultEnum); - return result; - } - Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME); - Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME); + Date start = checkAndParseDateParameters(startDate); + Date end = checkAndParseDateParameters(endDate); Page page = new Page<>(pageNo, pageSize); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); - int executorId = usersService.getUserIdByName(executorName); - IPage processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(page, - project.getCode(), processDefineCode, searchVal, executorId, statusArray, host, start, end); + IPage processInstanceList = processInstanceMapper.queryProcessInstanceListPaging( + page, + project.getCode(), + processDefineCode, + searchVal, + executorName, + statusArray, + host, + start, + end); List processInstances = processInstanceList.getRecords(); List userIds = Collections.emptyList(); @@ -385,6 +388,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getCode(), processInstance.getName()); processInstance.setProcessDefinitionCode(processDefinition.getCode()); + processInstance.setProjectCode(project.getCode()); } Page page = @@ -392,10 +396,15 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce PageInfo pageInfo = new PageInfo<>(workflowInstanceQueryRequest.getPageNo(), workflowInstanceQueryRequest.getPageSize()); - IPage processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging(page, - processInstance.getProcessDefinitionCode(), processInstance.getName(), - workflowInstanceQueryRequest.getStartTime(), workflowInstanceQueryRequest.getEndTime(), - workflowInstanceQueryRequest.getState(), processInstance.getHost()); + IPage processInstanceList = processInstanceMapper.queryProcessInstanceListV2Paging( + page, + processInstance.getProjectCode(), + processInstance.getProcessDefinitionCode(), + processInstance.getName(), + workflowInstanceQueryRequest.getStartTime(), + workflowInstanceQueryRequest.getEndTime(), + workflowInstanceQueryRequest.getState(), + processInstance.getHost()); List processInstances = processInstanceList.getRecords(); List userIds = Collections.emptyList(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 1d4f9b64de..d0bee2ae39 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -138,38 +138,44 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); // check user access for project - Map checkResult = - projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE); - Status status = (Status) checkResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - putMsg(result, status); - return result; - } + projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_INSTANCE); int[] statusArray = null; if (stateType != null) { statusArray = new int[]{stateType.getCode()}; } - Map checkAndParseDateResult = checkAndParseDateParameters(startDate, endDate); - status = (Status) checkAndParseDateResult.get(Constants.STATUS); - if (status != Status.SUCCESS) { - putMsg(result, status); - return result; - } - Date start = (Date) checkAndParseDateResult.get(Constants.START_TIME); - Date end = (Date) checkAndParseDateResult.get(Constants.END_TIME); + Date start = checkAndParseDateParameters(startDate); + Date end = checkAndParseDateParameters(endDate); Page page = new Page<>(pageNo, pageSize); PageInfo> pageInfo = new PageInfo<>(pageNo, pageSize); - int executorId = usersService.getUserIdByName(executorName); IPage taskInstanceIPage; if (taskExecuteType == TaskExecuteType.STREAM) { // stream task without process instance taskInstanceIPage = taskInstanceMapper.queryStreamTaskInstanceListPaging( - page, project.getCode(), processDefinitionName, searchVal, taskName, executorId, statusArray, host, - taskExecuteType, start, end); + page, + project.getCode(), + processDefinitionName, + searchVal, + taskName, + executorName, + statusArray, + host, + taskExecuteType, + start, + end); } else { taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging( - page, project.getCode(), processInstanceId, processInstanceName, searchVal, taskName, executorId, - statusArray, host, taskExecuteType, start, end); + page, + project.getCode(), + processInstanceId, + processInstanceName, + searchVal, + taskName, + executorName, + statusArray, + host, + taskExecuteType, + start, + end); } Set exclusionSet = new HashSet<>(); exclusionSet.add(Constants.CLASS); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 7c40b6d2c6..ba62a9c78e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -185,13 +185,14 @@ public class ProcessInstanceServiceTest { putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); // project auth fail - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - Result proejctAuthFailRes = - processInstanceService.queryProcessInstanceList(loginUser, projectCode, 46, "2020-01-01 00:00:00", - "2020-01-02 00:00:00", "", "test_user", WorkflowExecutionStatus.SUBMITTED_SUCCESS, - "192.168.xx.xx", "", 1, 10); - Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) proejctAuthFailRes.getCode()); + when(projectMapper.queryByCode(projectCode)).thenReturn(null); + Mockito.doThrow(new ServiceException()).when(projectService).checkProjectAndAuthThrowException(Mockito.any(), + Mockito.any(), Mockito.any()); + Assertions.assertThrows(ServiceException.class, () -> { + processInstanceService.queryProcessInstanceList(loginUser, projectCode, 46, "2020-01-01 00:00:00", + "2020-01-02 00:00:00", "", "test_user", WorkflowExecutionStatus.SUBMITTED_SUCCESS, + "192.168.xx.xx", "", 1, 10); + }); Date start = DateUtils.stringToDate("2020-01-01 00:00:00"); Date end = DateUtils.stringToDate("2020-01-02 00:00:00"); @@ -204,17 +205,25 @@ public class ProcessInstanceServiceTest { // data parameter check putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(Mockito.any(), Mockito.any(), + Mockito.any()); when(processDefineMapper.selectById(Mockito.anyInt())).thenReturn(getProcessDefinition()); when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn); - - Result dataParameterRes = - processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "20200101 00:00:00", - "20200102 00:00:00", "", loginUser.getUserName(), WorkflowExecutionStatus.SUBMITTED_SUCCESS, - "192.168.xx.xx", "", 1, 10); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode()); + Assertions.assertThrows(ServiceException.class, () -> processInstanceService.queryProcessInstanceList( + loginUser, + projectCode, + 1, + "20200101 00:00:00", + "20200102 00:00:00", + "", + loginUser.getUserName(), + WorkflowExecutionStatus.SUBMITTED_SUCCESS, + "192.168.xx.xx", + "", + 1, + 10)); // project auth success putMsg(result, Status.SUCCESS, projectCode); @@ -224,7 +233,7 @@ public class ProcessInstanceServiceTest { when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), - eq(1L), eq(""), eq(-1), Mockito.any(), + eq(1L), eq(""), eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); @@ -236,7 +245,7 @@ public class ProcessInstanceServiceTest { // data parameter empty when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), - eq(1L), eq(""), eq(-1), Mockito.any(), + eq(1L), eq(""), eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn); successRes = processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "", "", "", loginUser.getUserName(), WorkflowExecutionStatus.SUBMITTED_SUCCESS, @@ -255,7 +264,7 @@ public class ProcessInstanceServiceTest { // executor name empty when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), - eq(1L), eq(""), eq(0), Mockito.any(), + eq(1L), eq(""), eq("admin"), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); Result executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectCode, 1, "2020-01-01 00:00:00", diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 89de9b6643..7a032191cb 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.dto.taskInstance.TaskInstanceRemoveCacheResponse; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; @@ -107,9 +108,10 @@ public class TaskInstanceServiceTest { putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); // project auth fail - when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); - Result projectAuthFailRes = taskInstanceService.queryTaskListPaging(loginUser, + when(projectMapper.queryByCode(projectCode)).thenReturn(null); + Mockito.doThrow(new ServiceException()).when(projectService).checkProjectAndAuthThrowException(Mockito.any(), + Mockito.any(), Mockito.any()); + Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(loginUser, projectCode, 0, "", @@ -123,14 +125,13 @@ public class TaskInstanceServiceTest { "", TaskExecuteType.BATCH, 1, - 20); - Assertions.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) projectAuthFailRes.getCode()); + 20)); // data parameter check putMsg(result, Status.SUCCESS, projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); - Result dataParameterRes = taskInstanceService.queryTaskListPaging(loginUser, + Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", @@ -144,8 +145,7 @@ public class TaskInstanceServiceTest { "192.168.xx.xx", TaskExecuteType.BATCH, 1, - 20); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), (int) dataParameterRes.getCode()); + 20)); // project putMsg(result, Status.SUCCESS, projectCode); @@ -158,12 +158,23 @@ public class TaskInstanceServiceTest { taskInstanceList.add(taskInstance); pageReturn.setRecords(taskInstanceList); when(projectMapper.queryByCode(projectCode)).thenReturn(project); - when(projectService.checkProjectAndAuth(loginUser, project, projectCode, TASK_INSTANCE)).thenReturn(result); + Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(Mockito.any(), Mockito.any(), + Mockito.any()); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), - eq(""), eq(""), eq(""), - eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))) + when(taskInstanceMapper.queryTaskInstanceListPaging( + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any())) .thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); when(processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId())) @@ -175,9 +186,10 @@ public class TaskInstanceServiceTest { Assertions.assertEquals(Status.SUCCESS.getCode(), (int) successRes.getCode()); // executor name empty - when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), + when(taskInstanceMapper.queryTaskInstanceListPaging( + Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""), - eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))) + eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), eq(start), eq(end))) .thenReturn(pageReturn); Result executorEmptyRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "", "", "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", @@ -196,7 +208,7 @@ public class TaskInstanceServiceTest { // start/end date null when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""), - eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any())) + eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any())) .thenReturn(pageReturn); Result executorNullDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "", "", null, null, "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20); @@ -205,18 +217,42 @@ public class TaskInstanceServiceTest { // start date error format when(taskInstanceMapper.queryTaskInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1), eq(""), eq(""), eq(""), - eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any())) + eq(""), Mockito.any(), eq("192.168.xx.xx"), eq(TaskExecuteType.BATCH), any(), any())) .thenReturn(pageReturn); - Result executorErrorStartDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", + Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging( + loginUser, + projectCode, + 1, + "", + "", + "", + "", + "error date", + null, + "", + TaskExecutionStatus.SUCCESS, + "192.168.xx.xx", + TaskExecuteType.BATCH, + 1, + 20)); + + Assertions.assertThrows(ServiceException.class, () -> taskInstanceService.queryTaskListPaging( + loginUser, + projectCode, + 1, + "", + "", "", - "", "error date", null, "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), - (int) executorErrorStartDateRes.getCode()); - Result executorErrorEndDateRes = taskInstanceService.queryTaskListPaging(loginUser, projectCode, 1, "", "", "", - "", null, "error date", "", TaskExecutionStatus.SUCCESS, "192.168.xx.xx", TaskExecuteType.BATCH, 1, 20); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), - (int) executorErrorEndDateRes.getCode()); + "", + null, + "error date", + "", + TaskExecutionStatus.SUCCESS, + "192.168.xx.xx", + TaskExecuteType.BATCH, + 1, + 20)); } /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index 741d21edbe..70e89cfb80 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -58,24 +58,14 @@ public class ProcessInstance { @TableId(value = "id", type = IdType.AUTO) private Integer id; - /** - * process definition code - */ private Long processDefinitionCode; - /** - * process definition version - */ private int processDefinitionVersion; - /** - * process state - */ + private Long projectCode; + private WorkflowExecutionStatus state; - /** - * state history - */ private String stateHistory; /** @@ -88,80 +78,35 @@ public class ProcessInstance { * recovery flag for failover */ private Flag recovery; - /** - * start time - */ private Date startTime; - /** - * end time - */ @TableField(updateStrategy = FieldStrategy.IGNORED) private Date endTime; - /** - * run time - */ private int runTimes; - /** - * name - */ private String name; - /** - * host - */ private String host; - /** - * process definition structure - */ @TableField(exist = false) private ProcessDefinition processDefinition; - /** - * process command type - */ private CommandType commandType; - /** - * command parameters - */ private String commandParam; - /** - * node depend type - */ private TaskDependType taskDependType; - /** - * task max try times - */ private int maxTryTimes; - /** - * failure strategy when task failed. - */ private FailureStrategy failureStrategy; - /** - * warning type - */ private WarningType warningType; - /** - * warning group - */ private Integer warningGroupId; - /** - * schedule time - */ private Date scheduleTime; - /** - * command start time - */ private Date commandStartTime; /** @@ -175,21 +120,12 @@ public class ProcessInstance { @TableField(exist = false) private DagData dagData; - /** - * executor id - */ private int executorId; - /** - * executor name - */ - @TableField(exist = false) private String executorName; - /** - * tenant code - */ - @TableField(exist = false) + private int tenantId; + private String tenantCode; /** @@ -248,11 +184,6 @@ public class ProcessInstance { */ private int timeout; - /** - * tenant id - */ - private int tenantId; - /** * varPool string */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index fbef6cc9fe..02b17ca9e7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -70,30 +70,16 @@ public class TaskInstance implements Serializable { */ private String taskType; - /** - * process instance id - */ private int processInstanceId; - /** - * task code - */ + private String processInstanceName; + + private Long projectCode; + private long taskCode; - /** - * task definition version - */ private int taskDefinitionVersion; - /** - * process instance name - */ - @TableField(exist = false) - private String processInstanceName; - - /** - * process definition name - */ @TableField(exist = false) private String processDefinitionName; @@ -269,10 +255,6 @@ public class TaskInstance implements Serializable { */ private String varPool; - /** - * executor name - */ - @TableField(exist = false) private String executorName; @TableField(exist = false) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 2897ee5450..d6cc986a39 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -101,7 +101,7 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param projectCode projectCode * @param processDefinitionCode processDefinitionCode * @param searchVal searchVal - * @param executorId executorId + * @param executorName executorName * @param statusArray statusArray * @param host host * @param startTime startTime @@ -112,7 +112,7 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("projectCode") Long projectCode, @Param("processDefinitionCode") Long processDefinitionCode, @Param("searchVal") String searchVal, - @Param("executorId") Integer executorId, + @Param("executorName") String executorName, @Param("states") int[] statusArray, @Param("host") String host, @Param("startTime") Date startTime, @@ -281,6 +281,7 @@ public interface ProcessInstanceMapper extends BaseMapper { * @return process instance IPage */ IPage queryProcessInstanceListV2Paging(Page page, + @Param("projectCode") Long projectCode, @Param("processDefinitionCode") Long processDefinitionCode, @Param("name") String name, @Param("startTime") String startTime, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 14f354f11e..ae66577214 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -136,7 +136,7 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("processInstanceName") String processInstanceName, @Param("searchVal") String searchVal, @Param("taskName") String taskName, - @Param("executorId") int executorId, + @Param("executorName") String executorName, @Param("states") int[] statusArray, @Param("host") String host, @Param("taskExecuteType") TaskExecuteType taskExecuteType, @@ -148,7 +148,7 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("processDefinitionName") String processDefinitionName, @Param("searchVal") String searchVal, @Param("taskName") String taskName, - @Param("executorId") int executorId, + @Param("executorName") String executorName, @Param("states") int[] statusArray, @Param("host") String host, @Param("taskExecuteType") TaskExecuteType taskExecuteType, diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java index 7566e17783..d9ce044b88 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImpl.java @@ -92,6 +92,7 @@ public class TaskInstanceDaoImpl implements TaskInstanceDao { taskInstance.setState(TaskExecutionStatus.PAUSE); } taskInstance.setExecutorId(processInstance.getExecutorId()); + taskInstance.setExecutorName(processInstance.getExecutorName()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstance)); if (taskInstance.getSubmitTime() == null) { taskInstance.setSubmitTime(new Date()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java index 431a022d59..2fc9fbdf5c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java @@ -33,6 +33,8 @@ public class TaskInstanceUtils { target.setName(source.getName()); target.setTaskType(source.getTaskType()); target.setProcessInstanceId(source.getProcessInstanceId()); + target.setProcessInstanceName(source.getProcessInstanceName()); + target.setProjectCode(source.getProjectCode()); target.setTaskCode(source.getTaskCode()); target.setTaskDefinitionVersion(source.getTaskDefinitionVersion()); target.setProcessInstanceName(source.getProcessInstanceName()); diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index a9fb446307..0bf5b29ea1 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -20,11 +20,11 @@ id - , name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, + , name, process_definition_version, process_definition_code, project_code, state, recovery, start_time, end_time, run_times,host, command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, - process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, + process_instance_priority, worker_group,environment_code, timeout, tenant_id, tenant_code, var_pool, dry_run, test_flag, next_process_instance_id, restart_time, state_history update t_ds_process_instance @@ -276,35 +272,33 @@ and id ]]> #{id} order by id asc limit 1 - SELECT - - - - - FROM t_ds_process_instance instance - join t_ds_process_definition define ON instance.process_definition_code = define.code - where instance.is_sub_process=0 + + FROM t_ds_process_instance + where is_sub_process=0 + + and project_code = #{projectCode} + - and instance.process_definition_code = #{processDefinitionCode} + and process_definition_code = #{processDefinitionCode} - and instance.name like concat('%', #{name}, '%') + and name like concat('%', #{name}, '%') - and instance.start_time = ]]> #{startTime} + and start_time = ]]> #{startTime} - and instance.start_time #{endTime} + and start_time #{endTime} - and instance.state = #{state} + and state = #{state} - and instance.host like concat('%', #{host}, '%') + and host like concat('%', #{host}, '%') - order by instance.start_time desc,instance.id desc + order by start_time desc, id desc - SELECT c.code AS process_definition_code ,c.name AS process_definition_name diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 3768373cf5..3f94e21b61 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -596,6 +596,7 @@ CREATE TABLE t_ds_process_instance name varchar(255) DEFAULT NULL, process_definition_version int(11) DEFAULT NULL, process_definition_code bigint(20) not NULL, + project_code bigint(20) DEFAULT NULL, state tinyint(4) DEFAULT NULL, state_history text, recovery tinyint(4) DEFAULT NULL, @@ -617,6 +618,7 @@ CREATE TABLE t_ds_process_instance update_time timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, is_sub_process int(11) DEFAULT '0', executor_id int(11) NOT NULL, + executor_name varchar(64) DEFAULT NULL, history_cmd text, process_instance_priority int(11) DEFAULT '2', worker_group varchar(64) DEFAULT NULL, @@ -624,6 +626,7 @@ CREATE TABLE t_ds_process_instance timeout int(11) DEFAULT '0', next_process_instance_id int(11) DEFAULT '0', tenant_id int(11) NOT NULL DEFAULT '-1', + tenant_code varchar(64) DEFAULT NULL, var_pool longtext, dry_run int NULL DEFAULT 0, restart_time datetime DEFAULT NULL, @@ -872,6 +875,8 @@ CREATE TABLE t_ds_task_instance task_code bigint(20) NOT NULL, task_definition_version int(11) DEFAULT NULL, process_instance_id int(11) DEFAULT NULL, + process_instance_name varchar(255) DEFAULT NULL, + project_code bigint(20) DEFAULT NULL, state tinyint(4) DEFAULT NULL, submit_time datetime DEFAULT NULL, start_time datetime DEFAULT NULL, @@ -894,6 +899,7 @@ CREATE TABLE t_ds_task_instance environment_code bigint(20) DEFAULT '-1', environment_config text DEFAULT '', executor_id int(11) DEFAULT NULL, + executor_name varchar(64) DEFAULT NULL, first_submit_time datetime DEFAULT NULL, delay_time int(4) DEFAULT '0', task_group_id int(11) DEFAULT NULL, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 9444f5886d..227692f222 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -598,6 +598,7 @@ CREATE TABLE `t_ds_process_instance` ( `name` varchar(255) DEFAULT NULL COMMENT 'process instance name', `process_definition_code` bigint(20) NOT NULL COMMENT 'process definition code', `process_definition_version` int(11) DEFAULT '0' COMMENT 'process definition version', + `project_code` bigint(20) DEFAULT NULL COMMENT 'project code', `state` tinyint(4) DEFAULT NULL COMMENT 'process instance Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete', `state_history` text DEFAULT NULL COMMENT 'state history desc', `recovery` tinyint(4) DEFAULT NULL COMMENT 'process instance failover flag:0:normal,1:failover instance', @@ -619,12 +620,14 @@ CREATE TABLE `t_ds_process_instance` ( `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `is_sub_process` int(11) DEFAULT '0' COMMENT 'flag, whether the process is sub process', `executor_id` int(11) NOT NULL COMMENT 'executor id', + `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name', `history_cmd` text COMMENT 'history commands of process instance operation', `process_instance_priority` int(11) DEFAULT '2' COMMENT 'process instance priority. 0 Highest,1 High,2 Medium,3 Low,4 Lowest', `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id', `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', `timeout` int(11) DEFAULT '0' COMMENT 'time out', `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id', + `tenant_code` varchar(64) DEFAULT NULL COMMENT 'tenant code', `var_pool` longtext COMMENT 'var_pool', `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run', `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId', @@ -866,6 +869,8 @@ CREATE TABLE `t_ds_task_instance` ( `task_code` bigint(20) NOT NULL COMMENT 'task definition code', `task_definition_version` int(11) DEFAULT '0' COMMENT 'task definition version', `process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id', + `process_instance_name` varchar(255) DEFAULT NULL COMMENT 'process instance name', + `project_code` bigint(20) DEFAULT NULL COMMENT 'project code', `state` tinyint(4) DEFAULT NULL COMMENT 'Status: 0 commit succeeded, 1 running, 2 prepare to pause, 3 pause, 4 prepare to stop, 5 stop, 6 fail, 7 succeed, 8 need fault tolerance, 9 kill, 10 wait for thread, 11 wait for dependency to complete', `submit_time` datetime DEFAULT NULL COMMENT 'task submit time', `start_time` datetime DEFAULT NULL COMMENT 'task start time', @@ -888,6 +893,7 @@ CREATE TABLE `t_ds_task_instance` ( `environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code', `environment_config` text COMMENT 'this config contains many environment variables config', `executor_id` int(11) DEFAULT NULL, + `executor_name` varchar(64) DEFAULT NULL, `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time', `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time', `var_pool` longtext COMMENT 'var_pool', diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 121729ba87..a41959b806 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -524,6 +524,7 @@ CREATE TABLE t_ds_process_instance ( name varchar(255) DEFAULT NULL , process_definition_code bigint DEFAULT NULL , process_definition_version int DEFAULT NULL , + project_code bigint DEFAULT NULL , state int DEFAULT NULL , state_history text, recovery int DEFAULT NULL , @@ -546,6 +547,7 @@ CREATE TABLE t_ds_process_instance ( update_time timestamp NULL , is_sub_process int DEFAULT '0' , executor_id int NOT NULL , + executor_name varchar(64) DEFAULT NULL, history_cmd text , dependence_schedule_times text , process_instance_priority int DEFAULT '2' , @@ -553,6 +555,7 @@ CREATE TABLE t_ds_process_instance ( environment_code bigint DEFAULT '-1', timeout int DEFAULT '0' , tenant_id int NOT NULL DEFAULT '-1' , + tenant_code varchar(64) DEFAULT NULL , var_pool text , dry_run int DEFAULT '0' , next_process_instance_id int DEFAULT '0', @@ -766,6 +769,8 @@ CREATE TABLE t_ds_task_instance ( task_code bigint NOT NULL, task_definition_version int DEFAULT NULL , process_instance_id int DEFAULT NULL , + process_instance_name varchar(255) DEFAULT NULL, + project_code bigint DEFAULT NULL, state int DEFAULT NULL , submit_time timestamp DEFAULT NULL , start_time timestamp DEFAULT NULL , @@ -788,6 +793,7 @@ CREATE TABLE t_ds_task_instance ( environment_code bigint DEFAULT '-1', environment_config text, executor_id int DEFAULT NULL , + executor_name varchar(64) DEFAULT NULL , first_submit_time timestamp DEFAULT NULL , delay_time int DEFAULT '0' , task_group_id int DEFAULT NULL, diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql index a12439debc..a2643503bd 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql @@ -218,3 +218,67 @@ d// delimiter ; CALL add_t_ds_task_instance_idx_cache_key; DROP PROCEDURE add_t_ds_task_instance_idx_cache_key; + +-- ALTER TABLE `t_ds_process_instance` ADD column `project_code`, `process_definition_name`, `executor_name`, `tenant_code`; +drop PROCEDURE if EXISTS add_t_ds_process_instance_add_project_code; +delimiter d// +CREATE PROCEDURE add_t_ds_process_instance_add_project_code() +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='project_code') + THEN +ALTER TABLE t_ds_process_instance ADD `project_code` bigint(20) DEFAULT NULL COMMENT 'project code'; +END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='executor_name') + THEN +ALTER TABLE t_ds_process_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name'; +END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='tenant_code') + THEN +ALTER TABLE t_ds_process_instance ADD `tenant_code` varchar(64) DEFAULT NULL COMMENT 'tenant code'; +END IF; +END; +d// +delimiter ; +CALL add_t_ds_process_instance_add_project_code; +DROP PROCEDURE add_t_ds_process_instance_add_project_code; + +-- ALTER TABLE `t_ds_task_instance` ADD column `project_code`, `process_definition_name`, `executor_name`, `tenant_code`; +drop PROCEDURE if EXISTS add_t_ds_task_instance_add_project_code; +delimiter d// +CREATE PROCEDURE add_t_ds_task_instance_add_project_code() +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='process_instance_name') + THEN +ALTER TABLE t_ds_task_instance ADD `process_instance_name` varchar(255) DEFAULT NULL COMMENT 'process instance name'; +END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='project_code') + THEN +ALTER TABLE t_ds_task_instance ADD `project_code` bigint(20) DEFAULT NULL COMMENT 'project code'; +END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='executor_name') + THEN +ALTER TABLE t_ds_task_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name'; +END IF; +END; +d// +delimiter ; +CALL add_t_ds_task_instance_add_project_code; +DROP PROCEDURE add_t_ds_task_instance_add_project_code; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql index c88f0593c4..b5b4ab048e 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -133,3 +133,77 @@ ALTER TABLE t_ds_task_instance ADD COLUMN IF NOT EXISTS cache_key varchar(200) D ALTER TABLE t_ds_task_instance DROP COLUMN IF EXISTS cacke_key; CREATE INDEX IF NOT EXISTS idx_cache_key ON t_ds_task_instance USING Btree("cache_key"); + +-- add_t_ds_process_instance_add_project_code +delimiter ; +DROP FUNCTION IF EXISTS add_t_ds_process_instance_add_project_code(); +delimiter d// +CREATE FUNCTION add_t_ds_process_instance_add_project_code() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='project_code') + THEN +ALTER TABLE t_ds_process_instance ADD `project_code` bigint DEFAULT NULL COMMENT 'project code'; +END IF; +IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='executor_name') + THEN +ALTER TABLE t_ds_process_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name'; +END IF; +IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_process_instance' + AND COLUMN_NAME ='tenant_code') + THEN +ALTER TABLE t_ds_process_instance ADD `tenant_code` varchar(64) DEFAULT NULL COMMENT 'tenant code'; +END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select add_t_ds_process_instance_add_project_code(); +DROP FUNCTION add_t_ds_process_instance_add_project_code(); + +-- add_t_ds_process_instance_add_project_code +delimiter ; +DROP FUNCTION IF EXISTS add_t_ds_task_instance_add_project_code(); +delimiter d// +CREATE FUNCTION add_t_ds_task_instance_add_project_code() RETURNS void AS $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='process_instance_name') + THEN +ALTER TABLE t_ds_task_instance ADD `process_instance_name` varchar(255) DEFAULT NULL COMMENT 'process instance name'; +END IF; +IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='project_code') + THEN +ALTER TABLE t_ds_process_instance ADD `project_code` bigint DEFAULT NULL COMMENT 'project code'; +END IF; +IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_CATALOG=current_database() + AND TABLE_SCHEMA=current_schema() + AND TABLE_NAME='t_ds_task_instance' + AND COLUMN_NAME ='executor_name') + THEN +ALTER TABLE t_ds_task_instance ADD `executor_name` varchar(64) DEFAULT NULL COMMENT 'execute user name'; +END IF; +END; +$$ LANGUAGE plpgsql; +d// +delimiter ; +select add_t_ds_task_instance_add_project_code(); +DROP FUNCTION add_t_ds_task_instance_add_project_code(); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index 01a9b8b92c..fda62ea2f8 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -170,6 +170,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest { processDefinitionMapper.insert(processDefinition); ProcessInstance processInstance = insertOne(); + processInstance.setProjectCode(processDefinition.getProjectCode()); processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); processInstance.setIsSubProcess(Flag.NO); @@ -184,7 +185,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest { processDefinition.getProjectCode(), processInstance.getProcessDefinitionCode(), processInstance.getName(), - 0, + "", stateArray, processInstance.getHost(), null, diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index be71c3561f..09417d468c 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -375,7 +375,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest { "", "", "", - 0, + "", new int[0], "", TaskExecuteType.BATCH, diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 0fbece94ac..690198e9d6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -260,6 +260,7 @@ public class StreamTaskExecuteRunnable implements Runnable { taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); // set process instance id to 0 taskInstance.setProcessInstanceId(0); + taskInstance.setProjectCode(taskDefinition.getProjectCode()); // task instance type taskInstance.setTaskType(taskDefinition.getTaskType().toUpperCase()); // task instance whether alert diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index ec76121279..62caae828e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -1183,6 +1183,8 @@ public class WorkflowExecuteRunnable implements Callable { taskInstance.setState(TaskExecutionStatus.SUBMITTED_SUCCESS); // process instance id taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessInstanceName(processInstance.getName()); + taskInstance.setProjectCode(processInstance.getProjectCode()); // task instance type taskInstance.setTaskType(taskNode.getType().toUpperCase()); // task instance whether alert diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 0ff34d7484..865692a38e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -595,6 +595,7 @@ public class ProcessServiceImpl implements ProcessService { ProcessInstance processInstance = new ProcessInstance(processDefinition); processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setProcessDefinitionVersion(processDefinition.getVersion()); + processInstance.setProjectCode(processDefinition.getProjectCode()); processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, "init running"); processInstance.setRecovery(Flag.NO); processInstance.setStartTime(new Date()); @@ -608,6 +609,8 @@ public class ProcessServiceImpl implements ProcessService { processInstance.setTaskDependType(command.getTaskDependType()); processInstance.setFailureStrategy(command.getFailureStrategy()); processInstance.setExecutorId(command.getExecutorId()); + processInstance.setExecutorName(Optional.ofNullable(userMapper.selectById(command.getExecutorId())) + .map(User::getUserName).orElse(null)); WarningType warningType = command.getWarningType() == null ? WarningType.NONE : command.getWarningType(); processInstance.setWarningType(warningType); Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId(); @@ -647,6 +650,8 @@ public class ProcessServiceImpl implements ProcessService { .setEnvironmentCode(Objects.isNull(command.getEnvironmentCode()) ? -1 : command.getEnvironmentCode()); processInstance.setTimeout(processDefinition.getTimeout()); processInstance.setTenantId(processDefinition.getTenantId()); + processInstance.setTenantCode(Optional.ofNullable(tenantMapper.queryById(processDefinition.getTenantId())) + .map(Tenant::getTenantCode).orElse(null)); return processInstance; } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 7900a63186..ef08a76fcd 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; @@ -137,6 +138,9 @@ public class ProcessServiceTest { @Mock private UserMapper userMapper; + @Mock + private TenantMapper tenantMapper; + @Mock private TaskInstanceMapper taskInstanceMapper; @Mock diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java index b56d103606..3fe4d0c276 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java @@ -20,10 +20,17 @@ package org.apache.dolphinscheduler.tools.datasource; import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; + +import org.apache.commons.collections4.CollectionUtils; import java.io.IOException; import java.sql.Connection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.sql.DataSource; @@ -38,13 +45,20 @@ public class DolphinSchedulerManager { private final UpgradeDao upgradeDao; - public DolphinSchedulerManager(DataSource dataSource, List daos) throws Exception { + private Map upgraderMap = new HashMap<>(); + + public DolphinSchedulerManager(DataSource dataSource, List daos, + List dolphinSchedulerUpgraders) throws Exception { final DbType type = getCurrentDbType(dataSource); upgradeDao = daos.stream() .filter(it -> it.getDbType() == type) .findFirst() .orElseThrow(() -> new RuntimeException( "Cannot find UpgradeDao implementation for db type: " + type)); + if (CollectionUtils.isNotEmpty(dolphinSchedulerUpgraders)) { + upgraderMap = dolphinSchedulerUpgraders.stream() + .collect(Collectors.toMap(DolphinSchedulerUpgrader::getCurrentVersion, Function.identity())); + } } private DbType getCurrentDbType(DataSource dataSource) throws Exception { @@ -114,6 +128,10 @@ public class DolphinSchedulerManager { } else if ("2.0.0".equals(schemaVersion)) { upgradeDao.upgradeDolphinSchedulerTo200(schemaDir); } + DolphinSchedulerUpgrader dolphinSchedulerUpgrader = upgraderMap.get(schemaVersion); + if (dolphinSchedulerUpgrader != null) { + dolphinSchedulerUpgrader.doUpgrade(); + } version = schemaVersion; } } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/InitDolphinScheduler.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/InitDolphinScheduler.java index 15b1cbb3b2..f086a815f0 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/InitDolphinScheduler.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/InitDolphinScheduler.java @@ -17,14 +17,18 @@ package org.apache.dolphinscheduler.tools.datasource; +import org.apache.dolphinscheduler.dao.DaoConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; +@ImportAutoConfiguration(DaoConfiguration.class) @SpringBootApplication public class InitDolphinScheduler { diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java index bdd7fcf997..b4039f185b 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/UpgradeDolphinScheduler.java @@ -17,14 +17,18 @@ package org.apache.dolphinscheduler.tools.datasource; +import org.apache.dolphinscheduler.dao.DaoConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; +@ImportAutoConfiguration(DaoConfiguration.class) @SpringBootApplication public class UpgradeDolphinScheduler { diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java new file mode 100644 index 0000000000..b9d7d0edcf --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader; + +public interface DolphinSchedulerUpgrader { + + void doUpgrade(); + + String getCurrentVersion(); +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java new file mode 100644 index 0000000000..aceff6f758 --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader.v320; + +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; + +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; + +@Slf4j +@Component +public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { + + @Autowired + private ProcessInstanceMapper processInstanceMapper; + + @Autowired + private ProcessDefinitionLogMapper processDefinitionLogMapper; + + @Autowired + private TenantMapper tenantMapper; + + @Autowired + private UserMapper userMapper; + + @Autowired + private TaskInstanceMapper taskInstanceMapper; + + @Override + public void doUpgrade() { + upgradeWorkflowInstance(); + upgradeTaskInstance(); + } + + private void upgradeWorkflowInstance() { + Map tenantMap = tenantMapper.selectList(new QueryWrapper<>()) + .stream() + .collect(Collectors.toMap(Tenant::getId, Tenant::getTenantCode)); + Map userMap = userMapper.selectList(new QueryWrapper<>()) + .stream() + .collect(Collectors.toMap(User::getId, User::getUserName)); + + while (true) { + LambdaQueryWrapper wrapper = new QueryWrapper<>(new ProcessInstance()) + .lambda() + .eq(ProcessInstance::getProjectCode, null) + .last("limit 1000"); + List needUpdateWorkflowInstance = processInstanceMapper.selectList(wrapper); + if (CollectionUtils.isEmpty(needUpdateWorkflowInstance)) { + return; + } + for (ProcessInstance processInstance : needUpdateWorkflowInstance) { + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( + processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); + if (processDefinitionLog != null) { + processInstance.setProjectCode(processDefinitionLog.getProjectCode()); + processInstance.setTenantCode(tenantMap.get(processDefinitionLog.getTenantId())); + processInstance.setExecutorName(userMap.get(processInstance.getExecutorId())); + } else { + processInstance.setProjectCode(-1L); + } + processInstanceMapper.updateById(processInstance); + } + log.info("Success upgrade workflow instance, current batch size: {}", needUpdateWorkflowInstance.size()); + + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + log.error("Upgrade workflow instance error", e); + } + } + } + + private void upgradeTaskInstance() { + while (true) { + LambdaQueryWrapper wrapper = new QueryWrapper<>(new TaskInstance()) + .lambda() + .eq(TaskInstance::getProjectCode, null) + .last("limit 1000"); + List taskInstances = taskInstanceMapper.selectList(wrapper); + if (CollectionUtils.isEmpty(taskInstances)) { + return; + } + for (TaskInstance taskInstance : taskInstances) { + ProcessInstance processInstance = processInstanceMapper.selectById(taskInstance.getProcessInstanceId()); + if (processInstance == null) { + taskInstance.setProjectCode(-1L); + } else { + taskInstance.setProjectCode(processInstance.getProjectCode()); + taskInstance.setProcessInstanceName(processInstance.getName()); + taskInstance.setExecutorName(processInstance.getExecutorName()); + } + taskInstanceMapper.updateById(taskInstance); + } + + log.info("Success upgrade task instance, current batch size: {}", taskInstances.size()); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + log.error("Upgrade task instance error", e); + } + } + } + + @Override + public String getCurrentVersion() { + return "3.2.0"; + } +}