Browse Source

[improvement] support self-dependent (#13818)

3.2.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
9d9ae9ad54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  2. 48
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  3. 73
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  4. 19
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  5. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  6. 70
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
  7. 60
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  8. 8
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  9. 49
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  10. 19
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

16
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -227,6 +227,22 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
@Param("endTime") Date endTime, @Param("endTime") Date endTime,
@Param("testFlag") int testFlag); @Param("testFlag") int testFlag);
/**
* query first schedule process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
/**
* query first manual process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
/** /**
* query top n process instance order by running duration * query top n process instance order by running duration
* *

48
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -18,7 +18,11 @@
package org.apache.dolphinscheduler.dao.repository; package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.ibatis.annotations.Param;
import java.util.Date;
import java.util.List; import java.util.List;
public interface ProcessInstanceDao { public interface ProcessInstanceDao {
@ -39,4 +43,48 @@ public interface ProcessInstanceDao {
void deleteById(Integer workflowInstanceId); void deleteById(Integer workflowInstanceId);
ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId); ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId);
/**
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
/**
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
/**
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
/**
* query first schedule process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode);
/**
* query first manual process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode);
} }

73
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.dao.repository.impl; package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import java.util.Date;
import java.util.List; import java.util.List;
import lombok.NonNull; import lombok.NonNull;
@ -74,4 +77,74 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao {
public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) { public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) {
return processInstanceMapper.selectById(workflowInstanceId); return processInstanceMapper.selectById(workflowInstanceId);
} }
/**
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
int testFlag) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
}
/**
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
}
/**
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
@Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
testFlag,
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
* query first schedule process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
@Override
public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) {
return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode);
}
/**
* query first manual process instance
*
* @param definitionCode definitionCode
* @return process instance
*/
@Override
public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) {
return processInstanceMapper.queryFirstStartProcessInstance(definitionCode);
}
} }

19
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -242,6 +242,25 @@
</if> </if>
order by end_time desc limit 1 order by end_time desc limit 1
</select> </select>
<select id="queryFirstScheduleProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code = #{processDefinitionCode} and schedule_time is not null
order by schedule_time
limit 1
</select>
<select id="queryFirstStartProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code = #{processDefinitionCode} and start_time is not null
order by start_time
limit 1
</select>
<select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance"> <select id="queryByProcessDefineCodeAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select select
<include refid="baseSql"/> <include refid="baseSql"/>

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -222,7 +222,8 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
dependentItem.getKey()); dependentItem.getKey());
} }
} }
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation())); this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation(),
processInstance, taskInstance));
} }
} }

