From 9d9ae9ad540a9f2a8bf93345d13418ef417f7227 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 29 Mar 2023 18:18:03 +0800 Subject: [PATCH] [improvement] support self-dependent (#13818) --- .../dao/mapper/ProcessInstanceMapper.java | 16 ++++ .../dao/repository/ProcessInstanceDao.java | 48 ++++++++++++ .../impl/ProcessInstanceDaoImpl.java | 73 +++++++++++++++++++ .../dao/mapper/ProcessInstanceMapper.xml | 19 +++++ .../runner/task/DependentTaskProcessor.java | 3 +- .../server/master/utils/DependentExecute.java | 70 ++++++++++++++++-- .../server/master/DependentTaskTest.java | 60 +++++++++++++-- .../service/process/ProcessService.java | 8 -- .../service/process/ProcessServiceImpl.java | 49 ------------- .../service/process/ProcessServiceTest.java | 19 ----- 10 files changed, 273 insertions(+), 92 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 7807998b16..ea9fe453b8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -227,6 +227,22 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("endTime") Date endTime, @Param("testFlag") int testFlag); + /** + * query first schedule process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode); + + /** + * query first manual process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode); + /** * query top n process instance order by running duration * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java index b249e265e7..81064e7f84 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java @@ -18,7 +18,11 @@ package org.apache.dolphinscheduler.dao.repository; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; +import org.apache.ibatis.annotations.Param; + +import java.util.Date; import java.util.List; public interface ProcessInstanceDao { @@ -39,4 +43,48 @@ public interface ProcessInstanceDao { void deleteById(Integer workflowInstanceId); ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId); + + /** + * find last scheduler process instance in the date interval + * + * @param definitionCode definitionCode + * @param dateInterval dateInterval + * @return process instance + */ + ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); + + /** + * find last manual process instance interval + * + * @param definitionCode process definition code + * @param dateInterval dateInterval + * @return process instance + */ + ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag); + + /** + * find last running process instance + * + * @param definitionCode process definition code + * @param startTime start time + * @param endTime end time + * @return process instance + */ + ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag); + + /** + * query first schedule process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + ProcessInstance queryFirstScheduleProcessInstance(@Param("processDefinitionCode") Long definitionCode); + + /** + * query first manual process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + ProcessInstance queryFirstStartProcessInstance(@Param("processDefinitionCode") Long definitionCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java index ae36ca924e..d3d6f66280 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java @@ -17,12 +17,15 @@ package org.apache.dolphinscheduler.dao.repository.impl; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval; import org.apache.commons.collections4.CollectionUtils; +import java.util.Date; import java.util.List; import lombok.NonNull; @@ -74,4 +77,74 @@ public class ProcessInstanceDaoImpl implements ProcessInstanceDao { public ProcessInstance queryByWorkflowInstanceId(Integer workflowInstanceId) { return processInstanceMapper.selectById(workflowInstanceId); } + + /** + * find last scheduler process instance in the date interval + * + * @param definitionCode definitionCode + * @param dateInterval dateInterval + * @return process instance + */ + @Override + public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, + int testFlag) { + return processInstanceMapper.queryLastSchedulerProcess(definitionCode, + dateInterval.getStartTime(), + dateInterval.getEndTime(), + testFlag); + } + + /** + * find last manual process instance interval + * + * @param definitionCode process definition code + * @param dateInterval dateInterval + * @return process instance + */ + @Override + public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) { + return processInstanceMapper.queryLastManualProcess(definitionCode, + dateInterval.getStartTime(), + dateInterval.getEndTime(), + testFlag); + } + + /** + * find last running process instance + * + * @param definitionCode process definition code + * @param startTime start time + * @param endTime end time + * @return process instance + */ + @Override + public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) { + return processInstanceMapper.queryLastRunningProcess(definitionCode, + startTime, + endTime, + testFlag, + WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState()); + } + + /** + * query first schedule process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + @Override + public ProcessInstance queryFirstScheduleProcessInstance(Long definitionCode) { + return processInstanceMapper.queryFirstScheduleProcessInstance(definitionCode); + } + + /** + * query first manual process instance + * + * @param definitionCode definitionCode + * @return process instance + */ + @Override + public ProcessInstance queryFirstStartProcessInstance(Long definitionCode) { + return processInstanceMapper.queryFirstStartProcessInstance(definitionCode); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 7c64c035c6..77f7b4e3ee 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -242,6 +242,25 @@ order by end_time desc limit 1 + + + + +