From b52542f250f2339bf100835471b9074d2739d4ab Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 3 Mar 2020 17:58:22 +0800 Subject: [PATCH 1/2] master kill task refactor (#2069) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand --- .../server/master/MasterServer.java | 2 +- .../executor/NettyExecutorManager.java | 28 +++++++++++++++++-- .../processor/TaskKillResponseProcessor.java | 3 +- .../master/runner/MasterTaskExecThread.java | 2 +- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 9c46ad66fd..6923cd02e1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -128,7 +128,7 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start(); // diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 286b0e6391..7719cf82d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; @@ -71,7 +72,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ */ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); } @@ -130,8 +131,9 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return success; } + @Override public void executeDirectly(ExecutionContext context) throws ExecuteException { - Command command = buildCommand(context); + Command command = buildKillCommand(context); Host host = context.getHost(); doExecute(host,command); } @@ -158,6 +160,28 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return requestCommand.convert2Command(); } + /** + * build command + * @param context context + * @return command + */ + private Command buildKillCommand(ExecutionContext context) { + TaskKillRequestCommand requestCommand = new TaskKillRequestCommand(); + ExecutorType executorType = context.getExecutorType(); + switch (executorType){ + case WORKER: + TaskExecutionContext taskExecutionContext = context.getContext(); + requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); + break; + case CLIENT: + break; + default: + throw new IllegalArgumentException("invalid executor type : " + executorType); + + } + return requestCommand.convert2Command(); + } + /** * execute logic * @param host host diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 4986e89e67..3e8cdfdadc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -46,8 +46,7 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class); - logger.info("received command : {}", responseCommand); - logger.info("已经接受到了worker杀任务的回应"); + logger.info("received task kill response command : {}", responseCommand); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 995a4e8c21..51a4f44cda 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; @@ -183,7 +184,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { alreadyKilled = true; TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); - ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); Host host = new Host(); From 72e8f8d195ebd47869600017bc220f19088930fb Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 3 Mar 2020 22:18:51 +0800 Subject: [PATCH 2/2] host add host:port format (#2070) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * taskInstanceCache is null ,need load from db * 1,worker TaskPros use TaskExecutionContext replase 2,Master kill Task , KillTaskProcessor modify * worker remove db * ShellTask modify * master persistence processId and appIds * master persistence processId and appIds * master add kill task logic * master add kill task logic * master add kill task logic * javadoc error modify * remove chinese log * executeDirectly method add Override * remote module modify * TaskKillResponseProcessor command type modify * create buildKillCommand * host add host:port format * host add host:port format --- .../api/service/LoggerService.java | 5 +- .../dao/entity/TaskInstance.java | 10 -- .../builder/TaskExecutionContextBuilder.java | 1 - .../server/entity/TaskExecutionContext.java | 139 +++++++++--------- .../master/dispatch/ExecutorDispatcher.java | 1 + .../master/runner/MasterTaskExecThread.java | 4 +- .../processor/TaskExecuteProcessor.java | 6 +- 7 files changed, 77 insertions(+), 89 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index 1f65208240..91316af455 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -68,7 +69,7 @@ public class LoggerService { return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); } - String host = taskInstance.getHost(); + String host = Host.of(taskInstance.getHost()).getIp(); if(StringUtils.isEmpty(host)){ return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg()); } @@ -94,7 +95,7 @@ public class LoggerService { if (taskInstance == null){ throw new RuntimeException("task instance is null"); } - String host = taskInstance.getHost(); + String host = Host.of(taskInstance.getHost()).getIp(); return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 92cb3af220..e444ad2221 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -186,21 +186,11 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private String dependentResult; - /** * worker group id */ private int workerGroupId; - - - public void init(String host,Date startTime,String executePath){ - this.host = host; - this.startTime = startTime; - this.executePath = executePath; - } - - public ProcessInstance getProcessInstance() { return processInstance; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 1388e79c5a..34d96aa4d7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -48,7 +48,6 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); - taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); return this; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index fb3aab9761..2348b47f46 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -29,7 +29,7 @@ public class TaskExecutionContext implements Serializable{ /** * task id */ - private Integer taskInstanceId; + private int taskInstanceId; /** @@ -51,7 +51,7 @@ public class TaskExecutionContext implements Serializable{ * host */ private String host; - + /** * task execute path */ @@ -70,7 +70,7 @@ public class TaskExecutionContext implements Serializable{ /** * processId */ - private Integer processId; + private int processId; /** * appIds @@ -80,7 +80,7 @@ public class TaskExecutionContext implements Serializable{ /** * process instance id */ - private Integer processInstanceId; + private int processInstanceId; /** @@ -97,13 +97,13 @@ public class TaskExecutionContext implements Serializable{ /** * execute user id */ - private Integer executorId; + private int executorId; /** * command type if complement */ - private Integer cmdTypeIfComplement; + private int cmdTypeIfComplement; /** @@ -120,12 +120,12 @@ public class TaskExecutionContext implements Serializable{ /** * process define id */ - private Integer processDefineId; + private int processDefineId; /** * project id */ - private Integer projectId; + private int projectId; /** * taskParams @@ -173,22 +173,11 @@ public class TaskExecutionContext implements Serializable{ */ private DataxTaskExecutionContext dataxTaskExecutionContext; - - - - public String getWorkerGroup() { - return workerGroup; - } - - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; - } - - public Integer getTaskInstanceId() { + public int getTaskInstanceId() { return taskInstanceId; } - public void setTaskInstanceId(Integer taskInstanceId) { + public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } @@ -216,6 +205,14 @@ public class TaskExecutionContext implements Serializable{ this.taskType = taskType; } + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + public String getExecutePath() { return executePath; } @@ -224,6 +221,14 @@ public class TaskExecutionContext implements Serializable{ this.executePath = executePath; } + public String getLogPath() { + return logPath; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + public String getTaskJson() { return taskJson; } @@ -232,11 +237,27 @@ public class TaskExecutionContext implements Serializable{ this.taskJson = taskJson; } - public Integer getProcessInstanceId() { + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public String getAppIds() { + return appIds; + } + + public void setAppIds(String appIds) { + this.appIds = appIds; + } + + public int getProcessInstanceId() { return processInstanceId; } - public void setProcessInstanceId(Integer processInstanceId) { + public void setProcessInstanceId(int processInstanceId) { this.processInstanceId = processInstanceId; } @@ -256,6 +277,22 @@ public class TaskExecutionContext implements Serializable{ this.globalParams = globalParams; } + public int getExecutorId() { + return executorId; + } + + public void setExecutorId(int executorId) { + this.executorId = executorId; + } + + public int getCmdTypeIfComplement() { + return cmdTypeIfComplement; + } + + public void setCmdTypeIfComplement(int cmdTypeIfComplement) { + this.cmdTypeIfComplement = cmdTypeIfComplement; + } + public String getTenantCode() { return tenantCode; } @@ -272,46 +309,22 @@ public class TaskExecutionContext implements Serializable{ this.queue = queue; } - public Integer getProcessDefineId() { + public int getProcessDefineId() { return processDefineId; } - public void setProcessDefineId(Integer processDefineId) { + public void setProcessDefineId(int processDefineId) { this.processDefineId = processDefineId; } - public Integer getProjectId() { + public int getProjectId() { return projectId; } - public void setProjectId(Integer projectId) { + public void setProjectId(int projectId) { this.projectId = projectId; } - public Integer getExecutorId() { - return executorId; - } - - public void setExecutorId(Integer executorId) { - this.executorId = executorId; - } - - public Integer getCmdTypeIfComplement() { - return cmdTypeIfComplement; - } - - public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) { - this.cmdTypeIfComplement = cmdTypeIfComplement; - } - - public String getLogPath() { - return logPath; - } - - public void setLogPath(String logPath) { - this.logPath = logPath; - } - public String getTaskParams() { return taskParams; } @@ -360,28 +373,12 @@ public class TaskExecutionContext implements Serializable{ this.taskTimeout = taskTimeout; } - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public Integer getProcessId() { - return processId; - } - - public void setProcessId(Integer processId) { - this.processId = processId; - } - - public String getAppIds() { - return appIds; + public String getWorkerGroup() { + return workerGroup; } - public void setAppIds(String appIds) { - this.appIds = appIds; + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; } public SQLTaskExecutionContext getSqlTaskExecutionContext() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index df563a6209..97b489ef1b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -90,6 +90,7 @@ public class ExecutorDispatcher implements InitializingBean { throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext())); } context.setHost(host); + context.getContext().setHost(host.getAddress()); executorManager.beforeExecute(context); try { /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 51a4f44cda..a19683215b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -186,9 +186,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); - Host host = new Host(); - host.setIp(taskInstance.getHost()); - host.setPort(12346); + Host host = Host.of(taskInstance.getHost()); executionContext.setHost(host); nettyExecutorManager.executeDirectly(executionContext); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index b53ef1720b..5042c97baf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -78,10 +78,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - logger.info("received command : {}", command); + TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( command.getBody(), TaskExecuteRequestCommand.class); + logger.info("received command : {}", taskRequestCommand); + String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); @@ -141,7 +143,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); - ackCommand.setHost(OSUtils.getHost()); + ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(new Date()); if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ ackCommand.setExecutePath(null);