diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 19cb417e30..340a11adf2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -171,7 +171,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.start(); - // TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry + // TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry // this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); // this.workerRegistry.registry(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 2b6ed957d0..cd62e98a9b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -26,14 +26,30 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import java.util.concurrent.ConcurrentHashMap; +/** + * taks callback service + */ public class TaskCallbackService { + /** + * callback channels + */ private static final ConcurrentHashMap CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); + /** + * add callback channel + * @param taskInstanceId taskInstanceId + * @param channel channel + */ public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ CALL_BACK_CHANNELS.put(taskInstanceId, channel); } + /** + * get callback channel + * @param taskInstanceId taskInstanceId + * @return callback channel + */ public CallbackChannel getCallbackChannel(int taskInstanceId){ CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); if(callbackChannel.getChannel().isActive()){ @@ -45,15 +61,30 @@ public class TaskCallbackService { return callbackChannel; } + /** + * remove callback channels + * @param taskInstanceId taskInstanceId + */ public void remove(int taskInstanceId){ CALL_BACK_CHANNELS.remove(taskInstanceId); } + /** + * send ack + * @param taskInstanceId taskInstanceId + * @param ackCommand ackCommand + */ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque())); } + /** + * send result + * + * @param taskInstanceId taskInstanceId + * @param responseCommand responseCommand + */ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command( diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java index 0a1b7c4837..8ea9ccbe5b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java @@ -125,6 +125,12 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { processService, taskCallbackService)); } + /** + * whehter tenant is null + * @param tenant tenant + * @param taskInstance taskInstance + * @return result + */ private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { if(tenant == null){ logger.error("tenant not exists,process instance id : {},task instance id : {}", @@ -135,6 +141,11 @@ public class WorkerRequestProcessor implements NettyRequestProcessor { return false; } + /** + * get execute local path + * @param taskInstance taskInstance + * @return execute local path + */ private String getExecLocalPath(TaskInstance taskInstance){ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), taskInstance.getProcessDefine().getId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 396b009c01..fc81638705 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -24,20 +24,36 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +/** + * worker registry + */ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); + /** + * zookeeper registry center + */ private final ZookeeperRegistryCenter zookeeperRegistryCenter; + /** + * port + */ private final int port; + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; } + /** + * registry + */ public void registry() { String address = Constants.LOCAL_ADDRESS; String localNodePath = getWorkerPath(); @@ -58,6 +74,9 @@ public class WorkerRegistry { logger.info("scheduler node : {} registry to ZK successfully.", address); } + /** + * remove registry info + */ public void unRegistry() { String address = getLocalAddress(); String localNodePath = getWorkerPath(); @@ -65,12 +84,20 @@ public class WorkerRegistry { logger.info("worker node : {} unRegistry to ZK.", address); } + /** + * get worker path + * @return + */ private String getWorkerPath() { String address = getLocalAddress(); String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; return localNodePath; } + /** + * get local address + * @return + */ private String getLocalAddress(){ return Constants.LOCAL_ADDRESS + ":" + port; }