From 33bf550a33caa47b701b43a787e362dc7a4601f4 Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 25 Feb 2020 14:00:59 +0800 Subject: [PATCH] ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type --- .../master/dispatch/ExecutorDispatcher.java | 8 +++---- .../executor/AbstractExecutorManager.java | 2 +- .../dispatch/executor/ExecutorManager.java | 4 ++-- .../executor/NettyExecutorManager.java | 12 +++++++--- .../runner/MasterBaseTaskExecThread.java | 22 +++++-------------- 5 files changed, 21 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 01fb840303..8a803a2d0f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean { @Autowired private RoundRobinHostManager hostManager; - private final ConcurrentHashMap executorManagers; + private final ConcurrentHashMap> executorManagers; public ExecutorDispatcher(){ this.executorManagers = new ConcurrentHashMap<>(); @@ -61,11 +61,11 @@ public class ExecutorDispatcher implements InitializingBean { * @param context context * @throws ExecuteException */ - public void dispatch(final ExecutionContext context) throws ExecuteException { + public Boolean dispatch(final ExecutionContext context) throws ExecuteException { /** * get executor manager */ - ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); + ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); if(executorManager == null){ throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); } @@ -83,7 +83,7 @@ public class ExecutorDispatcher implements InitializingBean { /** * task execute */ - executorManager.execute(context); + return executorManager.execute(context); } finally { executorManager.afterExecute(context); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java index e1f0c3c976..c0be5a875f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce /** * abstract executor manager */ -public abstract class AbstractExecutorManager implements ExecutorManager{ +public abstract class AbstractExecutorManager implements ExecutorManager{ /** * before execute , add time monitor , timeout diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index 1d78d2f08f..9b0b9af0e4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce /** * executor manager */ -public interface ExecutorManager { +public interface ExecutorManager { /** * before execute @@ -37,7 +37,7 @@ public interface ExecutorManager { * @param context context * @throws ExecuteException */ - void execute(ExecutionContext context) throws ExecuteException; + T execute(ExecutionContext context) throws ExecuteException; /** * after execute diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index cf1a2646a6..bdfe71cf5f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -46,7 +46,7 @@ import java.util.Set; * netty executor manager */ @Service -public class NettyExecutorManager extends AbstractExecutorManager{ +public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); @@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{ public NettyExecutorManager(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + /** + * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor + * register EXECUTE_TASK_ACK command type TaskAckProcessor + */ this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); } @@ -74,7 +78,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ * @throws ExecuteException */ @Override - public void execute(ExecutionContext context) throws ExecuteException { + public Boolean execute(ExecutionContext context) throws ExecuteException { /** * all nodes @@ -118,6 +122,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{ } } } + + return success; } /** @@ -189,7 +195,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ break; case CLIENT: break; - default: + default: throw new IllegalArgumentException("invalid executor type : " + executorType); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f675493bf3..9bf69ddec8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.utils.BeanContext; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -import org.apache.dolphinscheduler.remote.future.InvokeCallback; -import org.apache.dolphinscheduler.remote.future.ResponseFuture; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.ITaskQueue; import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.Callable; @@ -138,14 +125,16 @@ public class MasterBaseTaskExecThread implements Callable { * dispatch task to worker * @param taskInstance */ - public void dispatch(TaskInstance taskInstance){ + private Boolean dispatch(TaskInstance taskInstance){ TaskExecutionContext context = getTaskExecutionContext(taskInstance); ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER); try { - dispatcher.dispatch(executionContext); + return dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("execute exception", e); + return false; } + } /** @@ -234,8 +223,7 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - dispatch(task); - submitQueue = true; + submitQueue = dispatch(task); } if(submitDB && submitQueue){ return task;