Browse Source

[Feature][jsonspilt]modify process instance for project home page (#4991)

* modify checkDAGRing and ProcessService method

* merge

* modify dagRing

* modify process instance for project home page

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
39a9c0c392
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  6. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java
  7. 58
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  8. 39
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  9. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml
  10. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  11. 41
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  12. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java
  13. 35
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  14. 18
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -151,19 +151,19 @@ public interface ProcessInstanceService {
/**
* query process instance by processDefinitionId and stateArray
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states);
List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states);
/**
* query process instance by processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionId processDefinitionId
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineId(int processDefinitionId,int size);
List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode,int size);
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -484,7 +484,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineIdAndStatus(processDefinitionId, Constants.NOT_TERMINATED_STATES);
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size());
return result;
@ -1259,7 +1259,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* List of process instances
*/
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineId(processId, limit);
List<ProcessInstance> processInstanceList = processInstanceService.queryByProcessDefineCode(processDefinition.getCode(), limit);
for (ProcessInstance processInstance : processInstanceList) {
processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()));

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -257,9 +257,11 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
PageInfo<ProcessInstance> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefineId);
IPage<ProcessInstance> processInstanceList =
processInstanceMapper.queryProcessInstanceListPaging(page,
project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end);
project.getCode(), processDefinition.getCode(), searchVal, executorId, statusArray, host, start, end);
List<ProcessInstance> processInstances = processInstanceList.getRecords();
@ -514,6 +516,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
/**
* query parent process instance detail info by sub process instance id
*
@ -645,10 +648,6 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* get local params
*
* @param processInstance
* @param timeParams
* @return
*/
private Map<String, Map<String, Object>> getLocalParams(ProcessInstance processInstance, Map<String, String> timeParams) {
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
@ -674,6 +673,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
return localUserDefParams;
}
/**
* encapsulation gantt structure
*
@ -732,25 +732,27 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
}
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
@Override
public List<ProcessInstance> queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) {
return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states);
public List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefinitionCode, int[] states) {
return processInstanceMapper.queryByProcessDefineCodeAndStatus(processDefinitionCode, states);
}
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
@Override
public List<ProcessInstance> queryByProcessDefineId(int processDefinitionId, int size) {
return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size);
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
}
}

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -825,7 +825,7 @@ public class ProcessDefinitionServiceTest {
//task instance not exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
@ -893,7 +893,7 @@ public class ProcessDefinitionServiceTest {
//task instance exist
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Mockito.when(processInstanceService.queryByProcessDefineId(46, 10)).thenReturn(processInstanceList);
Mockito.when(processInstanceService.queryByProcessDefineCode(46L, 10)).thenReturn(processInstanceList);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -153,7 +153,7 @@ public class ProcessInstanceServiceTest {
when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result);
when(usersService.queryUser(loginUser.getId())).thenReturn(loginUser);
when(usersService.getUserIdByName(loginUser.getUserName())).thenReturn(loginUser.getId());
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
when(usersService.queryUser(processInstance.getExecutorId())).thenReturn(loginUser);
Map<String, Object> successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
@ -162,7 +162,7 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
// data parameter empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(-1), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(-1), Mockito.any(),
eq("192.168.xx.xx"), eq(null), eq(null))).thenReturn(pageReturn);
successRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "",
"", "", loginUser.getUserName(), ExecutionStatus.SUBMITTED_SUCCESS,
@ -178,7 +178,7 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.SUCCESS, executorExistRes.get(Constants.STATUS));
//executor name empty
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getId()), eq(1), eq(""), eq(0), Mockito.any(),
when(processInstanceMapper.queryProcessInstanceListPaging(Mockito.any(Page.class), eq(project.getCode()), eq(1L), eq(""), eq(0), Mockito.any(),
eq("192.168.xx.xx"), eq(start), eq(end))).thenReturn(pageReturn);
Map<String, Object> executorEmptyRes = processInstanceService.queryProcessInstanceList(loginUser, projectName, 1, "2020-01-01 00:00:00",
"2020-01-02 00:00:00", "", "", ExecutionStatus.SUBMITTED_SUCCESS,

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/DependentItem.java

@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
*/
public class DependentItem {
private int definitionId;
private Long definitionCode;
private String depTasks;
private String cycle;
private String dateValue;
@ -34,18 +34,18 @@ public class DependentItem {
public String getKey(){
return String.format("%d-%s-%s-%s",
getDefinitionId(),
getDefinitionCode(),
getDepTasks(),
getCycle(),
getDateValue());
}
public int getDefinitionId() {
return definitionId;
public Long getDefinitionCode() {
return definitionCode;
}
public void setDefinitionId(int definitionId) {
this.definitionId = definitionId;
public void setDefinitionCode(Long definitionCode) {
this.definitionCode = definitionCode;
}
public String getDepTasks() {

58
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -37,6 +37,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance detail info by id
*
* @param processId processId
* @return process instance
*/
@ -44,6 +45,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance by host and stateArray
*
* @param host host
* @param stateArray stateArray
* @return process instance list
@ -53,6 +55,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance by tenantId and stateArray
*
* @param tenantId tenantId
* @param states states array
* @return process instance list
@ -62,6 +65,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query process instance by worker group and stateArray
*
* @param workerGroupId workerGroupId
* @param states states array
* @return process instance list
@ -85,9 +89,10 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* process instance page
*
* @param page page
* @param projectId projectId
* @param processDefinitionId processDefinitionId
* @param projectCode projectCode
* @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal
* @param executorId executorId
* @param statusArray statusArray
@ -97,8 +102,8 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @return process instance page
*/
IPage<ProcessInstance> queryProcessInstanceListPaging(Page<ProcessInstance> page,
@Param("projectId") int projectId,
@Param("processDefinitionId") Integer processDefinitionId,
@Param("projectCode") Long projectCode,
@Param("processDefinitionCode") Long processDefinitionCode,
@Param("searchVal") String searchVal,
@Param("executorId") Integer executorId,
@Param("states") int[] statusArray,
@ -108,6 +113,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* set failover by host and state array
*
* @param host host
* @param stateArray stateArray
* @return set result
@ -117,6 +123,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* update process instance by state
*
* @param originState originState
* @param destState destState
* @return update result
@ -126,6 +133,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* update process instance by tenantId
*
* @param originTenantId originTenantId
* @param destTenantId destTenantId
* @return update result
@ -135,6 +143,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* update process instance by worker groupId
*
* @param originWorkerGroupId originWorkerGroupId
* @param destWorkerGroupId destWorkerGroupId
* @return update result
@ -143,6 +152,7 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* count process instance state by user
*
* @param startTime startTime
* @param endTime endTime
* @param projectCodes projectCodes
@ -154,55 +164,57 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("projectCodes") Long[] projectCodes);
/**
* query process instance by processDefinitionId
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode
*
* @param processDefinitionCode processDefinitionCode
* @param size size
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineId(
@Param("processDefinitionId") int processDefinitionId,
List<ProcessInstance> queryByProcessDefineCode(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("size") int size);
/**
* query last scheduler process instance
* @param definitionId processDefinitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @return process instance
*/
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* query last running process instance
* @param definitionId definitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param stateArray stateArray
* @return process instance
*/
ProcessInstance queryLastRunningProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("states") int[] stateArray);
/**
* query last manual process instance
* @param definitionId definitionId
*
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @return process instance
*/
ProcessInstance queryLastManualProcess(@Param("processDefinitionId") int definitionId,
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* query top n process instance order by running duration
* @param size
*
* @param status process instance status
* @param startTime
* @param endTime
* @return ProcessInstance list
*/
@ -210,18 +222,18 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("status") ExecutionStatus status);
/**
* query process instance by processDefinitionId and stateArray
* @param processDefinitionId processDefinitionId
* query process instance by processDefinitionCode and stateArray
*
* @param processDefinitionCode processDefinitionCode
* @param states states array
* @return process instance list
*/
List<ProcessInstance> queryByProcessDefineIdAndStatus(
@Param("processDefinitionId") int processDefinitionId,
List<ProcessInstance> queryByProcessDefineCodeAndStatus(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("states") int[] states);
int updateGlobalParamsById(
@Param("globalParams") String globalParams,
int updateGlobalParamsById(@Param("globalParams") String globalParams,
@Param("id") int id);
}

39
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -19,10 +19,10 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper">
<sql id="baseSql">
id, name, process_definition_id, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
id, name, process_definition_version, process_definition_code, state, recovery, start_time, end_time, run_times,host,
command_type, command_param, task_depend_type, max_try_times, failure_strategy, warning_type,
warning_group_id, schedule_time, command_start_time, global_params, process_instance_json, flag,
update_time, is_sub_process, executor_id, locations, connects, history_cmd, dependence_schedule_times,
warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id,
locations, connects, history_cmd, dependence_schedule_times,
process_instance_priority, worker_group, timeout, tenant_id, var_pool
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
@ -88,15 +88,15 @@
</select>
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.process_definition_id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.name, instance.state, instance.schedule_time, instance.start_time, instance.end_time,
instance.run_times, instance.recovery, instance.host
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_id = define.id
join t_ds_process_definition define ON instance.process_definition_code = define.code
where instance.is_sub_process=0
and define.project_id = #{projectId}
<if test="processDefinitionId != 0">
and instance.process_definition_id = #{processDefinitionId}
and define.project_code = #{projectCode}
<if test="processDefinitionCode != 0">
and instance.process_definition_code = #{processDefinitionCode}
</if>
<if test="searchVal != null and searchVal != ''">
and instance.name like concat('%', #{searchVal}, '%')
@ -147,8 +147,8 @@
<select id="countInstanceStateByUser" resultType="org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount">
select t.state, count(0) as count
from t_ds_process_instance t
join t_ds_process_definition d on d.id=t.process_definition_id
join t_ds_project p on p.id=d.project_id
join t_ds_process_definition d on d.code=t.process_definition_code
join t_ds_project p on p.code=d.project_code
where 1 = 1
and t.is_sub_process = 0
<if test="startTime != null and endTime != null">
@ -162,18 +162,18 @@
</if>
group by t.state
</select>
<select id="queryByProcessDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
order by start_time desc limit #{size}
</select>
<select id="queryLastSchedulerProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
<if test="startTime!=null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>
@ -183,7 +183,7 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
<if test="states !=null and states.length != 0">
and state in
<foreach collection="states" item="i" index="index" open="(" separator="," close=")">
@ -200,19 +200,18 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
and schedule_time is null
<if test="startTime!=null and endTime != null ">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1
</select>
<select id="queryByProcessDefineIdAndStatus"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
<select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_id=#{processDefinitionId}
where process_definition_code=#{processDefinitionCode}
and state in
<foreach collection="states" item="i" open="(" close=")" separator=",">
#{i}

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProjectMapper.xml

@ -75,9 +75,10 @@
</include>
,
u.user_name as user_name,
(SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_id = p.id) AS def_count,
(SELECT COUNT(*) FROM t_ds_process_definition def, t_ds_process_instance inst WHERE def.id =
inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count
(SELECT COUNT(*) FROM t_ds_process_definition AS def WHERE def.project_code = p.code) AS def_count,
(SELECT COUNT(*) FROM t_ds_process_definition_log def, t_ds_process_instance inst WHERE def.code =
inst.process_definition_code and def.version = inst.process_definition_version AND def.project_code = p.code
AND inst.state=1 ) as inst_running_count
from t_ds_project p
join t_ds_user u on u.id=p.user_id
where 1=1

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -73,7 +73,7 @@
select state, count(0) as count
from t_ds_task_instance t
left join t_ds_process_definition d on d.code=t.process_definition_code
left join t_ds_project p on p.id=d.project_id
left join t_ds_project p on p.code=d.project_code
where 1=1
<if test="projectCodes != null and projectCodes.length != 0">
and d.project_code in

41
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -20,9 +20,14 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import java.util.Date;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,8 +37,8 @@ import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@RunWith(SpringRunner.class)
@SpringBootTest
@ -54,10 +59,6 @@ public class ProcessInstanceMapperTest {
/**
* insert process instance with specified start time and end time,set state to SUCCESS
*
* @param startTime
* @param endTime
* @return
*/
private ProcessInstance insertOne(Date startTime, Date endTime) {
ProcessInstance processInstance = new ProcessInstance();
@ -73,6 +74,7 @@ public class ProcessInstanceMapperTest {
/**
* insert
*
* @return ProcessInstance
*/
private ProcessInstance insertOne() {
@ -185,8 +187,8 @@ public class ProcessInstanceMapperTest {
IPage<ProcessInstance> processInstanceIPage = processInstanceMapper.queryProcessInstanceListPaging(
page,
processDefinition.getProjectId(),
processInstance.getProcessDefinitionId(),
processDefinition.getProjectCode(),
processInstance.getProcessDefinitionCode(),
processInstance.getName(),
0,
stateArray,
@ -283,10 +285,10 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance1 = insertOne();
List<ProcessInstance> processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 1);
List<ProcessInstance> processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 1);
Assert.assertEquals(1, processInstances.size());
processInstances = processInstanceMapper.queryByProcessDefineId(processInstance.getProcessDefinitionId(), 2);
processInstances = processInstanceMapper.queryByProcessDefineCode(processInstance.getProcessDefinitionCode(), 2);
Assert.assertEquals(2, processInstances.size());
processInstanceMapper.deleteById(processInstance.getId());
@ -302,7 +304,7 @@ public class ProcessInstanceMapperTest {
processInstance.setScheduleTime(new Date());
processInstanceMapper.updateById(processInstance);
ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionId(), null, null );
ProcessInstance processInstance1 = processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null);
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
}
@ -320,7 +322,7 @@ public class ProcessInstanceMapperTest {
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal()};
ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionId(), null, null , stateArray);
ProcessInstance processInstance1 = processInstanceMapper.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, stateArray);
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
@ -336,13 +338,11 @@ public class ProcessInstanceMapperTest {
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end
);
ProcessInstance processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end);
Assert.assertEquals(processInstance1.getId(), processInstance.getId());
start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionId(),start, end
);
processInstance1 = processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end);
Assert.assertNull(processInstance1);
processInstanceMapper.deleteById(processInstance.getId());
@ -352,9 +352,6 @@ public class ProcessInstanceMapperTest {
/**
* test whether it is in descending order by running duration
*
* @param processInstances
* @return
*/
private boolean isSortedByDuration(List<ProcessInstance> processInstances) {
for (int i = 1; i < processInstances.size(); i++) {

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -101,7 +101,7 @@ public class DependentExecute {
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if(processInstance == null){
return DependResult.WAITING;
@ -170,24 +170,20 @@ public class DependentExecute {
* find the last one process instance that :
* 1. manual run and finish between the interval
* 2. schedule run and schedule time between the interval
* @param definitionId definition id
* @param definitionCode definition code
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(int definitionId, DateInterval dateInterval) {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) {
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionId, dateInterval.getStartTime(), dateInterval.getEndTime());
ProcessInstance runningProcess = processService.findLastRunningProcess(definitionCode, dateInterval.getStartTime(), dateInterval.getEndTime());
if(runningProcess != null){
return runningProcess;
}
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(
definitionId, dateInterval
);
ProcessInstance lastSchedulerProcess = processService.findLastSchedulerProcessInterval(definitionCode, dateInterval);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(
definitionId, dateInterval
);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval);
if(lastManualProcess ==null){
return lastSchedulerProcess;

35
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -120,7 +120,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -140,7 +140,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
@ -163,7 +163,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
@ -184,15 +184,15 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "B", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "B", "today", "day")
).collect(Collectors.toList()));
DependentTaskModel dependentTaskModel2 = new DependentTaskModel();
dependentTaskModel2.setRelation(DependentRelation.OR);
dependentTaskModel2.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "C", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day"),
getDependentItemFromTaskNode(3L, "C", "today", "day")
).collect(Collectors.toList()));
/*
@ -217,10 +217,10 @@ public class DependentTaskTest {
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(processInstance200);
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any()))
.thenReturn(processInstance300);
// for DependentExecute.getDependTaskResult
@ -249,7 +249,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day")
getDependentItemFromTaskNode(2L, Constants.DEPENDENT_ALL, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -267,7 +267,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -280,7 +280,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE));
DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
@ -302,7 +302,7 @@ public class DependentTaskTest {
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
getDependentItemFromTaskNode(2L, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
@ -318,7 +318,7 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -373,12 +373,9 @@ public class DependentTaskTest {
/**
* DependentItem defines the condition for the dependent
*/
private DependentItem getDependentItemFromTaskNode(
int processDefinitionId, String taskName,
String date, String cycle
) {
private DependentItem getDependentItemFromTaskNode(Long processDefinitionCode, String taskName, String date, String cycle) {
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionId(processDefinitionId);
dependentItem.setDefinitionCode(processDefinitionCode);
dependentItem.setDepTasks(taskName);
dependentItem.setDateValue(date);
dependentItem.setCycle(cycle);

18
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1911,12 +1911,12 @@ public class ProcessService {
/**
* find last scheduler process instance in the date interval
*
* @param definitionId definitionId
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastSchedulerProcessInterval(int definitionId, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionId,
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
@ -1924,12 +1924,12 @@ public class ProcessService {
/**
* find last manual process instance interval
*
* @param definitionId process definition id
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastManualProcessInterval(int definitionId, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionId,
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
}
@ -1937,13 +1937,13 @@ public class ProcessService {
/**
* find last running process instance
*
* @param definitionId process definition id
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(int definitionId, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionId,
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
stateArray);

Loading…
Cancel
Save