Browse Source

fix bug: send task result to master until success. (#2839)

* fix bug: send task result to master until success.

* fix bug: send task result to master until success.

* add sleep

* set null if send result failed

* set null if send result failed

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
68bc59765b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -102,20 +102,23 @@ public class TaskCallbackService {
while (Stopper.isRunning()) {
masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if (CollectionUtils.isEmpty(masterNodes)) {
masterNodes = null;
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}else {
logger.error("find {} masters for task : {}.",
masterNodes.size(),
taskInstanceId);
break;
continue;
}
}
for(String masterNode : masterNodes){
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
if(newChannel != null){
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
logger.info("find {} masters for task : {}.",
masterNodes.size(),
taskInstanceId);
for (String masterNode : masterNodes) {
newChannel = nettyRemotingClient.getChannel(Host.of(masterNode));
if (newChannel != null) {
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
}
}
masterNodes = null;
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId));
}

Loading…
Cancel
Save