Browse Source

Refactor worker (#2018)

* Refactor worker (#7)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* Refactor worker (#8)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* add kill command

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 4 years ago committed by GitHub
parent
commit
9dc9745a80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  2. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java
  3. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  4. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java
  5. 84
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  6. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  7. 103
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  8. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  9. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack EXECUTE_TASK_ACK, /** * execute task ack */ */ EXECUTE_TASK_ACK, /** /** * execute task response */ EXECUTE /** * kill task response */ KILL_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java

@ -0,0 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { private int taskInstanceId; private int processId; private String host; private String tenantCode; private String logPath; private String executePath; public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -35,7 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -168,7 +169,8 @@ public class WorkerServer implements IStoppable {
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor(processService));
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/NettyRemoteChannel.java

@ -18,24 +18,35 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
/** /**
* callback channel * callback channel
*/ */
public class CallbackChannel { public class NettyRemoteChannel {
/** /**
* channel * channel
*/ */
private Channel channel; private final Channel channel;
/** /**
* equest unique identification * equest unique identification
*/ */
private long opaque; private final long opaque;
public CallbackChannel(Channel channel, long opaque) { /**
* master host
*/
private final Host host;
public NettyRemoteChannel(Channel channel, long opaque) {
this.channel = channel; this.channel = channel;
this.host = ChannelUtils.toAddress(channel);
this.opaque = opaque; this.opaque = opaque;
} }
@ -43,15 +54,23 @@ public class CallbackChannel {
return channel; return channel;
} }
public void setChannel(Channel channel) {
this.channel = channel;
}
public long getOpaque() { public long getOpaque() {
return opaque; return opaque;
} }
public void setOpaque(long opaque) { public Host getHost() {
this.opaque = opaque; return host;
}
public boolean isActive(){
return this.channel.isActive();
}
public ChannelFuture writeAndFlush(Command command){
return this.channel.writeAndFlush(command);
}
public void close(){
this.channel.close();
} }
} }

84
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -21,8 +21,12 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -31,18 +35,31 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class TaskCallbackService { public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
/**
* remote channels
*/
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/** /**
* callback channels * netty remoting client
*/ */
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); private final NettyRemotingClient nettyRemotingClient;
public TaskCallbackService(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/** /**
* add callback channel * add callback channel
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @param channel channel * @param channel channel
*/ */
public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
CALL_BACK_CHANNELS.put(taskInstanceId, channel); REMOTE_CHANNELS.put(taskInstanceId, channel);
} }
/** /**
@ -50,15 +67,18 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
* @return callback channel * @return callback channel
*/ */
public CallbackChannel getCallbackChannel(int taskInstanceId){ public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if(callbackChannel.getChannel().isActive()){ if(nettyRemoteChannel.isActive()){
return callbackChannel; return nettyRemoteChannel;
}
Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
} }
Channel newChannel = createChannel(); return null;
callbackChannel.setChannel(newChannel);
CALL_BACK_CHANNELS.put(taskInstanceId, callbackChannel);
return callbackChannel;
} }
/** /**
@ -66,7 +86,7 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId * @param taskInstanceId taskInstanceId
*/ */
public void remove(int taskInstanceId){ public void remove(int taskInstanceId){
CALL_BACK_CHANNELS.remove(taskInstanceId); REMOTE_CHANNELS.remove(taskInstanceId);
} }
/** /**
@ -75,8 +95,12 @@ public class TaskCallbackService {
* @param ackCommand ackCommand * @param ackCommand ackCommand
*/ */
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command()); if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
}
} }
/** /**
@ -86,22 +110,20 @@ public class TaskCallbackService {
* @param responseCommand responseCommand * @param responseCommand responseCommand
*/ */
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ if(nettyRemoteChannel == null){
//TODO
@Override } else{
public void operationComplete(ChannelFuture future) throws Exception { nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
if(future.isSuccess()){
remove(taskInstanceId);
return;
}
}
});
}
// TODO @Override
private Channel createChannel(){ public void operationComplete(ChannelFuture future) throws Exception {
return null; if(future.isSuccess()){
remove(taskInstanceId);
return;
}
}
});
}
} }
} }

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,32 +17,41 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskScheduleThread; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/** /**
* worker request processor * worker request processor
*/ */
public class WorkerRequestProcessor implements NettyRequestProcessor { public class TaskExecuteProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class); private final Logger logger = LoggerFactory.getLogger(TaskExecuteProcessor.class);
/** /**
* process service * process service
@ -64,7 +73,7 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
*/ */
private final TaskCallbackService taskCallbackService; private final TaskCallbackService taskCallbackService;
public WorkerRequestProcessor(ProcessService processService){ public TaskExecuteProcessor(ProcessService processService){
this.processService = processService; this.processService = processService;
this.taskCallbackService = new TaskCallbackService(); this.taskCallbackService = new TaskCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
@ -92,14 +101,62 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){ } catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
} }
taskCallbackService.addCallbackChannel(taskExecutionContext.getTaskInstanceId(), taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new CallbackChannel(channel, command.getOpaque())); new NettyRemoteChannel(channel, command.getOpaque()));
this.doAck(taskExecutionContext);
// submit task // submit task
workerExecService.submit(new TaskScheduleThread(taskExecutionContext, workerExecService.submit(new TaskExecuteThread(taskExecutionContext,
processService, taskCallbackService)); processService, taskCallbackService));
} }
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
}
/**
* get task log path
* @return log path
*/
private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
/** /**
* get execute local path * get execute local path

103
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* task kill processor
*/
public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
KillTaskRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
doKill(killCommand);
}
private void doKill(KillTaskRequestCommand killCommand){
try {
if(killCommand.getProcessId() == 0 ){
logger.error("process kill failed, process id :{}, task id:{}", killCommand.getProcessId(), killCommand.getTaskInstanceId());
return;
}
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId()));
logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
// find log and kill yarn job
killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode());
} catch (Exception e) {
logger.error("kill task failed", e);
}
}
public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClientService logClient = null;
String log = null;
try {
logClient = new LogClientService();
log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
} finally {
if(logClient != null){
logClient.close();
}
}
if (StringUtils.isNotEmpty(log)) {
List<String> appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
logger.error("task instance work dir is empty");
throw new RuntimeException("task instance work dir is empty");
}
if (appIds.size() > 0) {
ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath);
}
}
} catch (Exception e) {
logger.error("kill yarn job failure",e);
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -235,7 +235,7 @@ public class FetchTaskThread implements Runnable{
logger.info("task : {} ready to submit to task scheduler thread",taskInstId); logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task // submit task
// workerExecService.submit(new TaskScheduleThread(taskInstance, processService)); // workerExecService.submit(new TaskExecuteThread(taskInstance, processService));
// remove node from zk // remove node from zk
removeNodeFromTaskQueue(taskQueueStr); removeNodeFromTaskQueue(taskQueueStr);

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,21 +17,15 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext; import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
@ -51,12 +45,12 @@ import java.util.stream.Collectors;
/** /**
* task scheduler thread * task scheduler thread
*/ */
public class TaskScheduleThread implements Runnable { public class TaskExecuteThread implements Runnable {
/** /**
* logger * logger
*/ */
private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class); private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/** /**
* task instance * task instance
@ -74,32 +68,27 @@ public class TaskScheduleThread implements Runnable {
private AbstractTask task; private AbstractTask task;
/** /**
* task instance callback service * task callback service
*/ */
private TaskCallbackService taskInstanceCallbackService; private TaskCallbackService taskCallbackService;
/** /**
* constructor * constructor
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @param processService processService * @param processService processService
* @param taskInstanceCallbackService taskInstanceCallbackService * @param taskCallbackService taskCallbackService
*/ */
public TaskScheduleThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ public TaskExecuteThread(TaskExecutionContext taskExecutionContext, ProcessService processService, TaskCallbackService taskCallbackService){
this.processService = processService; this.processService = processService;
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.taskInstanceCallbackService = taskInstanceCallbackService; this.taskCallbackService = taskCallbackService;
} }
@Override @Override
public void run() { public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId()); ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
try { try {
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskInstanceCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
logger.info("script path : {}", taskExecutionContext.getExecutePath()); logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node // task node
TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class);
@ -163,7 +152,7 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
} finally { } finally {
taskInstanceCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
} }
} }
@ -183,48 +172,6 @@ public class TaskScheduleThread implements Runnable {
return globalParamsMap; return globalParamsMap;
} }
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath());
ackCommand.setHost(OSUtils.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
}else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
return ackCommand;
}
/**
* get task log path
* @return log path
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/** /**
* set task timeout * set task timeout
* @param taskProps * @param taskProps
@ -259,8 +206,6 @@ public class TaskScheduleThread implements Runnable {
} }
/** /**
* kill task * kill task
*/ */
Loading…
Cancel
Save