From 1a78c1150c89d3bbea9029bcc9b2489b6460b278 Mon Sep 17 00:00:00 2001 From: "gabry.wu" Date: Tue, 19 May 2020 19:49:39 +0800 Subject: [PATCH] simply NettyExecutorManager.execute logic making it readable (#2744) * simply NettyExecutorManager.execute logic making it readable * fix NPE * remove unused import Co-authored-by: dailidong --- .../executor/NettyExecutorManager.java | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 7ded3b0056..6fc3f45ec4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; -import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -36,10 +35,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.util.*; /** * netty executor manager @@ -87,17 +83,11 @@ public class NettyExecutorManager extends AbstractExecutorManager{ */ @Override public Boolean execute(ExecutionContext context) throws ExecuteException { - - /** - * all nodes - */ - Set allNodes = getAllNodes(context); - - /** - * fail nodes - */ - Set failNodeSet = new HashSet<>(); - + LinkedList allNodes = new LinkedList<>(); + Set nodes = getAllNodes(context); + if (nodes != null) { + allNodes.addAll(nodes); + } /** * build command accord executeContext */ @@ -106,31 +96,27 @@ public class NettyExecutorManager extends AbstractExecutorManager{ /** * execute task host */ - Host host = context.getHost(); + String startHostAddress = context.getHost().getAddress(); + // remove start host address and add it to head + allNodes.remove(startHostAddress); + allNodes.addFirst(startHostAddress); + boolean success = false; - while (!success) { + for (String address : allNodes) { try { - doExecute(host,command); + Host host = Host.of(address); + doExecute(host, command); success = true; context.setHost(host); + break; } catch (ExecuteException ex) { - logger.error(String.format("execute command : %s error", command), ex); - try { - failNodeSet.add(host.getAddress()); - Set tmpAllIps = new HashSet<>(allNodes); - Collection remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); - if (remained != null && remained.size() > 0) { - host = Host.of(remained.iterator().next()); - logger.error("retry execute command : {} host : {}", command, host); - } else { - throw new ExecuteException("fail after try all nodes"); - } - } catch (Throwable t) { - throw new ExecuteException("fail after try all nodes"); - } + logger.error("retry execute command : {} host : {}", command, address); } } - + if (!success) { + throw new ExecuteException("fail after try all nodes"); + } + return success; }