From ea2a8d26a0d42683309b8bda9ef24f4daaab266b Mon Sep 17 00:00:00 2001 From: Kirs Date: Sat, 2 Oct 2021 13:09:00 +0800 Subject: [PATCH] [Improvement#6438][Worker]remove meaningless DB query (#6440) org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread In the process of task kill, it will query the DB to obtain the task instance object, but this judgment does not make any sense. In fact, the relevant context is already clear, so there is no need to query from DB. --- .../builder/TaskExecutionContextBuilder.java | 1 + .../worker/runner/TaskExecuteThread.java | 18 ++---------------- .../queue/entity/TaskExecutionContext.java | 14 ++++++++++++++ 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index afc9b9a036..807c5218ca 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -61,6 +61,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setResources(taskInstance.getResources()); taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); taskExecutionContext.setVarPool(taskInstance.getVarPool()); + taskExecutionContext.setDryRun(taskInstance.getDryRun()); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index eaa7cc40a3..d1f1c39717 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.RetryerUtils; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; @@ -41,8 +40,6 @@ import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.service.alert.AlertClientService; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.exception.PluginNotFoundException; import org.apache.dolphinscheduler.spi.task.AbstractTask; @@ -113,11 +110,6 @@ public class TaskExecuteThread implements Runnable, Delayed { private TaskPluginManager taskPluginManager; - /** - * process database access - */ - protected ProcessService processService; - /** * constructor * @@ -130,7 +122,6 @@ public class TaskExecuteThread implements Runnable, Delayed { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; - this.processService = SpringApplicationContext.getBean(ProcessService.class); } public TaskExecuteThread(TaskExecutionContext taskExecutionContext, @@ -141,7 +132,6 @@ public class TaskExecuteThread implements Runnable, Delayed { this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; this.taskPluginManager = taskPluginManager; - this.processService = SpringApplicationContext.getBean(ProcessService.class); } @Override @@ -167,8 +157,7 @@ public class TaskExecuteThread implements Runnable, Delayed { } logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId()); - TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); - int dryRun = taskInstance.getDryRun(); + int dryRun = taskExecutionContext.getDryRun(); // copy hdfs/minio file to local if (dryRun == Constants.DRY_RUN_FLAG_NO) { downloadResource(taskExecutionContext.getExecutePath(), @@ -294,10 +283,7 @@ public class TaskExecuteThread implements Runnable, Delayed { if (task != null) { try { task.cancelApplication(true); - TaskInstance taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId()); - if (taskInstance != null) { - ProcessUtils.killYarnJob(taskExecutionContext); - } + ProcessUtils.killYarnJob(taskExecutionContext); } catch (Exception e) { logger.error(e.getMessage(), e); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java index 9f73d82906..609566ab52 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/entity/TaskExecutionContext.java @@ -232,6 +232,11 @@ public class TaskExecutionContext implements Serializable { */ private String varPool; + /** + * dry run flag + */ + private int dryRun; + /** * business param */ @@ -552,6 +557,14 @@ public class TaskExecutionContext implements Serializable { this.sqoopTaskExecutionContext = sqoopTaskExecutionContext; } + public int getDryRun() { + return dryRun; + } + + public void setDryRun(int dryRun) { + this.dryRun = dryRun; + } + @Override public String toString() { return "TaskExecutionContext{" @@ -579,6 +592,7 @@ public class TaskExecutionContext implements Serializable { + ", projectCode=" + projectCode + ", taskParams='" + taskParams + '\'' + ", envFile='" + envFile + '\'' + + ", dryRun='" + dryRun + '\'' + ", definedParams=" + definedParams + ", taskAppId='" + taskAppId + '\'' + ", taskTimeoutStrategy=" + taskTimeoutStrategy