70
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.utils;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
@ -28,7 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -36,15 +36,15 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import lombok.extern.slf4j.Slf4j;
/** /**
* dependent item execute * dependent item execute
*/ */
@Slf4j
public class DependentExecute { public class DependentExecute {
/** private final ProcessInstanceDao processInstanceDao = SpringApplicationContext.getBean(ProcessInstanceDao.class);
* process service
*/
private final ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class); private final TaskInstanceDao taskInstanceDao = SpringApplicationContext.getBean(TaskInstanceDao.class);
@ -58,6 +58,10 @@ public class DependentExecute {
*/ */
private DependentRelation relation; private DependentRelation relation;
private ProcessInstance processInstance;
private TaskInstance taskInstance;
/** /**
* depend result * depend result
*/ */
@ -74,9 +78,12 @@ public class DependentExecute {
* @param itemList item list * @param itemList item list
* @param relation relation * @param relation relation
*/ */
public DependentExecute(List<DependentItem> itemList, DependentRelation relation) { public DependentExecute(List<DependentItem> itemList, DependentRelation relation, ProcessInstance processInstance,
TaskInstance taskInstance) {
this.dependItemList = itemList; this.dependItemList = itemList;
this.relation = relation; this.relation = relation;
this.processInstance = processInstance;
this.taskInstance = taskInstance;
} }
/** /**
@ -185,10 +192,10 @@ public class DependentExecute {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
ProcessInstance lastSchedulerProcess = ProcessInstance lastSchedulerProcess =
processService.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag); processInstanceDao.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
ProcessInstance lastManualProcess = ProcessInstance lastManualProcess =
processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag); processInstanceDao.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
if (lastManualProcess == null) { if (lastManualProcess == null) {
return lastSchedulerProcess; return lastSchedulerProcess;
@ -243,6 +250,15 @@ public class DependentExecute {
List<DependResult> dependResultList = new ArrayList<>(); List<DependResult> dependResultList = new ArrayList<>();
for (DependentItem dependentItem : dependItemList) { for (DependentItem dependentItem : dependItemList) {
if (isSelfDependent(dependentItem) && isFirstProcessInstance(dependentItem)) {
// if self-dependent, default success at first time
dependResultMap.put(dependentItem.getKey(), DependResult.SUCCESS);
dependResultList.add(DependResult.SUCCESS);
log.info(
"This dependent item is self-dependent and run at first time, default success, processDefinitionCode:{}, depTaskCode:{}",
dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode());
continue;
}
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag); DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING) { if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult); dependResultMap.put(dependentItem.getKey(), dependResult);
@ -272,4 +288,42 @@ public class DependentExecute {
return dependResultMap; return dependResultMap;
} }
/**
* check for self-dependent
* @param dependentItem
* @return
*/
public boolean isSelfDependent(DependentItem dependentItem) {
if (processInstance.getProcessDefinitionCode().equals(dependentItem.getDefinitionCode())) {
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
return true;
}
if (dependentItem.getDepTaskCode() == taskInstance.getTaskCode()) {
return true;
}
}
return false;
}
/**
* check for first-running
* query the first processInstance by scheduleTime(or startTime if scheduleTime is null)
* @param dependentItem
* @return
*/
public boolean isFirstProcessInstance(DependentItem dependentItem) {
ProcessInstance firstProcessInstance =
processInstanceDao.queryFirstScheduleProcessInstance(dependentItem.getDefinitionCode());
if (firstProcessInstance == null) {
firstProcessInstance = processInstanceDao.queryFirstStartProcessInstance(dependentItem.getDefinitionCode());
if (firstProcessInstance == null) {
log.warn("First process instance is null, processDefinitionCode:{}", dependentItem.getDefinitionCode());
return false;
}
}
if (firstProcessInstance.getId() == processInstance.getId()) {
return true;
}
return false;
}
} }

