Browse Source

Fix when the master or worker down, the alert cann't insert to Db, due to the datasource close (#5919)

Wenjun Ruan 3 years ago committed by GitHub
parent
commit
7d500c45d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  2. 10
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java
  3. 16
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java
  4. 53
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  5. 36
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  6. 48
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java
  7. 1
      pom.xml

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -139,11 +139,11 @@ public class ServerNodeManager implements InitializingBean {
/**
* init MasterNodeListener listener
*/
registryCenter.getRegisterOperator().addListener(new MasterNodeListener());
registryCenter.getRegisterOperator().registerListener(new MasterNodeListener(Integer.MAX_VALUE));
/**
* init WorkerNodeListener listener
*/
registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener());
registryCenter.getRegisterOperator().registerListener(new WorkerGroupNodeListener(Integer.MAX_VALUE));
}
/**
@ -207,6 +207,10 @@ public class ServerNodeManager implements InitializingBean {
*/
class WorkerGroupNodeListener extends AbstractListener {
public WorkerGroupNodeListener(int order) {
super(order);
}
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if (registryCenter.isWorkerPath(path)) {
@ -246,6 +250,10 @@ public class ServerNodeManager implements InitializingBean {
*/
class MasterNodeListener extends AbstractListener {
public MasterNodeListener(int order) {
super(order);
}
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if (registryCenter.isMasterPath(path)) {

10
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/zk/ZKMasterClient.java

@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractListener;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils;
@ -97,7 +98,7 @@ public class ZKMasterClient extends AbstractZKClient {
removeZKNodePath(null, ZKNodeType.MASTER, true);
removeZKNodePath(null, ZKNodeType.WORKER, true);
}
registerListener();
registerListener(new NodeChangeListener(Integer.MIN_VALUE));
} catch (Exception e) {
logger.error("master start up exception", e);
} finally {
@ -115,6 +116,12 @@ public class ZKMasterClient extends AbstractZKClient {
super.close();
}
class NodeChangeListener extends AbstractListener {
public NodeChangeListener(int order) {
super(order);
}
/**
* handle path events that this class cares about
*
@ -132,6 +139,7 @@ public class ZKMasterClient extends AbstractZKClient {
handleWorkerEvent(event, path);
}
}
}
/**
* remove zookeeper node path

16
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractListener.java

@ -21,7 +21,16 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public abstract class AbstractListener implements TreeCacheListener {
public abstract class AbstractListener implements TreeCacheListener, Comparable<AbstractListener> {
/**
* The order is represent as prioritization, the high order will be executed first
*/
private final int order;
public AbstractListener(int order) {
this.order = order;
}
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
@ -33,4 +42,9 @@ public abstract class AbstractListener implements TreeCacheListener {
}
protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
@Override
public int compareTo(AbstractListener o) {
return order - o.order;
}
}

53
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -16,37 +16,35 @@
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
private TreeCache treeCache;
/**
* The main point to define a listener list here is to execute the listener at customize order.
*/
private List<AbstractListener> listenerList = new CopyOnWriteArrayList<>();
/**
* register a unified listener of /${dsRoot},
*/
@Override
protected void registerListener() {
treeCache.getListenable().addListener((client, event) -> {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
});
public void registerListener(AbstractListener abstractListener) {
logger.info("register zookeeper listener: {}", abstractListener.getClass().getName());
listenerList.add(abstractListener);
listenerList.sort(AbstractListener::compareTo);
}
@Override
@ -59,25 +57,12 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
treeCache.getListenable().addListener(((client, event) -> {
for (AbstractListener abstractListener : listenerList) {
logger.debug("zookeeperListener:{} triggered", abstractListener.getClass().getName());
abstractListener.childEvent(client, event);
}
//for sub class
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
public String getFromCache(final String cachePath, final String key) {
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
}
return null;
}
public TreeCache getTreeCache(final String cachePath) {
return treeCache;
}
public void addListener(TreeCacheListener listener){
this.treeCache.getListenable().addListener(listener);
}));
}
@Override

36
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -30,7 +30,6 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -65,7 +64,8 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
protected void registerListener(){}
protected void registerListener(AbstractListener abstractListener) {
}
protected void treeCacheStart(){}
@ -143,16 +143,6 @@ public class ZookeeperOperator implements InitializingBean {
}
}
public boolean hasChildren(final String key) {
Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;
} catch (Exception ex) {
throw new IllegalStateException(ex);
}
}
public boolean isExisted(final String key) {
try {
return zkClient.checkExists().forPath(key) != null;
@ -194,28 +184,6 @@ public class ZookeeperOperator implements InitializingBean {
}
}
public void persistEphemeral(String key, String value, boolean overwrite) {
try {
if (overwrite) {
persistEphemeral(key, value);
} else {
if (!isExisted(key)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
}
}
} catch (final Exception ex) {
logger.error("persistEphemeral key : {} , value : {}, overwrite : {}", key, value, overwrite, ex);
}
}
public void persistEphemeralSequential(final String key, String value) {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
logger.error("persistEphemeralSequential key : {}", key, ex);
}
}
public void remove(final String key) {
try {
if (isExisted(key)) {

48
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperatorTest.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.junit.Assert;
import org.junit.Test;
public class ZookeeperCachedOperatorTest {
private ZookeeperCachedOperator zookeeperCachedOperator = new ZookeeperCachedOperator();
@Test
public void testRegisterListener() {
AbstractListener abstractListener1 = new AbstractListener(1) {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
// ignore
}
};
AbstractListener abstractListener2 = new AbstractListener(2) {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
// ignore
}
};
zookeeperCachedOperator.registerListener(abstractListener2);
zookeeperCachedOperator.registerListener(abstractListener1);
Assert.assertTrue(true);
}
}

1
pom.xml

@ -847,6 +847,7 @@
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/ZookeeperCachedOperatorTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>

Loading…
Cancel
Save