diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 1dfa4812cf..7ae5827f79 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -17,25 +17,37 @@ package org.apache.dolphinscheduler.server.master.runner.task; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; - +import com.google.auto.service.AutoService; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; +import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; import org.apache.dolphinscheduler.server.master.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; -import com.google.auto.service.AutoService; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; /** * dependent task processor @@ -45,6 +57,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor { private DependentParameters dependentParameters; + private final ProcessDefinitionMapper processDefinitionMapper = SpringApplicationContext.getBean(ProcessDefinitionMapper.class); + + private final TaskDefinitionMapper taskDefinitionMapper = SpringApplicationContext.getBean(TaskDefinitionMapper.class); + + private final ProjectMapper projectMapper = SpringApplicationContext.getBean(ProjectMapper.class); + /** * dependent task list */ @@ -56,6 +74,10 @@ public class DependentTaskProcessor extends BaseTaskProcessor { */ private Map dependResultMap = new HashMap<>(); + private Map projectCodeMap = new HashMap<>(); + private Map processDefinitionMap = new HashMap<>(); + private Map taskDefinitionMap = new HashMap<>(); + /** * dependent date */ @@ -67,24 +89,31 @@ public class DependentTaskProcessor extends BaseTaskProcessor { @Override public boolean submitTask() { - this.taskInstance = - processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); + try { + this.taskInstance = + processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); - if (this.taskInstance == null) { + if (this.taskInstance == null) { + return false; + } + this.setTaskExecutionLogger(); + logger.info("Dependent task submit success"); + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); + taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); + taskInstance.setStartTime(new Date()); + processService.updateTaskInstance(taskInstance); + initDependParameters(); + logger.info("Success initialize dependent task parameters, the dependent data is: {}", dependentDate); + return true; + } catch (Exception ex) { + logger.error("Submit/Initialize dependent task error", ex); return false; } - this.setTaskExecutionLogger(); - taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), - processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); - taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); - taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); - taskInstance.setStartTime(new Date()); - processService.updateTaskInstance(taskInstance); - initDependParameters(); - return true; } @Override @@ -128,14 +157,56 @@ public class DependentTaskProcessor extends BaseTaskProcessor { */ private void initDependParameters() { this.dependentParameters = taskInstance.getDependency(); - for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { - this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation())); - } if (processInstance.getScheduleTime() != null) { this.dependentDate = this.processInstance.getScheduleTime(); } else { this.dependentDate = new Date(); } + // check dependent project is exist + List dependTaskList = dependentParameters.getDependTaskList(); + Set projectCodes = new HashSet<>(); + Set processDefinitionCodes = new HashSet<>(); + Set taskDefinitionCodes = new HashSet<>(); + dependTaskList.forEach(dependentTaskModel -> { + dependentTaskModel.getDependItemList().forEach(dependentItem -> { + projectCodes.add(dependentItem.getProjectCode()); + processDefinitionCodes.add(dependentItem.getDefinitionCode()); + taskDefinitionCodes.add(dependentItem.getDepTaskCode()); + }); + }); + projectCodeMap = projectMapper.queryByCodes(projectCodes).stream().collect(Collectors.toMap(Project::getCode, Function.identity())); + processDefinitionMap = processDefinitionMapper.queryByCodes(processDefinitionCodes).stream().collect(Collectors.toMap(ProcessDefinition::getCode, Function.identity())); + taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes).stream().collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); + + for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) { + logger.info("Add sub dependent check tasks, dependent relation: {}", taskModel.getRelation()); + for (DependentItem dependentItem : taskModel.getDependItemList()) { + Project project = projectCodeMap.get(dependentItem.getProjectCode()); + if (project == null) { + logger.error("The dependent task's project is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's project is not exist, dependentItem: " + dependentItem); + } + ProcessDefinition processDefinition = processDefinitionMap.get(dependentItem.getDefinitionCode()); + if (processDefinition == null) { + logger.error("The dependent task's workflow is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's workflow is not exist, dependentItem: " + dependentItem); + } + if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) { + logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: ALL, dependentKey: {}", + project.getName(), processDefinition.getName(), dependentItem.getKey()); + + } else { + TaskDefinition taskDefinition = taskDefinitionMap.get(dependentItem.getDepTaskCode()); + if (taskDefinition == null) { + logger.error("The dependent task's taskDefinition is not exist, dependentItem: {}", dependentItem); + throw new RuntimeException("The dependent task's taskDefinition is not exist, dependentItem: " + dependentItem); + } + logger.info("Add dependent task: projectName: {}, workflowName: {}, taskName: {}, dependentKey: {}", + project.getName(), processDefinition.getName(), taskDefinition.getName(), dependentItem.getKey()); + } + } + this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation())); + } } @Override @@ -166,7 +237,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { if (!dependResultMap.containsKey(entry.getKey())) { dependResultMap.put(entry.getKey(), entry.getValue()); // save depend result to log - logger.info("dependent item complete, task: {}, result: {}", entry.getKey(), entry.getValue()); + logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate); } } if (!dependentExecute.finish(dependentDate)) { @@ -188,7 +259,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor { dependResultList.add(dependResult); } result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList); - logger.info("dependent task completed, dependent result: {}", result); + logger.info("Dependent task completed, dependent result: {}", result); return result; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java index f1b6a5ea5f..3501db67c8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/model/DependentTaskModel.java @@ -17,29 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.api.model; +import lombok.Data; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import java.util.List; +@Data public class DependentTaskModel { - private List dependItemList; private DependentRelation relation; - public List getDependItemList() { - return dependItemList; - } - - public void setDependItemList(List dependItemList) { - this.dependItemList = dependItemList; - } - - public DependentRelation getRelation() { - return relation; - } - - public void setRelation(DependentRelation relation) { - this.relation = relation; - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java index bfbab6b759..e0e6b40068 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/DependentParameters.java @@ -17,11 +17,15 @@ package org.apache.dolphinscheduler.plugin.task.api.parameters; +import lombok.Data; +import lombok.EqualsAndHashCode; import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation; import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel; import java.util.List; +@Data +@EqualsAndHashCode(callSuper = true) public class DependentParameters extends AbstractParameters { private List dependTaskList; @@ -32,19 +36,4 @@ public class DependentParameters extends AbstractParameters { return true; } - public List getDependTaskList() { - return dependTaskList; - } - - public void setDependTaskList(List dependTaskList) { - this.dependTaskList = dependTaskList; - } - - public DependentRelation getRelation() { - return relation; - } - - public void setRelation(DependentRelation relation) { - this.relation = relation; - } }