Browse Source

fix dependent task logic (#15795)

Co-authored-by: abzymeinsjtu <abzymeinsjtu@B-54Q8MD6R-0244.local>
dev_wenjun_refactorMaster
Evan Sun 2 months ago committed by GitHub
parent
commit
c0435e5bd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  2. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
  3. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
  4. 9
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  5. 3
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  6. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -188,13 +188,15 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query last scheduler process instance
*
* @param definitionCode definitionCode
* @param processDefinitionCode definitionCode
* @param taskDefinitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return process instance
*/
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode,
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long processDefinitionCode,
@Param("taskDefinitionCode") Long taskDefinitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java

@ -41,11 +41,13 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
/**
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param processDefinitionCode definitionCode
* @param taskDefinitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance queryLastSchedulerProcessInterval(Long processDefinitionCode, Long taskDefinitionCode,
DateInterval dateInterval, int testFlag);
/**
* find last manual process instance interval

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java

@ -67,14 +67,18 @@ public class ProcessInstanceDaoImpl extends BaseDao<ProcessInstance, ProcessInst
/**
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param processDefinitionCode definitionCode
* @param taskDefinitionCode definitionCode
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval,
public ProcessInstance queryLastSchedulerProcessInterval(Long processDefinitionCode, Long taskDefinitionCode,
DateInterval dateInterval,
int testFlag) {
return mybatisMapper.queryLastSchedulerProcess(definitionCode,
return mybatisMapper.queryLastSchedulerProcess(
processDefinitionCode,
taskDefinitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);

9
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -197,12 +197,17 @@
order by start_time desc limit #{size}
</select>
<select id="queryLastSchedulerProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
select t1.* from (select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
<if test="startTime!=null and endTime != null ">
<if test="startTime != null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>) as t1
<if test="taskDefinitionCode != null and taskDefinitionCode != 0 and taskDefinitionCode != -1">
inner join
t_ds_task_instance as t2
on t1.id = t2.process_instance_id and t2.task_code=#{taskDefinitionCode}
</if>
order by end_time desc limit 1
</select>

3
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -263,7 +263,8 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processInstanceMapper.updateById(processInstance);
ProcessInstance processInstance1 =
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null,
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), 0L, null,
null,
processInstance.getTestFlag());
Assertions.assertNotEquals(null, processInstance1);
processInstanceMapper.deleteById(processInstance.getId());

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -320,7 +320,7 @@ public class DependentExecute {
int testFlag) {
ProcessInstance lastSchedulerProcess =
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, taskCode, dateInterval, testFlag);
ProcessInstance lastManualProcess =
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode, dateInterval, testFlag);

Loading…
Cancel
Save