From 68bc59765b1f2d416fae0efc2e2f7a87d730b107 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Fri, 29 May 2020 15:43:05 +0800 Subject: [PATCH] fix bug: send task result to master until success. (#2839) * fix bug: send task result to master until success. * fix bug: send task result to master until success. * add sleep * set null if send result failed * set null if send result failed Co-authored-by: baoliang --- .../worker/processor/TaskCallbackService.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 173140792b..c308f3b23d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -102,20 +102,23 @@ public class TaskCallbackService { while (Stopper.isRunning()) { masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); if (CollectionUtils.isEmpty(masterNodes)) { + masterNodes = null; ThreadUtils.sleep(SLEEP_TIME_MILLIS); - }else { - logger.error("find {} masters for task : {}.", - masterNodes.size(), - taskInstanceId); - break; + continue; } - } - for(String masterNode : masterNodes){ - newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); - if(newChannel != null){ - return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + logger.info("find {} masters for task : {}.", + masterNodes.size(), + taskInstanceId); + for (String masterNode : masterNodes) { + newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); + if (newChannel != null) { + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } } + masterNodes = null; + ThreadUtils.sleep(SLEEP_TIME_MILLIS); } + throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId)); }