From 009fd01047283ca11d60714b128c3b52d61fc705 Mon Sep 17 00:00:00 2001 From: yaoyao <42244568+1445103710@users.noreply.github.com> Date: Sat, 30 May 2020 23:23:01 +0800 Subject: [PATCH] fix [BUG] TaskExecutionContextCacheManagerImpl Do not execute removeByTaskInstanceId #2745 (#2754) --- .../server/worker/processor/TaskKillProcessor.java | 1 + .../server/worker/runner/TaskExecuteThread.java | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index b6f58279b1..cf0c051e02 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -93,6 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); + taskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index d314c5535d..d2d783ac85 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -27,9 +27,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +66,11 @@ public class TaskExecuteThread implements Runnable { */ private TaskCallbackService taskCallbackService; + /** + * taskExecutionContextCacheManager + */ + private TaskExecutionContextCacheManager taskExecutionContextCacheManager; + /** * constructor * @param taskExecutionContext taskExecutionContext @@ -71,6 +79,7 @@ public class TaskExecuteThread implements Runnable { public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService){ this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; + this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } @Override @@ -134,6 +143,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setAppIds(task.getAppIds()); } finally { try { + taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); }catch (Exception e){ ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);