From 6c819ee16d575e62039d98183b93fd3a06c628ed Mon Sep 17 00:00:00 2001 From: zwZjut Date: Fri, 24 Dec 2021 16:04:15 +0800 Subject: [PATCH] [Bug] [dolphinscheduler-server] workflow is always running when worker sendResult success but master not received (#7610) * [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238) * to #7237 * rerun test Co-authored-by: honghuo.zw * cherry-pick 05aef27 and handle conflicts * to #7065: fix ExecutorService and schedulerService (#7072) Co-authored-by: honghuo.zw * [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081 (#7082) * to #7081 * fix #7081 * to #7081 Co-authored-by: honghuo.zw * cherry-pick 8ebe060 and handle conflicts * cherry-pick 1f18444 and handle conflicts * fix #6807: dolphinscheduler.zookeeper.env_vars -> dolphinscheduler.registry.env_vars (#6808) Co-authored-by: honghuo.zw Co-authored-by: Kirs * add default constructor (#6780) Co-authored-by: honghuo.zw * to #7108 (#7109) * to #7609 Co-authored-by: honghuo.zw Co-authored-by: Kirs --- .../server/worker/processor/DBTaskResponseProcessor.java | 6 ++++-- .../server/worker/processor/TaskCallbackService.java | 9 ++++----- .../server/worker/processor/TaskKillProcessor.java | 2 ++ 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java index 6da9fdd5c1..f9e206ee75 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java @@ -47,13 +47,15 @@ public class DBTaskResponseProcessor implements NettyRequestProcessor { DBTaskResponseCommand 
taskResponseCommand = JSONUtils.parseObject( command.getBody(), DBTaskResponseCommand.class); - if (taskResponseCommand == null){ + if (taskResponseCommand == null) { return; } - if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){ + if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId()); logger.debug("removeResponseCache: taskinstance id:{}", taskResponseCommand.getTaskInstanceId()); + TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId()); + logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 96ec36b176..09b2c3aa67 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -19,14 +19,13 @@ package org.apache.dolphinscheduler.server.worker.processor; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; -import org.apache.dolphinscheduler.service.registry.RegistryClient; + +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,7 +124,7 @@ public class TaskCallbackService { * * @param 
taskInstanceId taskInstanceId */ - public void remove(int taskInstanceId) { + public static void remove(int taskInstanceId) { REMOTE_CHANNELS.remove(taskInstanceId); } @@ -156,7 +155,7 @@ public class TaskCallbackService { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - remove(taskInstanceId); + // remove(taskInstanceId); return; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index c0ecd67385..4341b8c6f4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -99,6 +99,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); + TaskCallbackService.remove(killCommand.getTaskInstanceId()); + logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); } /**