From f0f8644753c17dbc117f494e508bc3c973241da0 Mon Sep 17 00:00:00 2001 From: Tboy Date: Tue, 25 Feb 2020 14:25:37 +0800 Subject: [PATCH 1/4] Refactor worker (#7) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei --- .../master/dispatch/ExecutorDispatcher.java | 8 +++---- .../executor/AbstractExecutorManager.java | 2 +- .../dispatch/executor/ExecutorManager.java | 4 ++-- .../executor/NettyExecutorManager.java | 12 +++++++--- .../runner/MasterBaseTaskExecThread.java | 22 +++++-------------- 5 files changed, 21 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 01fb840303..8a803a2d0f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -50,7 +50,7 @@ public class ExecutorDispatcher implements InitializingBean { @Autowired private RoundRobinHostManager hostManager; - private final ConcurrentHashMap executorManagers; + private final ConcurrentHashMap> executorManagers; public ExecutorDispatcher(){ this.executorManagers = new ConcurrentHashMap<>(); @@ -61,11 +61,11 @@ public class ExecutorDispatcher implements InitializingBean { * @param context context * @throws ExecuteException */ - public void dispatch(final ExecutionContext context) throws ExecuteException { + public Boolean dispatch(final ExecutionContext context) throws ExecuteException { /** * get executor manager */ - ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); + ExecutorManager executorManager = this.executorManagers.get(context.getExecutorType()); if(executorManager == null){ throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType()); } @@ -83,7 +83,7 @@ public class ExecutorDispatcher implements InitializingBean { /** * task execute */ - executorManager.execute(context); + return executorManager.execute(context); } finally { executorManager.afterExecute(context); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java index e1f0c3c976..c0be5a875f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/AbstractExecutorManager.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce /** * abstract executor manager */ -public abstract class AbstractExecutorManager implements ExecutorManager{ +public abstract class AbstractExecutorManager implements ExecutorManager{ /** * before execute , add time monitor , timeout diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index 1d78d2f08f..9b0b9af0e4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce /** * executor manager */ -public interface ExecutorManager { +public interface ExecutorManager { /** * before execute @@ -37,7 +37,7 @@ public interface ExecutorManager { * @param context context * @throws ExecuteException */ - void execute(ExecutionContext context) throws ExecuteException; + T execute(ExecutionContext context) throws ExecuteException; /** * after execute diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index cf1a2646a6..bdfe71cf5f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -46,7 +46,7 @@ import java.util.Set; * netty executor manager */ @Service -public class NettyExecutorManager extends AbstractExecutorManager{ +public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); @@ -64,6 +64,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{ public NettyExecutorManager(){ final NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + /** + * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor + * register EXECUTE_TASK_ACK command type TaskAckProcessor + */ this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); } @@ -74,7 +78,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ * @throws ExecuteException */ @Override - public void execute(ExecutionContext context) throws ExecuteException { + public Boolean execute(ExecutionContext context) throws ExecuteException { /** * all nodes @@ -118,6 +122,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{ } } } + + return success; } /** @@ -189,7 +195,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ break; case CLIENT: break; - default: + default: throw new IllegalArgumentException("invalid executor type : " + executorType); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index f675493bf3..9bf69ddec8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -20,22 +20,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.utils.BeanContext; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -import org.apache.dolphinscheduler.remote.future.InvokeCallback; -import org.apache.dolphinscheduler.remote.future.ResponseFuture; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -48,7 +36,6 @@ import org.apache.dolphinscheduler.service.queue.ITaskQueue; import org.apache.dolphinscheduler.service.queue.TaskQueueFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.Callable; @@ -138,14 +125,16 @@ public class MasterBaseTaskExecThread implements Callable { * dispatch task to worker * @param taskInstance */ - public void dispatch(TaskInstance taskInstance){ + private Boolean dispatch(TaskInstance taskInstance){ TaskExecutionContext context = getTaskExecutionContext(taskInstance); ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER); try { - dispatcher.dispatch(executionContext); + return dispatcher.dispatch(executionContext); } catch (ExecuteException e) { logger.error("execute exception", e); + return false; } + } /** @@ -234,8 +223,7 @@ public class MasterBaseTaskExecThread implements Callable { } if(submitDB && !submitQueue){ // submit task to queue - dispatch(task); - submitQueue = true; + submitQueue = dispatch(task); } if(submitDB && submitQueue){ return task; From 67e7a6c849c7ec5c2df5a9ec0ad76eae5c5d4935 Mon Sep 17 00:00:00 2001 From: Technoboy- Date: Tue, 25 Feb 2020 22:51:03 +0800 Subject: [PATCH 3/4] add kill command --- .../remote/command/CommandType.java | 2 +- .../command/KillTaskRequestCommand.java | 1 + .../server/worker/WorkerServer.java | 6 +- ...ckChannel.java => NettyRemoteChannel.java} | 39 +++++-- .../worker/processor/TaskCallbackService.java | 84 ++++++++------ ...ocessor.java => TaskExecuteProcessor.java} | 71 ++++++++++-- .../worker/processor/TaskKillProcessor.java | 103 ++++++++++++++++++ .../server/worker/runner/FetchTaskThread.java | 2 +- ...duleThread.java => TaskExecuteThread.java} | 71 ++---------- 9 files changed, 264 insertions(+), 115 deletions(-) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/{CallbackChannel.java => NettyRemoteChannel.java} (60%) rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/{WorkerRequestProcessor.java => TaskExecuteProcessor.java} (55%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java rename dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/{TaskScheduleThread.java => TaskExecuteThread.java} (76%) diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 79ef2d9b2a..053b38f5a2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * kill task */ KILL_TASK_REQUEST, /** * kill task response */ KILL_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java new file mode 100644 index 0000000000..3ece6508de --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java @@ -0,0 +1 @@ +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { private int taskInstanceId; private int processId; private String host; private String tenantCode; private String logPath; private String executePath; public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 2625d68c1e..c2af7b12a6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -35,7 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -168,7 +169,8 @@ public class WorkerServer implements IStoppable { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); + this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor(processService)); + this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java similarity index 60% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java index e3d893fb7b..cbb8972a33 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java @@ -18,24 +18,35 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.utils.ChannelUtils; +import org.apache.dolphinscheduler.remote.utils.Host; /** * callback channel */ -public class CallbackChannel { +public class NettyRemoteChannel { /** * channel */ - private Channel channel; + private final Channel channel; /** * equest unique identification */ - private long opaque; + private final long opaque; - public CallbackChannel(Channel channel, long opaque) { + /** + * master host + */ + private final Host host; + + + public NettyRemoteChannel(Channel channel, long opaque) { this.channel = channel; + this.host = ChannelUtils.toAddress(channel); this.opaque = opaque; } @@ -43,15 +54,23 @@ public class CallbackChannel { return channel; } - public void setChannel(Channel channel) { - this.channel = channel; - } - public long getOpaque() { return opaque; } - public void setOpaque(long opaque) { - this.opaque = opaque; + public Host getHost() { + return host; + } + + public boolean isActive(){ + return this.channel.isActive(); + } + + public ChannelFuture writeAndFlush(Command command){ + return this.channel.writeAndFlush(command); + } + + public void close(){ + this.channel.close(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 632d2f7624..23ac7e2ddc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -21,8 +21,12 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; @@ -31,18 +35,31 @@ import java.util.concurrent.ConcurrentHashMap; */ public class TaskCallbackService { + private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class); + + /** + * remote channels + */ + private static final ConcurrentHashMap REMOTE_CHANNELS = new ConcurrentHashMap<>(); + /** - * callback channels + * netty remoting client */ - private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); + private final NettyRemotingClient nettyRemotingClient; + + + public TaskCallbackService(){ + final NettyClientConfig clientConfig = new NettyClientConfig(); + this.nettyRemotingClient = new NettyRemotingClient(clientConfig); + } /** * add callback channel * @param taskInstanceId taskInstanceId * @param channel channel */ - public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ - CALL_BACK_CHANNELS.put(taskInstanceId, channel); + public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){ + REMOTE_CHANNELS.put(taskInstanceId, channel); } /** @@ -50,15 +67,18 @@ public class TaskCallbackService { * @param taskInstanceId taskInstanceId * @return callback channel */ - public CallbackChannel getCallbackChannel(int taskInstanceId){ - CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); - if(callbackChannel.getChannel().isActive()){ - return callbackChannel; + public NettyRemoteChannel getRemoteChannel(int taskInstanceId){ + NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); + if(nettyRemoteChannel.isActive()){ + return nettyRemoteChannel; + } + Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); + if(newChannel != null){ + NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque()); + addRemoteChannel(taskInstanceId, remoteChannel); + return remoteChannel; } - Channel newChannel = createChannel(); - callbackChannel.setChannel(newChannel); - CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel); - return callbackChannel; + return null; } /** @@ -66,7 +86,7 @@ public class TaskCallbackService { * @param taskInstanceId taskInstanceId */ public void remove(int taskInstanceId){ - CALL_BACK_CHANNELS.remove(taskInstanceId); + REMOTE_CHANNELS.remove(taskInstanceId); } /** @@ -75,8 +95,12 @@ public class TaskCallbackService { * @param ackCommand ackCommand */ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ - CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command()); + NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); + if(nettyRemoteChannel == null){ + //TODO + } else{ + nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command()); + } } /** @@ -86,22 +110,20 @@ public class TaskCallbackService { * @param responseCommand responseCommand */ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ - CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ - remove(taskInstanceId); - return; - } - } - }); - } + NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); + if(nettyRemoteChannel == null){ + //TODO + } else{ + nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ - // TODO - private Channel createChannel(){ - return null; + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()){ + remove(taskInstanceId); + return; + } + } + }); + } } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java similarity index 55% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 39dc136de7..818e223d3d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -17,32 +17,41 @@ package org.apache.dolphinscheduler.server.worker.processor; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; import java.util.concurrent.ExecutorService; /** * worker request processor */ -public class WorkerRequestProcessor implements NettyRequestProcessor { +public class TaskExecuteProcessor implements NettyRequestProcessor { - private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class); /** * process service @@ -64,7 +73,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { */ private final TaskCallbackService taskCallbackService; - public WorkerRequestProcessor(ProcessService processService){ + public TaskExecuteProcessor(ProcessService processService){ this.processService = processService; this.taskCallbackService = new TaskCallbackService(); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); @@ -92,14 +101,62 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } - taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(), - new CallbackChannel(channel, command.getOpaque())); + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + this.doAck(taskExecutionContext); // submit task - workerExecService.submit(new TaskScheduleThread(taskExecutionContext, + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, processService, taskCallbackService)); } + private void doAck(TaskExecutionContext taskExecutionContext){ + // tell master that task is in executing + ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext); + taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + } + + /** + * get task log path + * @return log path + */ + private String getTaskLogPath(TaskExecutionContext taskExecutionContext) { + String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) + .getLogger("ROOT") + .getAppender("TASKLOGFILE")) + .getDiscriminator()).getLogBase(); + if (baseLog.startsWith(Constants.SINGLE_SLASH)){ + return baseLog + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskInstanceId() + ".log"; + } + return System.getProperty("user.dir") + Constants.SINGLE_SLASH + + baseLog + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + + taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskExecutionContext.getTaskInstanceId() + ".log"; + } + + /** + * build ack command + * @param taskExecutionContext taskExecutionContext + * @return ExecuteTaskAckCommand + */ + private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { + ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); + ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); + ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); + ackCommand.setHost(OSUtils.getHost()); + ackCommand.setStartTime(new Date()); + if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ + ackCommand.setExecutePath(null); + }else{ + ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); + } + return ackCommand; + } /** * get execute local path diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java new file mode 100644 index 0000000000..29853743fa --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.Preconditions; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.service.log.LogClientService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * task kill processor + */ +public class TaskKillProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); + logger.info("received command : {}", command); + KillTaskRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class); + doKill(killCommand); + } + + + private void doKill(KillTaskRequestCommand killCommand){ + try { + if(killCommand.getProcessId() == 0 ){ + logger.error("process kill failed, process id :{}, task id:{}", killCommand.getProcessId(), killCommand.getTaskInstanceId()); + return; + } + String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId())); + + logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd); + + OSUtils.exeCmd(cmd); + + // find log and kill yarn job + killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode()); + + } catch (Exception e) { + logger.error("kill task failed", e); + } + } + + public void killYarnJob(String host, String logPath, String executePath, String tenantCode) { + try { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + LogClientService logClient = null; + String log = null; + try { + logClient = new LogClientService(); + log = logClient.viewLog(host, Constants.RPC_PORT, logPath); + } finally { + if(logClient != null){ + logClient.close(); + } + } + if (StringUtils.isNotEmpty(log)) { + List appIds = LoggerUtils.getAppIds(log, logger); + if (StringUtils.isEmpty(executePath)) { + logger.error("task instance work dir is empty"); + throw new RuntimeException("task instance work dir is empty"); + } + if (appIds.size() > 0) { + ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); + } + } + + } catch (Exception e) { + logger.error("kill yarn job failure",e); + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 9e0c452a80..0498848488 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{ logger.info("task : {} ready to submit to task scheduler thread",taskInstId); // submit task -// workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); +// workerExecService.submit(new TaskExecuteThread(taskInstance, processService)); // remove node from zk removeNodeFromTaskQueue(taskQueueStr); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java similarity index 76% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 735e4ba563..be89401961 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -17,21 +17,15 @@ package org.apache.dolphinscheduler.server.worker.runner; -import ch.qos.logback.classic.LoggerContext; -import ch.qos.logback.classic.sift.SiftingAppender; import com.alibaba.fastjson.JSONObject; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; @@ -51,12 +45,12 @@ import java.util.stream.Collectors; /** * task scheduler thread */ -public class TaskScheduleThread implements Runnable { +public class TaskExecuteThread implements Runnable { /** * logger */ - private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class); + private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class); /** * task instance @@ -74,32 +68,27 @@ public class TaskScheduleThread implements Runnable { private AbstractTask task; /** - * task instance callback service + * task callback service */ - private TaskCallbackService taskInstanceCallbackService; + private TaskCallbackService taskCallbackService; /** * constructor * @param taskExecutionContext taskExecutionContext * @param processService processService - * @param taskInstanceCallbackService taskInstanceCallbackService + * @param taskCallbackService taskCallbackService */ - public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ + public TaskExecuteThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskCallbackService){ this.processService = processService; this.taskExecutionContext = taskExecutionContext; - this.taskInstanceCallbackService = taskInstanceCallbackService; + this.taskCallbackService = taskCallbackService; } @Override public void run() { ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId()); - try { - // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext); - taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); - logger.info("script path : {}", taskExecutionContext.getExecutePath()); // task node TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); @@ -163,7 +152,7 @@ public class TaskScheduleThread implements Runnable { responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); } finally { - taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); } } @@ -183,48 +172,6 @@ public class TaskScheduleThread implements Runnable { return globalParamsMap; } - /** - * build ack command - * @param taskExecutionContext taskExecutionContext - * @return ExecuteTaskAckCommand - */ - private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { - ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); - ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); - ackCommand.setLogPath(getTaskLogPath()); - ackCommand.setHost(OSUtils.getHost()); - ackCommand.setStartTime(new Date()); - if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ - ackCommand.setExecutePath(null); - }else{ - ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); - } - return ackCommand; - } - - /** - * get task log path - * @return log path - */ - private String getTaskLogPath() { - String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory()) - .getLogger("ROOT") - .getAppender("TASKLOGFILE")) - .getDiscriminator()).getLogBase(); - if (baseLog.startsWith(Constants.SINGLE_SLASH)){ - return baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - return System.getProperty("user.dir") + Constants.SINGLE_SLASH + - baseLog + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH + - taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH + - taskExecutionContext.getTaskInstanceId() + ".log"; - } - /** * set task timeout * @param taskProps @@ -259,8 +206,6 @@ public class TaskScheduleThread implements Runnable { } - - /** * kill task */