From d74f6e71e4d6851fa09c47474e7341ec9ae1d578 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 21 Sep 2020 16:04:48 +0800 Subject: [PATCH] [Fix-3616][Server] when worker ack/response master exception, async retry (#3776) * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry * [Fix-3616][Server] when worker ack/response master exception, async retry Co-authored-by: qiaozhanwei --- .../server/worker/processor/TaskCallbackService.java | 3 +++ .../worker/runner/RetryReportTaskStatusThread.java | 10 ++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index ca7d3c643e..5703f18b52 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import 
org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -69,6 +70,8 @@ public class TaskCallbackService { public TaskCallbackService(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index ea9bb03e16..ec79238d39 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -41,7 +41,7 @@ public class RetryReportTaskStatusThread implements Runnable { /** * every 5 minutes */ - private static long RETRY_REPORT_TASK_STATUS_TIME = 5 * 60 * 1000L; + private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L; /** * task callback service */ @@ -64,6 +64,10 @@ public class RetryReportTaskStatusThread implements Runnable { ResponceCache responceCache = ResponceCache.get(); while (Stopper.isRunning()){ + + // sleep 5 minutes + ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL); + try { if (!responceCache.getAckCache().isEmpty()){ Map ackCache = responceCache.getAckCache(); @@ -79,14 +83,12 @@ public class 
RetryReportTaskStatusThread implements Runnable { for (Map.Entry entry : responseCache.entrySet()){ Integer taskInstanceId = entry.getKey(); Command responseCommand = entry.getValue(); - taskCallbackService.sendAck(taskInstanceId,responseCommand); + taskCallbackService.sendResult(taskInstanceId,responseCommand); } } }catch (Exception e){ logger.warn("retry report task status error", e); } - - ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_TIME); } } }