diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 2348b47f46..03ce0ee6bf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -17,6 +17,11 @@ package org.apache.dolphinscheduler.server.entity; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; + import java.io.Serializable; import java.util.Date; import java.util.Map; @@ -397,6 +402,18 @@ public class TaskExecutionContext implements Serializable{ this.dataxTaskExecutionContext = dataxTaskExecutionContext; } + public Command toCommand(){ + TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); + requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this)); + return requestCommand.convert2Command(); + } + + public Command toKillCommand(){ + TaskKillRequestCommand requestCommand = new TaskKillRequestCommand(); + requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this)); + return requestCommand.convert2Command(); + } + @Override public String toString() { return "TaskExecutionContext{" + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 97b489ef1b..38914e5ca7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.dispatch; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; @@ -45,9 +44,6 @@ public class ExecutorDispatcher implements InitializingBean { @Autowired private NettyExecutorManager nettyExecutorManager; - @Autowired - private MasterConfig masterConfig; - /** * round robin host manager */ @@ -87,10 +83,9 @@ public class ExecutorDispatcher implements InitializingBean { */ Host host = hostManager.select(context); if (StringUtils.isEmpty(host.getAddress())) { - throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext())); + throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getCommand())); } context.setHost(host); - context.getContext().setHost(host.getAddress()); executorManager.beforeExecute(context); try { /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java index 19124d3a0c..9d04511848 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/context/ExecutionContext.java @@ -17,8 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch.context; +import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; /** @@ -32,30 +32,47 @@ public class ExecutionContext { private Host host; /** - * context + * command */ - private final TaskExecutionContext context; + private final Command command; /** * executor type : worker or client */ private final ExecutorType executorType; - public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) { - this.context = context; + /** + * worker group + */ + private String workerGroup; + + + public ExecutionContext(Command command, ExecutorType executorType) { + this.command = command; this.executorType = executorType; } - public String getWorkerGroup(){ - return context.getWorkerGroup(); + public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) { + this.command = command; + this.executorType = executorType; + this.workerGroup = workerGroup; + } + + public Command getCommand() { + return command; } public ExecutorType getExecutorType() { return executorType; } - public TaskExecutionContext getContext() { - return context; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + + public String getWorkerGroup(){ + return this.workerGroup; } public Host getHost() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 7719cf82d2..e3d45f0055 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -21,12 +21,8 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; @@ -98,7 +94,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ /** * build command accord executeContext */ - Command command = buildCommand(context); + Command command = context.getCommand(); /** * execute task host @@ -111,14 +107,14 @@ public class NettyExecutorManager extends AbstractExecutorManager{ success = true; context.setHost(host); } catch (ExecuteException ex) { - logger.error(String.format("execute context : %s error", context.getContext()), ex); + logger.error(String.format("execute command : %s error", command), ex); try { failNodeSet.add(host.getAddress()); Set tmpAllIps = new HashSet<>(allNodes); Collection remained = CollectionUtils.subtract(tmpAllIps, failNodeSet); if (remained != null && remained.size() > 0) { host = Host.of(remained.iterator().next()); - logger.error("retry execute context : {} host : {}", context.getContext(), host); + logger.error("retry execute command : {} host : {}", command, host); } else { throw new ExecuteException("fail after try all nodes"); } @@ -133,53 +129,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{ @Override public void executeDirectly(ExecutionContext context) throws ExecuteException { - Command command = buildKillCommand(context); Host host = context.getHost(); - doExecute(host,command); - } - - /** - * build command - * @param context context - * @return command - */ - private Command buildCommand(ExecutionContext context) { - TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); - ExecutorType executorType = context.getExecutorType(); - switch (executorType){ - case WORKER: - TaskExecutionContext taskExecutionContext = context.getContext(); - requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); - break; - case CLIENT: - break; - default: - throw new IllegalArgumentException("invalid executor type : " + executorType); - - } - return requestCommand.convert2Command(); - } - - /** - * build command - * @param context context - * @return command - */ - private Command buildKillCommand(ExecutionContext context) { - TaskKillRequestCommand requestCommand = new TaskKillRequestCommand(); - ExecutorType executorType = context.getExecutorType(); - switch (executorType){ - case WORKER: - TaskExecutionContext taskExecutionContext = context.getContext(); - requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); - break; - case CLIENT: - break; - default: - throw new IllegalArgumentException("invalid executor type : " + executorType); - - } - return requestCommand.convert2Command(); + doExecute(host, context.getCommand()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 9d40de98d8..71bb8f8c19 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE; /** * master task exec base class @@ -131,7 +130,7 @@ public class MasterBaseTaskExecThread implements Callable { */ private Boolean dispatch(TaskInstance taskInstance){ TaskExecutionContext context = getTaskExecutionContext(taskInstance); - ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup()); try { return dispatcher.dispatch(executionContext); } catch (ExecuteException e) { @@ -227,8 +226,8 @@ public class MasterBaseTaskExecThread implements Callable { } } if(submitDB && !submitTask){ - // dispatcht task - submitTask = dispatchtTask(task); + // dispatch task + submitTask = dispatchTask(task); } if(submitDB && submitTask){ return task; @@ -254,7 +253,7 @@ public class MasterBaseTaskExecThread implements Callable { * @param taskInstance taskInstance * @return whether submit task success */ - public Boolean dispatchtTask(TaskInstance taskInstance) { + public Boolean dispatchTask(TaskInstance taskInstance) { try{ if(taskInstance.isSubProcess()){ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index a19683215b..fb3f8e9361 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import com.alibaba.fastjson.JSONObject; -import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; @@ -184,7 +183,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { alreadyKilled = true; TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); - ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); + ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup()); Host host = Host.of(taskInstance.getHost()); executionContext.setHost(host);