Browse Source

fix master server shutdown error (#1177)

* move updateTaskState into try/catch block in case of exception

* fix NPE

* using conf.getInt instead of getString

* for AbstractZKClient, remove the log, for it will print the same log message in createZNodePath.
for AlertDao, correct the spelling.

* duplicate

* refactor getTaskWorkerGroupId

* add friendly log

* update hearbeat thread num = 1

* fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread

* 1. move verifyTaskInstanceIsNull after taskInstance
2. keep verifyTenantIsNull/verifyTaskInstanceIsNull clean and readable

* fix the message

* delete before check to avoid KeeperException$NoNodeException

* fix the message

* check processInstance state before delete tenant

* check processInstance state before delete worker group

* refactor

* merge api constants into common constatns

* update the resource perm

* update the dataSource perm

* fix CheckUtils.checkUserParams method

* update AlertGroupService, extends from BaseService, remove duplicate methods

* refactor

* modify method name

* add hasProjectAndPerm method

* using checkProject instead of getResultStatus

* delete checkAuth method, using hasProjectAndPerm instead.

* correct spelling

* add transactional for deleteWorkerGroupById

* add Transactional for deleteProcessInstanceById method

* change sqlSessionTemplate singleton

* change sqlSessionTemplate singleton and reformat code

* fix unsuitable error message

* update shutdownhook methods

* fix worker log bug

* fix api server debug mode bug

* upgrade zk version

* delete this line ,for zkClient.close() will do the whole thing

* fix master server shutdown error
pull/2/head
Tboy 5 years ago committed by qiaozhanwei
parent
commit
a92554fdb2
  1. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -73,6 +73,11 @@ public class ZKMasterClient extends AbstractZKClient {
private static ZKMasterClient zkMasterClient = null; private static ZKMasterClient zkMasterClient = null;
private PathChildrenCache masterPathChildrenCache;
private PathChildrenCache workerPathChildrenCache;
private ZKMasterClient(ProcessDao processDao){ private ZKMasterClient(ProcessDao processDao){
this.processDao = processDao; this.processDao = processDao;
init(); init();
@ -133,6 +138,19 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
public void close(){
try {
if(masterPathChildrenCache != null){
masterPathChildrenCache.close();
}
if(workerPathChildrenCache != null){
workerPathChildrenCache.close();
}
super.close();
} catch (Exception ignore) {
}
}
@ -175,12 +193,12 @@ public class ZKMasterClient extends AbstractZKClient {
* monitor master * monitor master
*/ */
public void listenerMaster(){ public void listenerMaster(){
PathChildrenCache masterPc = new PathChildrenCache(zkClient, masterPathChildrenCache = new PathChildrenCache(zkClient,
getZNodeParentPath(ZKNodeType.MASTER), true ,defaultThreadFactory); getZNodeParentPath(ZKNodeType.MASTER), true ,defaultThreadFactory);
try { try {
masterPc.start(); masterPathChildrenCache.start();
masterPc.getListenable().addListener(new PathChildrenCacheListener() { masterPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) { switch (event.getType()) {
@ -273,12 +291,11 @@ public class ZKMasterClient extends AbstractZKClient {
* monitor worker * monitor worker
*/ */
public void listenerWorker(){ public void listenerWorker(){
workerPathChildrenCache = new PathChildrenCache(zkClient,
PathChildrenCache workerPc = new PathChildrenCache(zkClient,
getZNodeParentPath(ZKNodeType.WORKER),true ,defaultThreadFactory); getZNodeParentPath(ZKNodeType.WORKER),true ,defaultThreadFactory);
try { try {
workerPc.start(); workerPathChildrenCache.start();
workerPc.getListenable().addListener(new PathChildrenCacheListener() { workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
switch (event.getType()) { switch (event.getType()) {

Loading…
Cancel
Save