Browse Source

fix dependent task when the dependent on process have forbidden tasks (#10952)

* fix dependent task when the dependent on process have forbidden tasks

* fix dependent task when the dependent on process have forbidden tasks

Co-authored-by: shenk-b <shenk-b@glodon.com>
2.0.7-release
Kevin.Shin 2 years ago committed by GitHub
parent
commit
749a9d2f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DependentExecute.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
@ -145,6 +146,7 @@ public class DependentExecute {
Map<Long, String> definiteTask = taskDefinitionLogs.stream().filter(log -> !log.getTaskType().equals(TaskType.SUB_PROCESS.getDesc())
|| !log.getTaskType().equals(TaskType.DEPENDENT.getDesc())
|| !log.getTaskType().equals(TaskType.CONDITIONS.getDesc()))
.filter(log -> log.getFlag().equals(Flag.YES))
.collect(Collectors.toMap(TaskDefinition::getCode, TaskDefinitionLog::getName));
if (!definiteTask.isEmpty()) {
List<TaskInstance> taskInstanceList = processService.findLastTaskInstanceListInterval(definiteTask.keySet(), dateInterval);
@ -194,25 +196,26 @@ public class DependentExecute {
*/
private DependResult getDependTaskResult(ProcessInstance processInstance, long taskCode, DateInterval dateInterval) {
TaskInstance taskInstance = processService.findLastTaskInstanceInterval(taskCode, dateInterval);
DependResult result;
if (taskInstance == null) {
if (!processInstance.getState().typeIsFinished()) {
logger.info("Wait for the dependent workflow to complete, taskCode:{}, processInstanceId:{}, processInstance state:{}",
taskCode, processInstance.getId(), processInstance.getState());
return DependResult.WAITING;
}
TaskDefinition taskDefinition = processService.findTaskDefinitionByCode(taskCode);
if (taskDefinition == null) {
logger.error("Cannot find the task definition, something error, taskCode: {}", taskCode);
} else {
logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
return DependResult.FAILED;
}
if (taskDefinition.getFlag() == Flag.NO) {
logger.warn("Cannot find the task instance, but the task is forbidden, so dependent success, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
return DependResult.SUCCESS;
}
result = DependResult.FAILED;
if (!processInstance.getState().typeIsFinished()) {
logger.info("Wait for the dependent workflow to complete, taskCode:{}, processInstanceId:{}, processInstance state:{}", taskCode, processInstance.getId(), processInstance.getState());
return DependResult.WAITING;
}
logger.warn("Cannot find the task in the process instance when the ProcessInstance is finish, taskCode: {}, taskName: {}", taskCode, taskDefinition.getName());
return DependResult.FAILED;
} else {
logger.info("The running task, taskId:{}, taskCode:{}, taskName:{}", taskInstance.getId(), taskInstance.getTaskCode(), taskInstance.getName());
result = getDependResultByState(taskInstance.getState());
return getDependResultByState(taskInstance.getState());
}
return result;
}
/**

Loading…
Cancel
Save