Browse Source

master/worker basic communication

pull/2/head
qiaozhanwei 5 years ago
parent
commit
06927c2cf4
  1. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  2. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  3. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java
  4. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  5. 42
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  6. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -121,7 +121,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
// TODO send task to worker // TODO send task to worker
public void sendToWorker(String taskInstanceJson){ public void sendToWorker(String taskInstanceJson){
final Address address = new Address("192.168.220.247", 12346); final Address address = new Address("127.0.0.1", 12346);
ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson);
try { try {
Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE);
@ -155,6 +155,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
if(submitDB && !submitQueue){ if(submitDB && !submitQueue){
// submit task to queue // submit task to queue
sendToWorker(JSONObject.toJSONString(task)); sendToWorker(JSONObject.toJSONString(task));
submitQueue = true;
} }
if(submitDB && submitQueue){ if(submitDB && submitQueue){
return task; return task;

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable {
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java

@ -19,11 +19,19 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel; import io.netty.channel.Channel;
/**
* callback channel
*/
public class CallbackChannel { public class CallbackChannel {
/**
* channel
*/
private Channel channel; private Channel channel;
/**
* equest unique identification
*/
private long opaque; private long opaque;
public CallbackChannel(Channel channel, long opaque) { public CallbackChannel(Channel channel, long opaque) {

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
public class TaskInstanceCallbackService { public class TaskCallbackService {
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
@ -56,7 +56,7 @@ public class TaskInstanceCallbackService {
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(0)).addListener(new ChannelFutureListener(){
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {

42
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java → dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -44,22 +43,36 @@ import org.slf4j.LoggerFactory;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
/**
* worker request processor
*/
public class WorkerRequestProcessor implements NettyRequestProcessor {
public class WorkerNettyRequestProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class);
private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class);
/**
* process service
*/
private final ProcessService processService; private final ProcessService processService;
/**
* thread executor service
*/
private final ExecutorService workerExecService; private final ExecutorService workerExecService;
/**
* worker config
*/
private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private final TaskInstanceCallbackService taskInstanceCallbackService; /**
* task callback service
*/
private final TaskCallbackService taskCallbackService;
public WorkerNettyRequestProcessor(ProcessService processService){ public WorkerRequestProcessor(ProcessService processService){
this.processService = processService; this.processService = processService;
this.taskInstanceCallbackService = new TaskInstanceCallbackService(); this.taskCallbackService = new TaskCallbackService();
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
} }
@ -68,7 +81,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType())); String.format("invalid command type : %s", command.getType()));
logger.debug("received command : {}", command); logger.info("received command : {}", command);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class); command.getBody(), ExecuteTaskRequestCommand.class);
@ -79,7 +92,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
//TODO 需要干掉,然后移到master里面。 //TODO this logic need add to master
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null // verify tenant is null
@ -91,7 +104,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
//TODO 到这里。 //TODO end
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskInstance); String execLocalPath = getExecLocalPath(taskInstance);
logger.info("task instance local execute path : {} ", execLocalPath); logger.info("task instance local execute path : {} ", execLocalPath);
@ -102,9 +116,13 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor {
} catch (Exception ex){ } catch (Exception ex){
logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); logger.error(String.format("create execLocalPath : %s", execLocalPath), ex);
} }
taskCallbackService.addCallbackChannel(taskInstance.getId(),
new CallbackChannel(channel, command.getOpaque()));
// submit task // submit task
taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); workerExecService.submit(new TaskScheduleThread(taskInstance,
workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); processService, taskCallbackService));
ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId());
channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque())); channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque()));

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -35,10 +35,9 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
@ -80,7 +79,7 @@ public class TaskScheduleThread implements Runnable {
/** /**
* task instance callback service * task instance callback service
*/ */
private TaskInstanceCallbackService taskInstanceCallbackService; private TaskCallbackService taskInstanceCallbackService;
/** /**
* constructor * constructor
@ -88,7 +87,7 @@ public class TaskScheduleThread implements Runnable {
* @param taskInstance task instance * @param taskInstance task instance
* @param processService process dao * @param processService process dao
*/ */
public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){ public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){
this.processService = processService; this.processService = processService;
this.taskInstance = taskInstance; this.taskInstance = taskInstance;
this.taskInstanceCallbackService = taskInstanceCallbackService; this.taskInstanceCallbackService = taskInstanceCallbackService;
@ -105,7 +104,7 @@ public class TaskScheduleThread implements Runnable {
try { try {
// tell master that task is in executing // tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType());
// taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand);
logger.info("script path : {}", taskInstance.getExecutePath()); logger.info("script path : {}", taskInstance.getExecutePath());
// task node // task node
@ -182,7 +181,7 @@ public class TaskScheduleThread implements Runnable {
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
} finally { } finally {
// taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand);
} }
logger.info("task instance id : {},task final status : {}", logger.info("task instance id : {},task final status : {}",

Loading…
Cancel
Save