From d38d504332042fcf60b6560284e4eb729600fbbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=BA=E9=98=B3?= Date: Thu, 6 Jul 2023 11:38:26 +0800 Subject: [PATCH] [Fix] Fix running task instance throught api gots failed (#14433) * update logic * split method --- .../runner/StreamTaskExecuteRunnable.java | 7 +++- .../execute/TaskExecutionContextFactory.java | 33 ++++++++++++------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index 5f64a3c099..40ef9e5df6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -54,6 +54,7 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory; +import org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.enums.ResourceType; @@ -104,6 +105,8 @@ public class StreamTaskExecuteRunnable implements Runnable { protected TaskExecuteStartMessage taskExecuteStartMessage; + protected TaskExecutionContextFactory taskExecutionContextFactory; + /** * task event queue */ @@ -122,6 +125,7 @@ public class StreamTaskExecuteRunnable implements Runnable { SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class); this.taskDefinition = taskDefinition; this.taskExecuteStartMessage = taskExecuteStartMessage; + this.taskExecutionContextFactory = SpringApplicationContext.getBean(TaskExecutionContextFactory.class); } public TaskInstance getTaskInstance() { @@ -337,7 +341,8 @@ public class StreamTaskExecuteRunnable implements Runnable { taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion()); // process instance id default 0 taskExecutionContext.setProcessInstanceId(0); - + taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, tenantCode); + taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext, taskInstance); return taskExecutionContext; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java index 03005ff70c..0153c81364 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java @@ -121,33 +121,42 @@ public class TaskExecutionContextFactory { .orElse(null); setTaskResourceInfo(resources); - // TODO to be optimized - DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null; - if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) { - dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext(); - setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, workflowInstance.getTenantCode()); - } - - K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance); - Map businessParamsMap = curingParamsService.preBuildBusinessParams(workflowInstance); AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder() .taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build()); Map propertyMap = curingParamsService.paramParsingPreparation(taskInstance, baseParam, workflowInstance); - return TaskExecutionContextBuilder.get() + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildWorkflowInstanceHost(masterConfig.getMasterAddress()) .buildTaskInstanceRelatedInfo(taskInstance) .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine()) .buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance()) .buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine()) .buildResourceParametersInfo(resources) - .buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext) - .buildK8sTaskRelatedInfo(k8sTaskExecutionContext) .buildBusinessParamsMap(businessParamsMap) .buildParamInfo(propertyMap) .create(); + + setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, workflowInstance.getTenantCode()); + setK8sTaskRelatedInfo(taskExecutionContext, taskInstance); + return taskExecutionContext; + } + + public void setDataQualityTaskExecutionContext(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance, + String tenantCode) { + // TODO to be optimized + DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null; + if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) { + dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext(); + setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenantCode); + } + taskExecutionContext.setDataQualityTaskExecutionContext(dataQualityTaskExecutionContext); + } + + public void setK8sTaskRelatedInfo(TaskExecutionContext taskExecutionContext, TaskInstance taskInstance) { + K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance); + taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext); } private Map getResourceFullNames(TaskInstance taskInstance) {