From 39a9c0c3925136b7c3a75ba12f56586dd94d1b5b Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Sun, 7 Mar 2021 18:12:01 +0800 Subject: [PATCH] [Feature][jsonspilt]modify process instance for project home page (#4991) * modify checkDAGRing and ProcessService method * merge * modify dagRing * modify process instance for project home page Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../api/service/ProcessInstanceService.java | 10 +-- .../impl/ProcessDefinitionServiceImpl.java | 4 +- .../impl/ProcessInstanceServiceImpl.java | 28 +++---- .../service/ProcessDefinitionServiceTest.java | 4 +- .../service/ProcessInstanceServiceTest.java | 6 +- .../common/model/DependentItem.java | 12 +-- .../dao/mapper/ProcessInstanceMapper.java | 74 +++++++++++-------- .../dao/mapper/ProcessInstanceMapper.xml | 39 +++++----- .../dao/mapper/ProjectMapper.xml | 7 +- .../dao/mapper/TaskInstanceMapper.xml | 2 +- .../dao/mapper/ProcessInstanceMapperTest.java | 59 +++++++-------- .../server/utils/DependentExecute.java | 16 ++-- .../server/master/DependentTaskTest.java | 35 ++++----- .../service/process/ProcessService.java | 18 ++--- 14 files changed, 159 insertions(+), 155 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 6896c3ae98..a3f541ffe2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -151,19 +151,19 @@ public interface ProcessInstanceService { /** * query process instance by processDefinitionId and stateArray * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ - List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states); + List queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states); /** - * query process instance by processDefinitionId + * query process instance by processDefinitionCode * - * @param processDefinitionId processDefinitionId + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ - List queryByProcessDefineId(int processDefinitionId,int size); + List queryByProcessDefineCode(Long processDefinitionCode,int size); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index db972c4b6b..34e7cabf6b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -484,7 +484,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } // check process instances is already running - List processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES); + List processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(processInstances)) { putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size()); return result; @@ -1259,7 +1259,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * List of process instances */ - List processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit); + List processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit); for (ProcessInstance processInstance : processInstanceList) { processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index a83900e322..69480618d9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -257,9 +257,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); int executorId = usersService.getUserIdByName(executorName); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefineId); + IPage processInstanceList = processInstanceMapper.queryProcessInstanceListPaging(page, - project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end); + project.getCode(), processDefinition.getCode(), searchVal, executorId, statusArray, host, start, end); List processInstances = processInstanceList.getRecords(); @@ -514,6 +516,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.setProcessInstanceJson(processInstanceJson); processInstance.setGlobalParams(globalParams); } + /** * query parent process instance detail info by sub process instance id * @@ -645,10 +648,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * get local params - * - * @param processInstance - * @param timeParams - * @return */ private Map> getLocalParams(ProcessInstance processInstance, Map timeParams) { Map> localUserDefParams = new HashMap<>(); @@ -674,6 +673,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } return localUserDefParams; } + /** * encapsulation gantt structure * @@ -732,25 +732,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } /** - * query process instance by processDefinitionId and stateArray - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode and stateArray + * + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ @Override - public List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) { - return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states); + public List queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states) { + return processInstanceMapper.queryByProcessDefineCodeAndStatus(processDefinitionCode, states); } /** - * query process instance by processDefinitionId - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode + * + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ @Override - public List queryByProcessDefineId(int processDefinitionId, int size) { - return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); + public List queryByProcessDefineCode(Long processDefinitionCode, int size) { + return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size); } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 5f87999fcc..4f9620824f 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -825,7 +825,7 @@ public class ProcessDefinitionServiceTest { //task instance not exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); + Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); Map taskNullRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); @@ -893,7 +893,7 @@ public class ProcessDefinitionServiceTest { //task instance exist Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList); + Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList); Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index ea8f3c4faf..a17d2019e7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -153,7 +153,7 @@ public class ProcessInstanceServiceTest { when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser); when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId()); - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser); Map successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00", @@ -162,7 +162,7 @@ public class ProcessInstanceServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); // data parameter empty - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(), eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn); successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "", "", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS, @@ -178,7 +178,7 @@ public class ProcessInstanceServiceTest { Assert.assertEquals(Status.SUCCESS, executorExistRes.get(Constants.STATUS)); //executor name empty - when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(0), Mockito.any(), + when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(0), Mockito.any(), eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn); Map executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00", "2020-01-02 00:00:00", "", "", ExecutionStatus.SUBMITTED_SUCCESS, diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java index 6c09064eae..7d6f7d3df1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; */ public class DependentItem { - private int definitionId; + private Long definitionCode; private String depTasks; private String cycle; private String dateValue; @@ -34,18 +34,18 @@ public class DependentItem { public String getKey(){ return String.format("%d-%s-%s-%s", - getDefinitionId(), + getDefinitionCode(), getDepTasks(), getCycle(), getDateValue()); } - public int getDefinitionId() { - return definitionId; + public Long getDefinitionCode() { + return definitionCode; } - public void setDefinitionId(int definitionId) { - this.definitionId = definitionId; + public void setDefinitionCode(Long definitionCode) { + this.definitionCode = definitionCode; } public String getDepTasks() { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index e660d99be3..6e53b45eb4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -37,6 +37,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance detail info by id + * * @param processId processId * @return process instance */ @@ -44,6 +45,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance by host and stateArray + * * @param host host * @param stateArray stateArray * @return process instance list @@ -53,21 +55,23 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance by tenantId and stateArray + * * @param tenantId tenantId * @param states states array * @return process instance list */ List queryByTenantIdAndStatus(@Param("tenantId") int tenantId, - @Param("states") int[] states); + @Param("states") int[] states); /** * query process instance by worker group and stateArray + * * @param workerGroupId workerGroupId * @param states states array * @return process instance list */ List queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId, - @Param("states") int[] states); + @Param("states") int[] states); /** * process instance page @@ -85,9 +89,10 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * process instance page + * * @param page page - * @param projectId projectId - * @param processDefinitionId processDefinitionId + * @param projectCode projectCode + * @param processDefinitionCode processDefinitionCode * @param searchVal searchVal * @param executorId executorId * @param statusArray statusArray @@ -97,8 +102,8 @@ public interface ProcessInstanceMapper extends BaseMapper { * @return process instance page */ IPage queryProcessInstanceListPaging(Page page, - @Param("projectId") int projectId, - @Param("processDefinitionId") Integer processDefinitionId, + @Param("projectCode") Long projectCode, + @Param("processDefinitionCode") Long processDefinitionCode, @Param("searchVal") String searchVal, @Param("executorId") Integer executorId, @Param("states") int[] statusArray, @@ -108,6 +113,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * set failover by host and state array + * * @param host host * @param stateArray stateArray * @return set result @@ -117,7 +123,8 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * update process instance by state - * @param originState originState + * + * @param originState originState * @param destState destState * @return update result */ @@ -125,7 +132,8 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("destState") ExecutionStatus destState); /** - * update process instance by tenantId + * update process instance by tenantId + * * @param originTenantId originTenantId * @param destTenantId destTenantId * @return update result @@ -135,6 +143,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * update process instance by worker groupId + * * @param originWorkerGroupId originWorkerGroupId * @param destWorkerGroupId destWorkerGroupId * @return update result @@ -143,6 +152,7 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * count process instance state by user + * * @param startTime startTime * @param endTime endTime * @param projectCodes projectCodes @@ -154,74 +164,76 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("projectCodes") Long[] projectCodes); /** - * query process instance by processDefinitionId - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode + * + * @param processDefinitionCode processDefinitionCode * @param size size * @return process instance list */ - List queryByProcessDefineId( - @Param("processDefinitionId") int processDefinitionId, - @Param("size") int size); + List queryByProcessDefineCode(@Param("processDefinitionCode") Long processDefinitionCode, + @Param("size") int size); /** * query last scheduler process instance - * @param definitionId processDefinitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @return process instance */ - ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime); /** * query last running process instance - * @param definitionId definitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @param stateArray stateArray * @return process instance */ - ProcessInstance queryLastRunningProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime, @Param("states") int[] stateArray); /** * query last manual process instance - * @param definitionId definitionId + * + * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime * @return process instance */ - ProcessInstance queryLastManualProcess(@Param("processDefinitionId") int definitionId, + ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime); + /** * query top n process instance order by running duration - * @param size + * * @param status process instance status - * @param startTime - * @param endTime * @return ProcessInstance list */ List queryTopNProcessInstance(@Param("size") int size, @Param("startTime") Date startTime, @Param("endTime") Date endTime, - @Param("status")ExecutionStatus status); + @Param("status") ExecutionStatus status); + /** - * query process instance by processDefinitionId and stateArray - * @param processDefinitionId processDefinitionId + * query process instance by processDefinitionCode and stateArray + * + * @param processDefinitionCode processDefinitionCode * @param states states array * @return process instance list */ - List queryByProcessDefineIdAndStatus( - @Param("processDefinitionId") int processDefinitionId, - @Param("states") int[] states); + List queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode, + @Param("states") int[] states); - int updateGlobalParamsById( - @Param("globalParams") String globalParams, - @Param("id") int id); + int updateGlobalParamsById(@Param("globalParams") String globalParams, + @Param("id") int id); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index a4eb5bd042..cb2304ec06 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -19,10 +19,10 @@ - id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, + id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host, command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type, - warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag, - update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times, + warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, + locations, connects, history_cmd, dependence_schedule_times, process_instance_priority, worker_group, timeout, tenant_id, var_pool select t.state, count(0) as count from t_ds_process_instance t - join t_ds_process_definition d on d.id=t.process_definition_id - join t_ds_project p on p.id=d.project_id + join t_ds_process_definition d on d.code=t.process_definition_code + join t_ds_project p on p.code=d.project_code where 1 = 1 and t.is_sub_process = 0 @@ -162,18 +162,18 @@ group by t.state - select from t_ds_process_instance - where process_definition_id=#{processDefinitionId} + where process_definition_code=#{processDefinitionCode} order by start_time desc limit #{size} - select from t_ds_process_instance - where process_definition_id=#{processDefinitionId} + where process_definition_code=#{processDefinitionCode} and state in #{i} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml index 53f073650a..90718320ba 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml @@ -75,9 +75,10 @@ , u.user_name as user_name, - (SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_id = p.id) AS def_count, - (SELECT COUNT(*) FROM t_ds_process_definition def, t_ds_process_instance inst WHERE def.id = - inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count + (SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_code = p.code) AS def_count, + (SELECT COUNT(*) FROM t_ds_process_definition_log def, t_ds_process_instance inst WHERE def.code = + inst.process_definition_code and def.version = inst.process_definition_version AND def.project_code = p.code + AND inst.state=1 ) as inst_running_count from t_ds_project p join t_ds_user u on u.id=p.user_id where 1=1 diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index 35709f4509..b142ce8dd6 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -73,7 +73,7 @@ select state, count(0) as count from t_ds_task_instance t left join t_ds_process_definition d on d.code=t.process_definition_code - left join t_ds_project p on p.id=d.project_id + left join t_ds_project p on p.code=d.project_code where 1=1 and d.project_code in diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index 815287aa65..d162a230ee 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -20,9 +20,14 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.dao.entity.*; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; + +import java.util.Date; +import java.util.List; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,8 +37,8 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.List; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @RunWith(SpringRunner.class) @SpringBootTest @@ -54,10 +59,6 @@ public class ProcessInstanceMapperTest { /** * insert process instance with specified start time and end time,set state to SUCCESS - * - * @param startTime - * @param endTime - * @return */ private ProcessInstance insertOne(Date startTime, Date endTime) { ProcessInstance processInstance = new ProcessInstance(); @@ -73,13 +74,14 @@ public class ProcessInstanceMapperTest { /** * insert + * * @return ProcessInstance */ - private ProcessInstance insertOne(){ + private ProcessInstance insertOne() { //insertOne ProcessInstance processInstance = new ProcessInstance(); - Date start = new Date(2019-1900, 1-1, 1, 0, 10,0); - Date end = new Date(2019-1900, 1-1, 1, 1, 0,0); + Date start = new Date(2019 - 1900, 1 - 1, 1, 0, 10, 0); + Date end = new Date(2019 - 1900, 1 - 1, 1, 1, 0, 0); processInstance.setStartTime(start); processInstance.setEndTime(end); processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); @@ -92,7 +94,7 @@ public class ProcessInstanceMapperTest { * test update */ @Test - public void testUpdate(){ + public void testUpdate() { //insertOne ProcessInstance processInstanceMap = insertOne(); //update @@ -105,7 +107,7 @@ public class ProcessInstanceMapperTest { * test delete */ @Test - public void testDelete(){ + public void testDelete() { ProcessInstance processInstanceMap = insertOne(); int delete = processInstanceMapper.deleteById(processInstanceMap.getId()); Assert.assertEquals(1, delete); @@ -185,8 +187,8 @@ public class ProcessInstanceMapperTest { IPage processInstanceIPage = processInstanceMapper.queryProcessInstanceListPaging( page, - processDefinition.getProjectId(), - processInstance.getProcessDefinitionId(), + processDefinition.getProjectCode(), + processInstance.getProcessDefinitionCode(), processInstance.getName(), 0, stateArray, @@ -283,10 +285,10 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance1 = insertOne(); - List processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 1); + List processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 1); Assert.assertEquals(1, processInstances.size()); - processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 2); + processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 2); Assert.assertEquals(2, processInstances.size()); processInstanceMapper.deleteById(processInstance.getId()); @@ -302,7 +304,7 @@ public class ProcessInstanceMapperTest { processInstance.setScheduleTime(new Date()); processInstanceMapper.updateById(processInstance); - ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionId(), null, null ); + ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null); Assert.assertNotEquals(processInstance1, null); processInstanceMapper.deleteById(processInstance.getId()); } @@ -320,7 +322,7 @@ public class ProcessInstanceMapperTest { ExecutionStatus.RUNNING_EXECUTION.ordinal(), ExecutionStatus.SUBMITTED_SUCCESS.ordinal()}; - ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionId(), null, null , stateArray); + ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, stateArray); Assert.assertNotEquals(processInstance1, null); processInstanceMapper.deleteById(processInstance.getId()); @@ -334,15 +336,13 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance = insertOne(); processInstanceMapper.updateById(processInstance); - Date start = new Date(2019-1900, 1-1, 01, 0, 0, 0); - Date end = new Date(2019-1900, 1-1, 01, 5, 0, 0); - ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end - ); + Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0); + Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0); + ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end); Assert.assertEquals(processInstance1.getId(), processInstance.getId()); - start = new Date(2019-1900, 1-1, 01, 1, 0, 0); - processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end - ); + start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0); + processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end); Assert.assertNull(processInstance1); processInstanceMapper.deleteById(processInstance.getId()); @@ -352,9 +352,6 @@ public class ProcessInstanceMapperTest { /** * test whether it is in descending order by running duration - * - * @param processInstances - * @return */ private boolean isSortedByDuration(List processInstances) { for (int i = 1; i < processInstances.size(); i++) { @@ -383,7 +380,7 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance3 = insertOne(startTime3, endTime3); Date start = new Date(2020, 1, 1, 1, 1, 1); Date end = new Date(2021, 1, 1, 1, 1, 1); - List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end,ExecutionStatus.SUCCESS); + List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS); Assert.assertEquals(2, processInstances.size()); Assert.assertTrue(isSortedByDuration(processInstances)); for (ProcessInstance processInstance : processInstances) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java index 7f76baaa52..09c98fc86a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java @@ -101,7 +101,7 @@ public class DependentExecute { DependResult result = DependResult.FAILED; for(DateInterval dateInterval : dateIntervals){ - ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(), + ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(), dateInterval); if(processInstance == null){ return DependResult.WAITING; @@ -170,24 +170,20 @@ public class DependentExecute { * find the last one process instance that : * 1. manual run and finish between the interval * 2. schedule run and schedule time between the interval - * @param definitionId definition id + * @param definitionCode definition code * @param dateInterval date interval * @return ProcessInstance */ - private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) { + private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) { - ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime()); + ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); if(runningProcess != null){ return runningProcess; } - ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval( - definitionId, dateInterval - ); + ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(definitionCode, dateInterval); - ProcessInstance lastManualProcess = processService.findLastManualProcessInterval( - definitionId, dateInterval - ); + ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval); if(lastManualProcess ==null){ return lastSchedulerProcess; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 7609c4549c..8b0410b3f3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -120,7 +120,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -140,7 +140,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult @@ -163,7 +163,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); // for DependentExecute.getDependTaskResult @@ -184,15 +184,15 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); dependentTaskModel1.setRelation(DependentRelation.AND); dependentTaskModel1.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day"), - getDependentItemFromTaskNode(3, "B", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day"), + getDependentItemFromTaskNode(3L, "B", "today", "day") ).collect(Collectors.toList())); DependentTaskModel dependentTaskModel2 = new DependentTaskModel(); dependentTaskModel2.setRelation(DependentRelation.OR); dependentTaskModel2.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day"), - getDependentItemFromTaskNode(3, "C", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day"), + getDependentItemFromTaskNode(3L, "C", "today", "day") ).collect(Collectors.toList())); /* @@ -217,10 +217,10 @@ public class DependentTaskTest { // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(processInstance200); Mockito.when(processService - .findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any())) .thenReturn(processInstance300); // for DependentExecute.getDependTaskResult @@ -249,7 +249,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day") + getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -267,7 +267,7 @@ public class DependentTaskTest { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS)); DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); @@ -280,7 +280,7 @@ public class DependentTaskTest { testDependentOnAllInit(); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE)); DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance); @@ -302,7 +302,7 @@ public class DependentTaskTest { DependentTaskModel dependentTaskModel = new DependentTaskModel(); dependentTaskModel.setRelation(DependentRelation.AND); dependentTaskModel.setDependItemList(Stream.of( - getDependentItemFromTaskNode(2, "A", "today", "day") + getDependentItemFromTaskNode(2L, "A", "today", "day") ).collect(Collectors.toList())); DependentParameters dependentParameters = new DependentParameters(); @@ -318,7 +318,7 @@ public class DependentTaskTest { getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION); // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any())) .thenReturn(dependentProcessInstance); DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); @@ -373,12 +373,9 @@ public class DependentTaskTest { /** * DependentItem defines the condition for the dependent */ - private DependentItem getDependentItemFromTaskNode( - int processDefinitionId, String taskName, - String date, String cycle - ) { + private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) { DependentItem dependentItem = new DependentItem(); - dependentItem.setDefinitionId(processDefinitionId); + dependentItem.setDefinitionCode(processDefinitionCode); dependentItem.setDepTasks(taskName); dependentItem.setDateValue(date); dependentItem.setCycle(cycle); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 13eb7266c8..2adc57b2d6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1911,12 +1911,12 @@ public class ProcessService { /** * find last scheduler process instance in the date interval * - * @param definitionId definitionId + * @param definitionCode definitionCode * @param dateInterval dateInterval * @return process instance */ - public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) { - return processInstanceMapper.queryLastSchedulerProcess(definitionId, + public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) { + return processInstanceMapper.queryLastSchedulerProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); } @@ -1924,12 +1924,12 @@ public class ProcessService { /** * find last manual process instance interval * - * @param definitionId process definition id + * @param definitionCode process definition code * @param dateInterval dateInterval * @return process instance */ - public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) { - return processInstanceMapper.queryLastManualProcess(definitionId, + public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) { + return processInstanceMapper.queryLastManualProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime()); } @@ -1937,13 +1937,13 @@ public class ProcessService { /** * find last running process instance * - * @param definitionId process definition id + * @param definitionCode process definition code * @param startTime start time * @param endTime end time * @return process instance */ - public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) { - return processInstanceMapper.queryLastRunningProcess(definitionId, + public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) { + return processInstanceMapper.queryLastRunningProcess(definitionCode, startTime, endTime, stateArray);