Browse Source

[Fix][Worker] Fix worker cannot start and alertClientService NullPointerException (#4956)

* [Fix][Worker] Fix worker cannot start

* [Fix][Worker] Fix alertClientService NullPointerException

* [Fix][Worker] Improve ut coverage

* [Improvement][CI] Improve workflow ci
pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
8c68e2c60b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .github/workflows/ci_ut.yml
  2. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 34
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  4. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  5. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  6. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java

2
.github/workflows/ci_ut.yml

@ -48,7 +48,7 @@ jobs:
- name: Bootstrap database - name: Bootstrap database
run: | run: |
sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up --no-start --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql
sudo cp $(pwd)/sql/dolphinscheduler_postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}') sudo cp $(pwd)/sql/dolphinscheduler_postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}')
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql
- name: Set up JDK 1.8 - name: Set up JDK 1.8

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@ -73,12 +72,6 @@ public class MasterServer implements IStoppable {
*/ */
private NettyRemotingServer nettyRemotingServer; private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
/** /**
* zk master client * zk master client
*/ */
@ -108,25 +101,17 @@ public class MasterServer implements IStoppable {
*/ */
@PostConstruct @PostConstruct
public void run() { public void run() {
try { // init remoting server
//init remoting server NettyServerConfig serverConfig = new NettyServerConfig();
NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort());
serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start();
this.nettyRemotingServer.start();
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// self tolerant // self tolerant
this.zkMasterClient.start(); this.zkMasterClient.start(this);
// scheduler start // scheduler start
this.masterSchedulerService.start(); this.masterSchedulerService.start();
@ -183,10 +168,9 @@ public class MasterServer implements IStoppable {
} catch (Exception e) { } catch (Exception e) {
logger.warn("thread sleep exception ", e); logger.warn("thread sleep exception ", e);
} }
// //close
this.masterSchedulerService.close(); this.masterSchedulerService.close();
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
this.zkMasterClient.close(); this.zkMasterClient.close();
//close quartz //close quartz
try { try {

34
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -105,37 +105,35 @@ public class WorkerServer implements IStoppable {
new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args);
} }
/** /**
* worker server run * worker server run
*/ */
@PostConstruct @PostConstruct
public void run() { public void run() {
// alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT);
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
// worker registry
try { try {
logger.info("start worker server..."); this.workerRegistry.registry();
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start();
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this); this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths(); Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP); this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
// worker registry
this.workerRegistry.registry();
// retry report task status
this.retryReportTaskStatusThread.start();
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// task execute manager // task execute manager
this.workerManagerThread.start(); this.workerManagerThread.start();

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -73,8 +74,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired @Autowired
private MasterRegistry masterRegistry; private MasterRegistry masterRegistry;
public void start() { public void start(MasterServer masterServer) {
InterProcessMutex mutex = null; InterProcessMutex mutex = null;
try { try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
@ -82,9 +82,9 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex = new InterProcessMutex(getZkClient(), znodeLock);
mutex.acquire(); mutex.acquire();
// Master registry // master registry
masterRegistry.registry(); masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
String registPath = this.masterRegistry.getMasterPath(); String registPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -29,12 +29,13 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils; import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -136,10 +137,10 @@ public class ZookeeperOperator implements InitializingBean {
} }
public List<String> getChildrenKeys(final String key) { public List<String> getChildrenKeys(final String key) {
List<String> values;
try { try {
values = zkClient.getChildren().forPath(key); return zkClient.getChildren().forPath(key);
return values; } catch (NoNodeException ex) {
return new ArrayList<>();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
logger.error("getChildrenKeys key : {} InterruptedException", key); logger.error("getChildrenKeys key : {} InterruptedException", key);
throw new IllegalStateException(ex); throw new IllegalStateException(ex);
@ -193,7 +194,7 @@ public class ZookeeperOperator implements InitializingBean {
if (isExisted(key)) { if (isExisted(key)) {
try { try {
zkClient.delete().deletingChildrenIfNeeded().forPath(key); zkClient.delete().deletingChildrenIfNeeded().forPath(key);
} catch (KeeperException.NoNodeException ignore) { } catch (NoNodeException ignore) {
//NOP //NOP
} }
} }
@ -230,7 +231,7 @@ public class ZookeeperOperator implements InitializingBean {
if (isExisted(key)) { if (isExisted(key)) {
zkClient.delete().deletingChildrenIfNeeded().forPath(key); zkClient.delete().deletingChildrenIfNeeded().forPath(key);
} }
} catch (KeeperException.NoNodeException ignore) { } catch (NoNodeException ignore) {
//NOP //NOP
} catch (final Exception ex) { } catch (final Exception ex) {
logger.error("remove key : {}", key, ex); logger.error("remove key : {}", key, ex);

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java

@ -113,4 +113,19 @@ public class RegisterOperatorTest {
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
} }
@Test
public void testGetChildrenKeysWithNoNodeException() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertEquals(0, registerOperator.getChildrenKeys(path).size());
}
@Test
public void testNoNodeException() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
registerOperator.persistEphemeral(path, "test");
registerOperator.remove(path);
}
} }
Loading…
Cancel
Save