diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 3b578f72e2..27d527a59a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -121,7 +121,7 @@ public class MasterBaseTaskExecThread implements Callable { // TODO send task to worker public void sendToWorker(String taskInstanceJson){ - final Address address = new Address("192.168.220.247", 12346); + final Address address = new Address("127.0.0.1", 12346); ExecuteTaskRequestCommand command = new ExecuteTaskRequestCommand(taskInstanceJson); try { Command response = nettyRemotingClient.sendSync(address, command.convert2Command(), Integer.MAX_VALUE); @@ -155,6 +155,7 @@ public class MasterBaseTaskExecThread implements Callable { if(submitDB && !submitQueue){ // submit task to queue sendToWorker(JSONObject.toJSONString(task)); + submitQueue = true; } if(submitDB && submitQueue){ return task; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index af5402ad0a..19cb417e30 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.processor.WorkerNettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.processor.WorkerRequestProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; -import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread; import org.apache.dolphinscheduler.server.zk.ZKWorkerClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerNettyRequestProcessor(processService)); + this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java index 95345c0b95..e3d893fb7b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/CallbackChannel.java @@ -19,11 +19,19 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; - +/** + * callback channel + */ public class CallbackChannel { + /** + * channel + */ private Channel channel; + /** + * equest unique identification + */ private long opaque; public CallbackChannel(Channel channel, long opaque) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java similarity index 96% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 0480d94716..762ee18c45 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskInstanceCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import java.util.concurrent.ConcurrentHashMap; -public class TaskInstanceCallbackService { +public class TaskCallbackService { private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); @@ -56,7 +56,7 @@ public class TaskInstanceCallbackService { public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); - callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ + callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(0)).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java similarity index 87% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 61df9fb66c..be6b95e489 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerNettyRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -44,22 +43,36 @@ import org.slf4j.LoggerFactory; import java.util.Date; import java.util.concurrent.ExecutorService; +/** + * worker request processor + */ +public class WorkerRequestProcessor implements NettyRequestProcessor { -public class WorkerNettyRequestProcessor implements NettyRequestProcessor { - - private final Logger logger = LoggerFactory.getLogger(WorkerNettyRequestProcessor.class); + private final Logger logger = LoggerFactory.getLogger(WorkerRequestProcessor.class); + /** + * process service + */ private final ProcessService processService; + /** + * thread executor service + */ private final ExecutorService workerExecService; + /** + * worker config + */ private final WorkerConfig workerConfig; - private final TaskInstanceCallbackService taskInstanceCallbackService; + /** + * task callback service + */ + private final TaskCallbackService taskCallbackService; - public WorkerNettyRequestProcessor(ProcessService processService){ + public WorkerRequestProcessor(ProcessService processService){ this.processService = processService; - this.taskInstanceCallbackService = new TaskInstanceCallbackService(); + this.taskCallbackService = new TaskCallbackService(); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); } @@ -68,7 +81,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - logger.debug("received command : {}", command); + logger.info("received command : {}", command); ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( command.getBody(), ExecuteTaskRequestCommand.class); @@ -79,7 +92,7 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId()); - //TODO 需要干掉,然后移到master里面。 + //TODO this logic need add to master int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId(); Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId); // verify tenant is null @@ -91,7 +104,8 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); - //TODO 到这里。 + //TODO end + // local execute path String execLocalPath = getExecLocalPath(taskInstance); logger.info("task instance local execute path : {} ", execLocalPath); @@ -102,9 +116,13 @@ public class WorkerNettyRequestProcessor implements NettyRequestProcessor { } catch (Exception ex){ logger.error(String.format("create execLocalPath : %s", execLocalPath), ex); } + + taskCallbackService.addCallbackChannel(taskInstance.getId(), + new CallbackChannel(channel, command.getOpaque())); + // submit task - taskInstanceCallbackService.addCallbackChannel(taskInstance.getId(), new CallbackChannel(channel, command.getOpaque())); - workerExecService.submit(new TaskScheduleThread(taskInstance, processService, taskInstanceCallbackService)); + workerExecService.submit(new TaskScheduleThread(taskInstance, + processService, taskCallbackService)); ExecuteTaskResponseCommand executeTaskResponseCommand = new ExecuteTaskResponseCommand(taskInstance.getId()); channel.writeAndFlush(executeTaskResponseCommand.convert2Command(command.getOpaque())); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java index cb777c6bac..3488fe0031 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java @@ -35,10 +35,9 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.server.worker.processor.TaskInstanceCallbackService; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; import org.apache.dolphinscheduler.server.worker.task.TaskProps; @@ -80,7 +79,7 @@ public class TaskScheduleThread implements Runnable { /** * task instance callback service */ - private TaskInstanceCallbackService taskInstanceCallbackService; + private TaskCallbackService taskInstanceCallbackService; /** * constructor @@ -88,7 +87,7 @@ public class TaskScheduleThread implements Runnable { * @param taskInstance task instance * @param processService process dao */ - public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskInstanceCallbackService taskInstanceCallbackService){ + public TaskScheduleThread(TaskInstance taskInstance, ProcessService processService, TaskCallbackService taskInstanceCallbackService){ this.processService = processService; this.taskInstance = taskInstance; this.taskInstanceCallbackService = taskInstanceCallbackService; @@ -105,7 +104,7 @@ public class TaskScheduleThread implements Runnable { try { // tell master that task is in executing ExecuteTaskAckCommand ackCommand = buildAckCommand(taskInstance.getTaskType()); -// taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); + taskInstanceCallbackService.sendAck(taskInstance.getId(), ackCommand); logger.info("script path : {}", taskInstance.getExecutePath()); // task node @@ -182,7 +181,7 @@ public class TaskScheduleThread implements Runnable { responseCommand.setEndTime(new Date()); } finally { -// taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); + taskInstanceCallbackService.sendResult(taskInstance.getId(), responseCommand); } logger.info("task instance id : {},task final status : {}",