Browse Source

add comment

pull/2/head
qiaozhanwei 4 years ago
parent
commit
d16a8aa370
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  2. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java
  3. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java
  4. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -171,7 +171,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService)); this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new WorkerRequestProcessor(processService));
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// TODO 注释掉,因为有心跳,可以复用心跳的逻辑,worker registry // TODO ,because there is a heartbeat, you can reuse the heartbeat logic,worker registry
// this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort()); // this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort());
// this.workerRegistry.registry(); // this.workerRegistry.registry();

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java

@ -26,14 +26,30 @@ import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/**
* taks callback service
*/
public class TaskCallbackService { public class TaskCallbackService {
/**
* callback channels
*/
private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Integer, CallbackChannel> CALL_BACK_CHANNELS = new ConcurrentHashMap<>();
/**
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){ public void addCallbackChannel(int taskInstanceId, CallbackChannel channel){
CALL_BACK_CHANNELS.put(taskInstanceId, channel); CALL_BACK_CHANNELS.put(taskInstanceId, channel);
} }
/**
* get callback channel
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
public CallbackChannel getCallbackChannel(int taskInstanceId){ public CallbackChannel getCallbackChannel(int taskInstanceId){
CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId); CallbackChannel callbackChannel = CALL_BACK_CHANNELS.get(taskInstanceId);
if(callbackChannel.getChannel().isActive()){ if(callbackChannel.getChannel().isActive()){
@ -45,15 +61,30 @@ public class TaskCallbackService {
return callbackChannel; return callbackChannel;
} }
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
*/
public void remove(int taskInstanceId){ public void remove(int taskInstanceId){
CALL_BACK_CHANNELS.remove(taskInstanceId); CALL_BACK_CHANNELS.remove(taskInstanceId);
} }
/**
* send ack
* @param taskInstanceId taskInstanceId
* @param ackCommand ackCommand
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque())); callbackChannel.getChannel().writeAndFlush(ackCommand.convert2Command(callbackChannel.getOpaque()));
} }
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param responseCommand responseCommand
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId); CallbackChannel callbackChannel = getCallbackChannel(taskInstanceId);
callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command( callbackChannel.getChannel().writeAndFlush(responseCommand.convert2Command(

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/WorkerRequestProcessor.java

@ -125,6 +125,12 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
processService, taskCallbackService)); processService, taskCallbackService));
} }
/**
* whehter tenant is null
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) { private boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if(tenant == null){ if(tenant == null){
logger.error("tenant not exists,process instance id : {},task instance id : {}", logger.error("tenant not exists,process instance id : {},task instance id : {}",
@ -135,6 +141,11 @@ public class WorkerRequestProcessor implements NettyRequestProcessor {
return false; return false;
} }
/**
* get execute local path
* @param taskInstance taskInstance
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){ private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(), taskInstance.getProcessDefine().getId(),

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -24,20 +24,36 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* worker registry
*/
public class WorkerRegistry { public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
/**
* zookeeper registry center
*/
private final ZookeeperRegistryCenter zookeeperRegistryCenter; private final ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* port
*/
private final int port; private final int port;
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port; this.port = port;
} }
/**
* registry
*/
public void registry() { public void registry() {
String address = Constants.LOCAL_ADDRESS; String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath(); String localNodePath = getWorkerPath();
@ -58,6 +74,9 @@ public class WorkerRegistry {
logger.info("scheduler node : {} registry to ZK successfully.", address); logger.info("scheduler node : {} registry to ZK successfully.", address);
} }
/**
* remove registry info
*/
public void unRegistry() { public void unRegistry() {
String address = getLocalAddress(); String address = getLocalAddress();
String localNodePath = getWorkerPath(); String localNodePath = getWorkerPath();
@ -65,12 +84,20 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry to ZK.", address); logger.info("worker node : {} unRegistry to ZK.", address);
} }
/**
* get worker path
* @return
*/
private String getWorkerPath() { private String getWorkerPath() {
String address = getLocalAddress(); String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath; return localNodePath;
} }
/**
* get local address
* @return
*/
private String getLocalAddress(){ private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port; return Constants.LOCAL_ADDRESS + ":" + port;
} }

Loading…
Cancel
Save