60
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
@ -34,14 +35,17 @@ import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.utils.DependentExecute;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
@ -72,6 +76,8 @@ public class DependentTaskTest {
private ProcessService processService; private ProcessService processService;
private ProcessInstanceDao processInstanceDao;
private TaskInstanceDao taskInstanceDao; private TaskInstanceDao taskInstanceDao;
private TaskDefinitionDao taskDefinitionDao; private TaskDefinitionDao taskDefinitionDao;
@ -101,6 +107,9 @@ public class DependentTaskTest {
processService = Mockito.mock(ProcessService.class); processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstanceDao = Mockito.mock(ProcessInstanceDao.class);
Mockito.when(applicationContext.getBean(ProcessInstanceDao.class)).thenReturn(processInstanceDao);
taskInstanceDao = Mockito.mock(TaskInstanceDao.class); taskInstanceDao = Mockito.mock(TaskInstanceDao.class);
Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao); Mockito.when(applicationContext.getBean(TaskInstanceDao.class)).thenReturn(taskInstanceDao);
@ -161,7 +170,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance = ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE); getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance); .thenReturn(dependentProcessInstance);
@ -183,7 +192,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance = ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS); getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance); .thenReturn(dependentProcessInstance);
@ -231,10 +240,10 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(300, WorkflowExecutionStatus.SUCCESS); getProcessInstanceForFindLastRunningProcess(300, WorkflowExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance200); .thenReturn(processInstance200);
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance300); .thenReturn(processInstance300);
@ -281,7 +290,7 @@ public class DependentTaskTest {
public void testDependentOnAllSuccess() { public void testDependentOnAllSuccess() {
testDependentOnAllInit(); testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS)); .thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS));
@ -291,7 +300,7 @@ public class DependentTaskTest {
public void testDependentOnAllFailure() { public void testDependentOnAllFailure() {
testDependentOnAllInit(); testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE)); .thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE));
@ -325,7 +334,7 @@ public class DependentTaskTest {
ProcessInstance dependentProcessInstance = ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.RUNNING_EXECUTION); getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval // for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processInstanceDao
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt())) .findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance); .thenReturn(dependentProcessInstance);
@ -345,9 +354,45 @@ public class DependentTaskTest {
} }
@Test
public void testIsSelfDependent() {
DependentExecute dependentExecute =
new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
dependentItem.setDepTaskCode(Constants.DEPENDENT_ALL_TASK_CODE);
Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
dependentItem.setDepTaskCode(taskInstance.getTaskCode());
Assertions.assertTrue(dependentExecute.isSelfDependent(dependentItem));
// no self task
dependentItem.setDepTaskCode(12345678);
Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
// no self wf
dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
Assertions.assertFalse(dependentExecute.isSelfDependent(dependentItem));
}
@Test
public void testIsFirstProcessInstance() {
Mockito.when(processInstanceDao.queryFirstScheduleProcessInstance(processInstance.getProcessDefinitionCode()))
.thenReturn(processInstance);
DependentExecute dependentExecute =
new DependentExecute(new ArrayList<>(), DependentRelation.AND, processInstance, taskInstance);
DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionCode(processInstance.getProcessDefinitionCode());
Assertions.assertTrue(dependentExecute.isFirstProcessInstance(dependentItem));
dependentItem.setDefinitionCode(12345678L);
Assertions.assertFalse(dependentExecute.isFirstProcessInstance(dependentItem));
}
private ProcessInstance getProcessInstance() { private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance(); ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(100); processInstance.setId(100);
processInstance.setProcessDefinitionCode(10000L);
processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION); processInstance.setState(WorkflowExecutionStatus.RUNNING_EXECUTION);
return processInstance; return processInstance;
} }
@ -355,6 +400,7 @@ public class DependentTaskTest {
private TaskInstance getTaskInstance() { private TaskInstance getTaskInstance() {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1000); taskInstance.setId(1000);
taskInstance.setTaskCode(10000L);
return taskInstance; return taskInstance;
} }

8
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -49,12 +49,10 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -136,12 +134,6 @@ public interface ProcessService {
List<Schedule> selectAllByProcessDefineCode(long[] codes); List<Schedule> selectAllByProcessDefineCode(long[] codes);
ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
String queryUserQueueByProcessInstance(ProcessInstance processInstance); String queryUserQueueByProcessInstance(ProcessInstance processInstance);
ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId); ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);

49
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -118,7 +118,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@ -1683,54 +1682,6 @@ public class ProcessServiceImpl implements ProcessService {
return scheduleMapper.selectAllByProcessDefineArray(codes); return scheduleMapper.selectAllByProcessDefineArray(codes);
} }
/**
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
int testFlag) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
}
/**
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
}
/**
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @return process instance
*/
@Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
testFlag,
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/** /**
* query user queue by process instance * query user queue by process instance
* *

19
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -76,7 +76,6 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType; import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest; import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@ -750,24 +749,6 @@ public class ProcessServiceTest {
} }
@Test
public void testFindLastManualProcessInterval() {
long definitionCode = 1L;
DateInterval dateInterval = new DateInterval(new Date(), new Date());
int testFlag = 1;
// find test lastManualProcessInterval
ProcessInstance lastManualProcessInterval =
processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
Assertions.assertNull(lastManualProcessInterval);
// find online lastManualProcessInterval
testFlag = 0;
lastManualProcessInterval =
processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
Assertions.assertNull(lastManualProcessInterval);
}
@Test @Test
public void testQueryTestDataSourceId() { public void testQueryTestDataSourceId() {
Integer onlineDataSourceId = 1; Integer onlineDataSourceId = 1;

Loading…
Cancel
Save