diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 6896c3ae98..0fafcf7dcb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -149,21 +150,21 @@ public interface ProcessInstanceService { Map viewGantt(Integer processInstanceId) throws Exception; /** - * query process instance by processDefinitionId and stateArray + * query process instance by processDefinitionCode and stateArray * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ - List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states); + List queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states); /** - * query process instance by processDefinitionId + * query process instance by processDefinitionCode * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ - List queryByProcessDefineId(int processDefinitionId,int size); + List queryByProcessDefineCode(Long processDefinitionCode,int size); -} +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index db972c4b6b..374abb6fca 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -484,7 +484,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } // check process instances is already running - List processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES); + List processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(processInstances)) { putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size()); return result; @@ -621,17 +621,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * get process definition list by ids */ private List getProcessDefinitionList(String processDefinitionIds) { - List processDefinitionList = new ArrayList<>(); String[] processDefinitionIdArray = processDefinitionIds.split(","); + + List processDefinitionList = new ArrayList<>(); for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); - String processDefinitionJson = JSONUtils.toJsonString(processService.genProcessData(processDefinition)); - processDefinition.setProcessDefinitionJson(processDefinitionJson); - processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); + processDefinitionList.add(exportProcessMetaData(processDefinition)); } - return processDefinitionList; } @@ -671,39 +669,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get export process metadata string * - * @param processDefinitionId process definition id * @param processDefinition process definition * @return export process metadata string */ - public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { - //create workflow json file - return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); - } - - /** - * get export process metadata string - * - * @param processDefinitionId process definition id - * @param processDefinition process definition - * @return export process metadata string - */ - public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + public ProcessMeta exportProcessMetaData(ProcessDefinition processDefinition) { + ProcessData processData = processService.genProcessData(processDefinition); //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinitionJson); - processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); + addExportTaskNodeSpecialParam(processData); //export process metadata ProcessMeta exportProcessMeta = new ProcessMeta(); exportProcessMeta.setProjectName(processDefinition.getProjectName()); exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); - exportProcessMeta.setProcessDefinitionJson(processDefinitionJson); + exportProcessMeta.setProcessDefinitionJson(JSONUtils.toJsonString(processService.genProcessData(processDefinition))); exportProcessMeta.setProcessDefinitionDescription(processDefinition.getDescription()); exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); //schedule info - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinition.getId()); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); @@ -723,26 +707,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * correct task param which has datasource or dependent * - * @param processDefinitionJson processDefinitionJson + * @param processData process data * @return correct processDefinitionJson */ - private String addExportTaskNodeSpecialParam(String processDefinitionJson) { - ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); - ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); - - for (int i = 0; i < jsonArray.size(); i++) { - JsonNode taskNode = jsonArray.path(i); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - if (null != addTaskParam) { - addTaskParam.addExportSpecialParam(taskNode); - } + private void addExportTaskNodeSpecialParam(ProcessData processData) { + List taskNodeList = processData.getTasks(); + List tmpNodeList = new ArrayList<>(); + for (TaskNode taskNode : taskNodeList) { + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskNode.getType()); + JsonNode jsonNode = JSONUtils.toJsonNode(taskNode); + if (null != addTaskParam) { + addTaskParam.addExportSpecialParam(jsonNode); } + tmpNodeList.add(JSONUtils.parseObject(jsonNode.toString(), TaskNode.class)); } - jsonObject.set(TASKS, jsonArray); - return jsonObject.toString(); + processData.setTasks(tmpNodeList); } /** @@ -1259,7 +1238,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * List of process instances */ - List processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit); + List processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit); for (ProcessInstance processInstance : processInstanceList) { processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); @@ -1783,4 +1762,4 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } -} +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index a83900e322..95e3f5c4a5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -257,9 +257,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); int executorId = usersService.getUserIdByName(executorName); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefineId); + IPage processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(page, - project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end); + project.getCode(), processDefinition.getCode(), searchVal, executorId, statusArray, host, start, end); List processInstances = processInstanceList.getRecords(); @@ -451,7 +453,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce int update = processService.updateProcessInstance(processInstance); int updateDefine = 1; if (Boolean.TRUE.equals(syncDefine)) { - updateDefine = syncDefinition(loginUser, project, processInstanceJson, locations, connects, + updateDefine = syncDefinition(loginUser, project, locations, connects, processInstance, processDefinition, processData); } if (update > 0 && updateDefine > 0) { @@ -465,12 +467,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * sync definition according process instance */ - private int syncDefinition(User loginUser, Project project, String processInstanceJson, String locations, String connects, + private int syncDefinition(User loginUser, Project project, String locations, String connects, ProcessInstance processInstance, ProcessDefinition processDefinition, ProcessData processData) { String originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); - processDefinition.setProcessDefinitionJson(processInstanceJson); processDefinition.setGlobalParams(originDefParams); processDefinition.setLocations(locations); processDefinition.setConnects(connects); @@ -511,9 +512,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce if (tenant != null) { processInstance.setTenantCode(tenant.getTenantCode()); } - processInstance.setProcessInstanceJson(processInstanceJson); processInstance.setGlobalParams(globalParams); } + /** * query parent process instance detail info by sub process instance id * @@ -645,10 +646,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * get local params - * - * @param processInstance - * @param timeParams - * @return */ private Map> getLocalParams(ProcessInstance processInstance, Map timeParams) { Map> localUserDefParams = new HashMap<>(); @@ -674,6 +671,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } return localUserDefParams; } + /** * encapsulation gantt structure * @@ -732,25 +730,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } /** - * query process instance by processDefinitionId and stateArray - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode and stateArray + * + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ @Override - public List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) { - return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states); + public List queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states) { + return processInstanceMapper.queryByProcessDefineCodeAndStatus(processDefinitionCode, states); } /** - * query process instance by processDefinitionId - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode + * + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ @Override - public List queryByProcessDefineId(int processDefinitionId, int size) { - return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); + public List queryByProcessDefineCode(Long processDefinitionCode, int size) { + return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 96d6ad264b..69d9b22474 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -184,7 +184,6 @@ public class ProcessDefinitionControllerTest { processDefinition.setId(id); processDefinition.setLocations(locations); processDefinition.setName(name); - processDefinition.setProcessDefinitionJson(json); Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); @@ -267,7 +266,6 @@ public class ProcessDefinitionControllerTest { processDefinition.setId(id); processDefinition.setLocations(locations); processDefinition.setName(name); - processDefinition.setProcessDefinitionJson(json); String name2 = "dag_test"; int id2 = 2; @@ -279,7 +277,6 @@ public class ProcessDefinitionControllerTest { processDefinition2.setId(id2); processDefinition2.setLocations(locations); processDefinition2.setName(name2); - processDefinition2.setProcessDefinitionJson(json); resourceList.add(processDefinition); resourceList.add(processDefinition2); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 5f87999fcc..234dd4dc4d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.utils.DateUtils; @@ -391,14 +392,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" - + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" - + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" - + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" - + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); Map instanceNotexitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, "project_test1", "test_def"); @@ -416,12 +410,6 @@ public class ProcessDefinitionServiceTest { String projectName = "project_test1"; Project project = getProject(projectName); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"," - + "\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" - + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; - User loginUser = new User(); loginUser.setId(-1); loginUser.setUserType(UserType.GENERAL_USER); @@ -463,7 +451,6 @@ public class ProcessDefinitionServiceTest { // instance exit ProcessDefinition definition = getProcessDefinition(); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); - definition.setProcessDefinitionJson(processDefinitionJson); definition.setConnects("[]"); Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); @@ -478,8 +465,7 @@ public class ProcessDefinitionServiceTest { , Mockito.any(ProcessDefinition.class))) .thenReturn(1); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(processData); + Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData()); Map map3 = processDefinitionService.batchCopyProcessDefinition( loginUser, projectName, "46", 1); @@ -518,11 +504,6 @@ public class ProcessDefinitionServiceTest { ProcessDefinition definition = getProcessDefinition(); definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); - definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\"" - + ",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234" - + "\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," - + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"); definition.setConnects("[]"); // check target project result == null @@ -656,16 +637,6 @@ public class ProcessDefinitionServiceTest { loginUser, "project_test1", 46, ReleaseState.getEnum(2)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); - //FIXME has function exit code 1 when exception - //process definition offline - // List schedules = new ArrayList<>(); - // Schedule schedule = getSchedule(); - // schedules.add(schedule); - // Mockito.when(scheduleMapper.selectAllByProcessDefineArray(new int[]{46})).thenReturn(schedules); - // Mockito.when(scheduleMapper.updateById(schedule)).thenReturn(1); - // Map offlineRes = processDefinitionService.releaseProcessDefinition(loginUser, "project_test1", - // 46, ReleaseState.OFFLINE.getCode()); - // Assert.assertEquals(Status.SUCCESS, offlineRes.get(Constants.STATUS)); } @Test @@ -746,7 +717,6 @@ public class ProcessDefinitionServiceTest { //success Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(new ProcessData()); - processDefinition.setProcessDefinitionJson(SHELL_JSON); Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Map dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionId(46); Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS)); @@ -763,29 +733,37 @@ public class ProcessDefinitionServiceTest { //process definition exist ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); Mockito.when(processDefineMapper.queryDefinitionListByIdList(idArray)).thenReturn(processDefinitionList); - String processDefinitionJson = "{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}" - + ",\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"" - + ",\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}" - + ",\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"" - + ",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"" - + ",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"; - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + ProcessData processData = getProcessData(); Mockito.when(processService.genProcessData(processDefinition)).thenReturn(processData); Map successRes = processDefinitionService.getTaskNodeListByDefinitionIdList(defineIdList); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } + private ProcessData getProcessData() { + ProcessData processData = new ProcessData(); + List taskNodeList = new ArrayList<>(); + processData.setTasks(taskNodeList); + List properties = new ArrayList<>(); + processData.setGlobalParams(properties); + processData.setTenantId(10); + processData.setTimeout(100); + return processData; + } + @Test public void testQueryProcessDefinitionAllByProjectId() { int projectId = 1; + Long projectCode = 2L; + Project project = new Project(); + project.setId(projectId); + project.setCode(projectCode); + Mockito.when(projectMapper.selectById(projectId)).thenReturn(project); + ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(SHELL_JSON); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(processDefinition); Project test = getProject("test"); @@ -825,7 +803,7 @@ public class ProcessDefinitionServiceTest { //task instance not exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); + Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); @@ -893,7 +871,7 @@ public class ProcessDefinitionServiceTest { //task instance exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); + Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); @@ -1188,6 +1166,7 @@ public class ProcessDefinitionServiceTest { processDefinition.setProjectId(2); processDefinition.setTenantId(1); processDefinition.setDescription(""); + processDefinition.setCode(9999L); return processDefinition; } @@ -1284,15 +1263,7 @@ public class ProcessDefinitionServiceTest { Integer processDefinitionId = 111; ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(processDefinitionId); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":" - + "[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," - + "\"description\":\"\",\"id\":\"tasks-3011\",\"maxRetryTimes\":\"0\",\"name\":\"tsssss\"," - + "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}," - + "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\"," - + "\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":\"SHELL\"," - + "\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); - Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinitionId, processDefinition)); + Assert.assertNotNull(processDefinitionService.exportProcessMetaData(processDefinition)); } @Test diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index ea8f3c4faf..121c832f33 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -130,11 +130,24 @@ public class ProcessInstanceServiceTest { "192.168.xx.xx", 1, 10); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, proejctAuthFailRes.get(Constants.STATUS)); + Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); + Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); + ProcessInstance processInstance = getProcessInstance(); + List processInstanceList = new ArrayList<>(); + Page pageReturn = new Page<>(1, 10); + processInstanceList.add(processInstance); + pageReturn.setRecords(processInstanceList); + // data parameter check putMsg(result, Status.SUCCESS, projectName); Project project = getProject(projectName); when(projectMapper.queryByName(projectName)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); + when(processDefineMapper.selectById(Mockito.anyInt())).thenReturn(getProcessDefinition()); + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class) + , Mockito.any(), Mockito.any(), Mockito.any(),Mockito.any(), Mockito.any(), + eq("192.168.xx.xx"), Mockito.any(), Mockito.any())).thenReturn(pageReturn); + Map dataParameterRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "20200101 00:00:00", "20200102 00:00:00", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS, "192.168.xx.xx", 1, 10); @@ -142,18 +155,12 @@ public class ProcessInstanceServiceTest { //project auth success putMsg(result, Status.SUCCESS, projectName); - Date start = DateUtils.getScheduleDate("2020-01-01 00:00:00"); - Date end = DateUtils.getScheduleDate("2020-01-02 00:00:00"); - ProcessInstance processInstance = getProcessInstance(); - List processInstanceList = new ArrayList<>(); - Page pageReturn = new Page<>(1, 10); - processInstanceList.add(processInstance); - pageReturn.setRecords(processInstanceList); + when(projectMapper.queryByName(projectName)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); Map successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00", @@ -162,7 +169,7 @@ public class ProcessInstanceServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); // data parameter empty - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(), eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn); successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "", "", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS, @@ -178,7 +185,7 @@ public class ProcessInstanceServiceTest { Assert.assertEquals(Status.SUCCESS, executorExistRes.get(Constants.STATUS)); //executor name empty - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(0), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); Map executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", "", ExecutionStatus.SUBMITTED_SUCCESS, @@ -532,6 +539,7 @@ public class ProcessInstanceServiceTest { */ private Project getProject(String projectName) { Project project = new Project(); + project.setCode(1L); project.setId(1); project.setName(projectName); project.setUserId(1); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java index 6c09064eae..7d6f7d3df1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; */ public class DependentItem { - private int definitionId; + private Long definitionCode; private String depTasks; private String cycle; private String dateValue; @@ -34,18 +34,18 @@ public class DependentItem { public String getKey(){ return String.format("%d-%s-%s-%s", - getDefinitionId(), + getDefinitionCode(), getDepTasks(), getCycle(), getDateValue()); } - public int getDefinitionId() { - return definitionId; + public Long getDefinitionCode() { + return definitionCode; } - public void setDefinitionId(int definitionId) { - this.definitionId = definitionId; + public void setDefinitionCode(Long definitionCode) { + this.definitionCode = definitionCode; } public String getDepTasks() { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index c08b83b0f9..2cc69f7ba4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -52,6 +52,7 @@ public class ProcessInstance { * process definition id * TODO delete */ + @TableField(exist = false) private int processDefinitionId; /** @@ -160,6 +161,7 @@ public class ProcessInstance { * process instance json * TODO delete */ + @TableField(exist = false) private String processInstanceJson; /** @@ -193,11 +195,13 @@ public class ProcessInstance { /** * task locations for web */ + @TableField(exist = false) private String locations; /** * task connects for web */ + @TableField(exist = false) private String connects; /** @@ -208,6 +212,7 @@ public class ProcessInstance { /** * depend processes schedule time */ + @TableField(exist = false) private String dependenceScheduleTimes; /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index e660d99be3..6e53b45eb4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -37,6 +37,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance detail info by id + * * @param processId processId * @return process instance */ @@ -44,6 +45,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance by host and stateArray + * * @param host host * @param stateArray stateArray * @return process instance list @@ -53,21 +55,23 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance by tenantId and stateArray + * * @param tenantId tenantId * @param states states array * @return process instance list */ List queryByTenantIdAndStatus(@Param("tenantId") int tenantId, - @Param("states") int[] states); + @Param("states") int[] states); /** * query process instance by worker group and stateArray + * * @param workerGroupId workerGroupId * @param states states array * @return process instance list */ List queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId, - @Param("states") int[] states); + @Param("states") int[] states); /** * process instance page @@ -85,9 +89,10 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * process instance page + * * @param page page - * @param projectId projectId - * @param processDefinitionId processDefinitionId + * @param projectCode projectCode + * @param processDefinitionCode processDefinitionCode * @param searchVal searchVal * @param executorId executorId * @param statusArray statusArray @@ -97,8 +102,8 @@ public interface ProcessInstanceMapper extends BaseMapper { * @return process instance page */ IPage queryProcessInstanceListPaging(Page page, - @Param("projectId") int projectId, - @Param("processDefinitionId") Integer processDefinitionId, + @Param("projectCode") Long projectCode, + @Param("processDefinitionCode") Long processDefinitionCode, @Param("searchVal") String searchVal, @Param("executorId") Integer executorId, @Param("states") int[] statusArray, @@ -108,6 +113,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * set failover by host and state array + * * @param host host * @param stateArray stateArray * @return set result @@ -117,7 +123,8 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * update process instance by state - * @param originState originState + * + * @param originState originState * @param destState destState * @return update result */ @@ -125,7 +132,8 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("destState") ExecutionStatus destState); /** - * update process instance by tenantId + * update process instance by tenantId + * * @param originTenantId originTenantId * @param destTenantId destTenantId * @return update result @@ -135,6 +143,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * update process instance by worker groupId + * * @param originWorkerGroupId originWorkerGroupId * @param destWorkerGroupId destWorkerGroupId * @return update result @@ -143,6 +152,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * count process instance state by user + * * @param startTime startTime * @param endTime endTime * @param projectCodes projectCodes @@ -154,74 +164,76 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("projectCodes") Long[] projectCodes); /** - * query process instance by processDefinitionId - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode + * + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ - List queryByProcessDefineId( - @Param("processDefinitionId") int processDefinitionId, - @Param("size") int size); + List queryByProcessDefineCode(@Param("processDefinitionCode") Long processDefinitionCode, + @Param("size") int size); /** * query last scheduler process instance - * @param definitionId processDefinitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @return process instance */ - ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime); /** * query last running process instance - * @param definitionId definitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @param stateArray stateArray * @return process instance */ - ProcessInstance queryLastRunningProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime, @Param("states") int[] stateArray); /** * query last manual process instance - * @param definitionId definitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @return process instance */ - ProcessInstance queryLastManualProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime); + /** * query top n process instance order by running duration - * @param size + * * @param status process instance status - * @param startTime - * @param endTime * @return ProcessInstance list */ List queryTopNProcessInstance(@Param("size") int size, @Param("startTime") Date startTime, @Param("endTime") Date endTime, - @Param("status")ExecutionStatus status); + @Param("status") ExecutionStatus status); + /** - * query process instance by processDefinitionId and stateArray - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode and stateArray + * + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ - List queryByProcessDefineIdAndStatus( - @Param("processDefinitionId") int processDefinitionId, - @Param("states") int[] states); + List queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode, + @Param("states") int[] states); - int updateGlobalParamsById( - @Param("globalParams") String globalParams, - @Param("id") int id); + int updateGlobalParamsById(@Param("globalParams") String globalParams, + @Param("id") int id); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index a4eb5bd042..2e5a4fea13 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -19,10 +19,10 @@ - id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, + id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, - warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag, - update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times, + warning_group_id, schedule_time, command_start_time, global_params, flag, + update_time, is_sub_process, executor_id, history_cmd, process_instance_priority, worker_group, timeout, tenant_id, var_pool select t.state, count(0) as count from t_ds_process_instance t - join t_ds_process_definition d on d.id=t.process_definition_id - join t_ds_project p on p.id=d.project_id + join t_ds_process_definition d on d.code=t.process_definition_code + join t_ds_project p on p.code=d.project_code where 1 = 1 and t.is_sub_process = 0 @@ -162,18 +162,18 @@ group by t.state - select from t_ds_process_instance - where process_definition_id=#{processDefinitionId} + where process_definition_code=#{processDefinitionCode} order by start_time desc limit #{size} - select from t_ds_process_instance - where process_definition_id=#{processDefinitionId} + where process_definition_code=#{processDefinitionCode} and state in #{i} @@ -224,4 +223,4 @@ set global_params = #{globalParams} where id = #{id} - + \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml index 53f073650a..90718320ba 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml @@ -75,9 +75,10 @@ , u.user_name as user_name, - (SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_id = p.id) AS def_count, - (SELECT COUNT(*) FROM t_ds_process_definition def, t_ds_process_instance inst WHERE def.id = - inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count + (SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_code = p.code) AS def_count, + (SELECT COUNT(*) FROM t_ds_process_definition_log def, t_ds_process_instance inst WHERE def.code = + inst.process_definition_code and def.version = inst.process_definition_version AND def.project_code = p.code + AND inst.state=1 ) as inst_running_count from t_ds_project p join t_ds_user u on u.id=p.user_id where 1=1 diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 35709f4509..b142ce8dd6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -73,7 +73,7 @@ select state, count(0) as count from t_ds_task_instance t left join t_ds_process_definition d on d.code=t.process_definition_code - left join t_ds_project p on p.id=d.project_id + left join t_ds_project p on p.code=d.project_code where 1=1 and d.project_code in diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java index 17e46ffda0..6e18aa97d2 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ErrorCommandMapperTest.java @@ -73,6 +73,7 @@ public class ErrorCommandMapperTest { ErrorCommand errorCommand = insertOne(); ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); processDefinition.setName("def 1"); processDefinition.setProjectCode(1010L); processDefinition.setUserId(101); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java index b294eeeb4c..6e7bbe00a7 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java @@ -98,7 +98,7 @@ public class ProcessDefinitionLogMapperTest { processDefinitionLogMapper.insert(processDefinitionLog); return processDefinitionLog; } - + @Test public void testInsert() { ProcessDefinitionLog processDefinitionLog = insertOne(); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index 815287aa65..6ff1778edf 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -20,9 +20,14 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.dao.entity.*; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,8 +37,8 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @RunWith(SpringRunner.class) @SpringBootTest @@ -54,15 +59,12 @@ public class ProcessInstanceMapperTest { /** * insert process instance with specified start time and end time,set state to SUCCESS - * - * @param startTime - * @param endTime - * @return */ private ProcessInstance insertOne(Date startTime, Date endTime) { ProcessInstance processInstance = new ProcessInstance(); Date start = startTime; Date end = endTime; + processInstance.setProcessDefinitionCode(1L); processInstance.setStartTime(start); processInstance.setEndTime(end); processInstance.setState(ExecutionStatus.SUCCESS); @@ -73,13 +75,15 @@ public class ProcessInstanceMapperTest { /** * insert + * * @return ProcessInstance */ - private ProcessInstance insertOne(){ + private ProcessInstance insertOne() { //insertOne ProcessInstance processInstance = new ProcessInstance(); - Date start = new Date(2019-1900, 1-1, 1, 0, 10,0); - Date end = new Date(2019-1900, 1-1, 1, 1, 0,0); + Date start = new Date(2019 - 1900, 1 - 1, 1, 0, 10, 0); + Date end = new Date(2019 - 1900, 1 - 1, 1, 1, 0, 0); + processInstance.setProcessDefinitionCode(1L); processInstance.setStartTime(start); processInstance.setEndTime(end); processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); @@ -92,7 +96,7 @@ public class ProcessInstanceMapperTest { * test update */ @Test - public void testUpdate(){ + public void testUpdate() { //insertOne ProcessInstance processInstanceMap = insertOne(); //update @@ -105,7 +109,7 @@ public class ProcessInstanceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { ProcessInstance processInstanceMap = insertOne(); int delete = processInstanceMapper.deleteById(processInstanceMap.getId()); Assert.assertEquals(1, delete); @@ -168,12 +172,16 @@ public class ProcessInstanceMapperTest { ExecutionStatus.SUCCESS.ordinal()}; ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setCode(1L); processDefinition.setProjectId(1010); + processDefinition.setProjectCode(1L); processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setUpdateTime(new Date()); + processDefinition.setCreateTime(new Date()); processDefinitionMapper.insert(processDefinition); ProcessInstance processInstance = insertOne(); - processInstance.setProcessDefinitionId(processDefinition.getId()); + processInstance.setProcessDefinitionCode(processDefinition.getCode()); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setIsSubProcess(Flag.NO); processInstance.setStartTime(new Date()); @@ -185,8 +193,8 @@ public class ProcessInstanceMapperTest { IPage processInstanceIPage = processInstanceMapper.queryProcessInstanceListPaging( page, - processDefinition.getProjectId(), - processInstance.getProcessDefinitionId(), + processDefinition.getProjectCode(), + processInstance.getProcessDefinitionCode(), processInstance.getName(), 0, stateArray, @@ -252,10 +260,18 @@ public class ProcessInstanceMapperTest { Project project = new Project(); project.setName("testProject"); + project.setCode(1L); + project.setCreateTime(new Date()); + project.setUpdateTime(new Date()); projectMapper.insert(project); ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setProjectId(project.getId()); + processDefinition.setCode(1L); + processDefinition.setProjectId(1010); + processDefinition.setProjectCode(1L); + processDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinition.setUpdateTime(new Date()); + processDefinition.setCreateTime(new Date()); processDefinitionMapper.insert(processDefinition); ProcessInstance processInstance = insertOne(); @@ -283,10 +299,10 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance1 = insertOne(); - List processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 1); + List processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 1); Assert.assertEquals(1, processInstances.size()); - processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 2); + processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 2); Assert.assertEquals(2, processInstances.size()); processInstanceMapper.deleteById(processInstance.getId()); @@ -302,7 +318,7 @@ public class ProcessInstanceMapperTest { processInstance.setScheduleTime(new Date()); processInstanceMapper.updateById(processInstance); - ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionId(), null, null ); + ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null); Assert.assertNotEquals(processInstance1, null); processInstanceMapper.deleteById(processInstance.getId()); } @@ -320,7 +336,7 @@ public class ProcessInstanceMapperTest { ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.SUBMITTED_SUCCESS.ordinal()}; - ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionId(), null, null , stateArray); + ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, stateArray); Assert.assertNotEquals(processInstance1, null); processInstanceMapper.deleteById(processInstance.getId()); @@ -334,14 +350,14 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance = insertOne(); processInstanceMapper.updateById(processInstance); - Date start = new Date(2019-1900, 1-1, 01, 0, 0, 0); - Date end = new Date(2019-1900, 1-1, 01, 5, 0, 0); - ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end + Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0); + Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0); + ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end ); Assert.assertEquals(processInstance1.getId(), processInstance.getId()); - start = new Date(2019-1900, 1-1, 01, 1, 0, 0); - processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end + start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0); + processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end ); Assert.assertNull(processInstance1); @@ -352,9 +368,6 @@ public class ProcessInstanceMapperTest { /** * test whether it is in descending order by running duration - * - * @param processInstances - * @return */ private boolean isSortedByDuration(List processInstances) { for (int i = 1; i < processInstances.size(); i++) { @@ -383,7 +396,7 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance3 = insertOne(startTime3, endTime3); Date start = new Date(2020, 1, 1, 1, 1, 1); Date end = new Date(2021, 1, 1, 1, 1, 1); - List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end,ExecutionStatus.SUCCESS); + List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS); Assert.assertEquals(2, processInstances.size()); Assert.assertTrue(isSortedByDuration(processInstances)); for (ProcessInstance processInstance : processInstances) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 7f76baaa52..09c98fc86a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -101,7 +101,7 @@ public class DependentExecute { DependResult result = DependResult.FAILED; for(DateInterval dateInterval : dateIntervals){ - ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), + ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(), dateInterval); if(processInstance == null){ return DependResult.WAITING; @@ -170,24 +170,20 @@ public class DependentExecute { * find the last one process instance that : * 1. manual run and finish between the interval * 2. schedule run and schedule time between the interval - * @param definitionId definition id + * @param definitionCode definition code * @param dateInterval date interval * @return ProcessInstance */ - private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { + private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) { - ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime()); + ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); if(runningProcess != null){ return runningProcess; } - ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval( - definitionId, dateInterval - ); + ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(definitionCode, dateInterval); - ProcessInstance lastManualProcess = processService.findLastManualProcessInterval( - definitionId, dateInterval - ); + ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval); if(lastManualProcess ==null){ return lastSchedulerProcess; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 7609c4549c..8b0410b3f3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -120,7 +120,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -140,7 +140,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult @@ -163,7 +163,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult @@ -184,15 +184,15 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); dependentTaskModel1.setRelation(DependentRelation.AND); dependentTaskModel1.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day"), - getDependentItemFromTaskNode(3, "B", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day"), + getDependentItemFromTaskNode(3L, "B", "today", "day") ).collect(Collectors.toList())); DependentTaskModel dependentTaskModel2 = new DependentTaskModel(); dependentTaskModel2.setRelation(DependentRelation.OR); dependentTaskModel2.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day"), - getDependentItemFromTaskNode(3, "C", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day"), + getDependentItemFromTaskNode(3L, "C", "today", "day") ).collect(Collectors.toList())); /* @@ -217,10 +217,10 @@ public class DependentTaskTest { // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(processInstance200); Mockito.when(processService - .findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any())) .thenReturn(processInstance300); // for DependentExecute.getDependTaskResult @@ -249,7 +249,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day") + getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -267,7 +267,7 @@ public class DependentTaskTest { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS)); DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); @@ -280,7 +280,7 @@ public class DependentTaskTest { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE)); DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance); @@ -302,7 +302,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -318,7 +318,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); @@ -373,12 +373,9 @@ public class DependentTaskTest { /** * DependentItem defines the condition for the dependent */ - private DependentItem getDependentItemFromTaskNode( - int processDefinitionId, String taskName, - String date, String cycle - ) { + private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) { DependentItem dependentItem = new DependentItem(); - dependentItem.setDefinitionId(processDefinitionId); + dependentItem.setDefinitionCode(processDefinitionCode); dependentItem.setDepTasks(taskName); dependentItem.setDateValue(date); dependentItem.setCycle(cycle); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 13eb7266c8..18c8b8c1c8 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1582,7 +1582,6 @@ public class ProcessService { String locations, String connects) { ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); if (processInstance != null) { - processInstance.setProcessInstanceJson(processJson); processInstance.setGlobalParams(globalParams); processInstance.setScheduleTime(scheduleTime); processInstance.setLocations(locations); @@ -1911,12 +1910,12 @@ public class ProcessService { /** * find last scheduler process instance in the date interval * - * @param definitionId definitionId + * @param definitionCode definitionCode * @param dateInterval dateInterval * @return process instance */ - public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) { - return processInstanceMapper.queryLastSchedulerProcess(definitionId, + public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { + return processInstanceMapper.queryLastSchedulerProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); } @@ -1924,12 +1923,12 @@ public class ProcessService { /** * find last manual process instance interval * - * @param definitionId process definition id + * @param definitionCode process definition code * @param dateInterval dateInterval * @return process instance */ - public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) { - return processInstanceMapper.queryLastManualProcess(definitionId, + public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { + return processInstanceMapper.queryLastManualProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); } @@ -1937,13 +1936,13 @@ public class ProcessService { /** * find last running process instance * - * @param definitionId process definition id + * @param definitionCode process definition code * @param startTime start time * @param endTime end time * @return process instance */ - public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { - return processInstanceMapper.queryLastRunningProcess(definitionId, + public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { + return processInstanceMapper.queryLastRunningProcess(definitionCode, startTime, endTime, stateArray); @@ -2460,4 +2459,4 @@ public class ProcessService { }); return new ArrayList<>(taskNodeMap.values()); } -} +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 32ffe99d66..364b283c34 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -315,28 +315,11 @@ public class ProcessServiceTest { public void testRecurseFindSubProcessId() { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(10L); - processDefinition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\":" - + "{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\"" - + ",\"dependence\":{},\"description\":\"\",\"id\":\"tasks-76544\"" - + ",\"maxRetryTimes\":\"0\",\"name\":\"test\",\"params\":{\"localParams\":[]," - + "\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[],\"processDefinitionId\"" - + ":\"222\"},\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\"," - + "\"taskInstancePriority\":\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":" - + "null,\"strategy\":\"\"},\"type\":\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}]," - + "\"tenantId\":4,\"timeout\":0}"); int parentId = 111; List ids = new ArrayList<>(); ProcessDefinition processDefinition2 = new ProcessDefinition(); processDefinition2.setCode(11L); - processDefinition2.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"conditionResult\"" - + ":{\"failedNode\":[\"\"],\"successNode\":[\"\"]},\"delayTime\":\"0\",\"dependence\":{}," - + "\"description\":\"\",\"id\":\"tasks-76544\",\"maxRetryTimes\":\"0\",\"name\":\"test\"," - + "\"params\":{\"localParams\":[],\"rawScript\":\"echo \\\"123123\\\"\",\"resourceList\":[]}," - + "\"preTasks\":[],\"retryInterval\":\"1\",\"runFlag\":\"NORMAL\",\"taskInstancePriority\":" - + "\"MEDIUM\",\"timeout\":{\"enable\":false,\"interval\":null,\"strategy\":\"\"},\"type\":" - + "\"SHELL\",\"waitStartTimeout\":{},\"workerGroup\":\"default\"}],\"tenantId\":4,\"timeout\":0}"); Mockito.when(processDefineMapper.selectById(parentId)).thenReturn(processDefinition); - List relationLogList = new ArrayList<>(); Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong() , Mockito.anyInt()))