Browse Source

merge from dev 2021-03-03

pull/3/MERGE
lenboo 4 years ago
parent
commit
dcf97920aa
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  2. 26
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java
  3. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java
  5. 34
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java
  6. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java
  7. 37
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  8. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  9. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  10. 33
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  11. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  12. 84
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  13. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  14. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  15. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  16. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  17. 441
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  18. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  19. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  20. 61
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
  21. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  22. 44
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
  23. 81
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  24. 155
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
  25. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  26. 116
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
  27. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue
  28. 34
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue
  29. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue
  30. 1
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue
  31. 2
      dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue
  32. 3
      pom.xml

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.utils.CheckUtils.checkDesc; import static org.apache.dolphinscheduler.api.utils.CheckUtils.checkDesc;
@ -105,7 +106,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
} }
if (projectMapper.insert(project) > 0) { if (projectMapper.insert(project) > 0) {
result.put(Constants.DATA_LIST, project); result.put(Constants.DATA_LIST, project.getId());
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.CREATE_PROJECT_ERROR); putMsg(result, Status.CREATE_PROJECT_ERROR);

26
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java

@ -17,12 +17,16 @@
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.USER;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.util.Properties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -157,6 +161,11 @@ public abstract class BaseDataSource {
separator = ":"; separator = ":";
break; break;
case HIVE: case HIVE:
if ("?".equals(otherParams.substring(0, 1))) {
break;
}
separator = ";";
break;
case SPARK: case SPARK:
case SQLSERVER: case SQLSERVER:
separator = ";"; separator = ";";
@ -178,6 +187,19 @@ public abstract class BaseDataSource {
return DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); return DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} }
/**
* the data source test connection
* @param info Properties
* @return Connection Connection
* @throws Exception Exception
*/
public Connection getConnection(Properties info) throws Exception {
Class.forName(driverClassSelector());
info.setProperty(USER, getUser());
info.setProperty(PASSWORD, getPassword());
return DriverManager.getConnection(getJdbcUrl(), info);
}
protected String filterOther(String otherParams) { protected String filterOther(String otherParams) {
return otherParams; return otherParams;
} }
@ -226,6 +248,10 @@ public abstract class BaseDataSource {
this.other = other; this.other = other;
} }
public void setConnParams(String connParams) {
}
public String getJavaSecurityKrb5Conf() { public String getJavaSecurityKrb5Conf() {
return javaSecurityKrb5Conf; return javaSecurityKrb5Conf;
} }

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java

@ -17,13 +17,17 @@
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import static org.apache.dolphinscheduler.common.Constants.SEMICOLON;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HiveConfUtils; import org.apache.dolphinscheduler.common.utils.HiveConfUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.util.Map;
/** /**
* data source of hive * data source of hive
@ -100,4 +104,17 @@ public class HiveDataSource extends BaseDataSource {
return super.getConnection(); return super.getConnection();
} }
@Override
public void setConnParams(String connParams) {
// Verification parameters
Map<String, String> connParamMap = CollectionUtils.stringToMap(connParams, SEMICOLON);
if (connParamMap.isEmpty()) {
return;
}
StringBuilder otherSb = new StringBuilder();
connParamMap.forEach((k, v) -> otherSb.append(String.format("%s=%s%s", k, v, SEMICOLON)));
StringBuilder otherAppend = StringUtils.isNotBlank(getOther()) ? otherSb.append(getOther()) : otherSb.deleteCharAt(otherSb.length() - 1);
super.setOther(otherAppend.toString());
}
} }

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java

@ -230,7 +230,7 @@ public class UdfFunc {
if (StringUtils.isBlank(key)) { if (StringUtils.isBlank(key)) {
return null; return null;
} }
return JSONUtils.parseObject(key); return JSONUtils.parseObject(key, UdfFunc.class);
} }
} }
} }

34
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java

@ -158,4 +158,38 @@ public class BaseDataSourceTest {
} }
@Test
public void testSetConnParams() {
BaseDataSource hiveDataSource = new HiveDataSource();
hiveDataSource.setAddress("jdbc:hive2://127.0.0.1:10000");
hiveDataSource.setDatabase("test");
hiveDataSource.setPassword("123456");
hiveDataSource.setUser("test");
hiveDataSource.setConnParams("");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test", hiveDataSource.getJdbcUrl());
//set fake other
hiveDataSource.setConnParams("hive.tez.container.size=20000;");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl());
hiveDataSource.setOther(null);
hiveDataSource.setConnParams("hive.tez.container.size=20000");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl());
hiveDataSource.setOther(null);
hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl());
hiveDataSource.setOther("charset=UTF-8");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;charset=UTF-8", hiveDataSource.getJdbcUrl());
hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100;charset=UTF-8?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl());
hiveDataSource.setOther("charset=UTF-8;hive.exec.stagingdir=/tmp");
hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100;charset=UTF-8?hive.tez.container.size=20000;hive.exec.stagingdir=/tmp", hiveDataSource.getJdbcUrl());
}
} }

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java

@ -81,7 +81,7 @@ public class HiveDataSourceTest {
hiveDataSource.setOther("hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2"); hiveDataSource.setOther("hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2");
Assert.assertEquals( Assert.assertEquals(
"jdbc:hive2://127.0.0.1:10000/test;?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2", "jdbc:hive2://127.0.0.1:10000/test?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2",
hiveDataSource.getJdbcUrl()); hiveDataSource.getJdbcUrl());
} }

37
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@ -42,13 +45,10 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType; import org.springframework.context.annotation.FilterType;
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
}) })
public class MasterServer { public class MasterServer implements IStoppable {
/** /**
* logger of MasterServer * logger of MasterServer
@ -73,6 +73,12 @@ public class MasterServer {
*/ */
private NettyRemotingServer nettyRemotingServer; private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
/** /**
* zk master client * zk master client
*/ */
@ -87,8 +93,9 @@ public class MasterServer {
/** /**
* master server startup * master server startup
* * <p>
* master server not use web service * master server not use web service
*
* @param args arguments * @param args arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
@ -101,7 +108,7 @@ public class MasterServer {
*/ */
@PostConstruct @PostConstruct
public void run() { public void run() {
try {
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort()); serverConfig.setListenPort(masterConfig.getListenPort());
@ -111,6 +118,13 @@ public class MasterServer {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// self tolerant // self tolerant
this.zkMasterClient.start(); this.zkMasterClient.start();
@ -137,14 +151,17 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
if (Stopper.isRunning()) {
close("shutdownHook"); close("shutdownHook");
} }
}
})); }));
} }
/** /**
* gracefully close * gracefully close
*
* @param cause close cause * @param cause close cause
*/ */
public void close(String cause) { public void close(String cause) {
@ -169,6 +186,7 @@ public class MasterServer {
// //
this.masterSchedulerService.close(); this.masterSchedulerService.close();
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
this.zkMasterClient.close(); this.zkMasterClient.close();
//close quartz //close quartz
try { try {
@ -177,10 +195,17 @@ public class MasterServer {
} catch (Exception e) { } catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage()); logger.warn("Quartz service stopped exception:{}", e.getMessage());
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("master server stop exception ", e); logger.error("master server stop exception ", e);
} finally {
System.exit(-1); System.exit(-1);
} }
} }
@Override
public void stop(String cause) {
close(cause);
}
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -149,7 +149,7 @@ public class LowerWeightHostManager extends CommonHostManager {
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size()); Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){ for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat) if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA); String[] parts = heartbeat.split(COMMA);

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
@ -24,9 +25,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date; import java.util.Date;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -84,18 +83,17 @@ public class MasterRegistry {
public void registry() { public void registry() {
String address = NetUtils.getHost(); String address = NetUtils.getHost();
String localNodePath = getMasterPath(); String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
@Override (client, newState) -> {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) { if (newState == ConnectionState.LOST) {
logger.error("master : {} connection lost from zookeeper", address); logger.error("master : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) { } else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper", address); logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address); logger.warn("master : {} connection SUSPENDED ", address);
} zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} }
}); });
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
@ -103,11 +101,11 @@ public class MasterRegistry {
masterConfig.getMasterReservedMemory(), masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterMaxCpuloadAvg(),
Sets.newHashSet(getMasterPath()), Sets.newHashSet(getMasterPath()),
Constants.MASTER_PREFIX,
zookeeperRegistryCenter); zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s" logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
, address, localNodePath, masterHeartbeatInterval);
} }
/** /**
@ -116,16 +114,14 @@ public class MasterRegistry {
public void unRegistry() { public void unRegistry() {
String address = getLocalAddress(); String address = getLocalAddress();
String localNodePath = getMasterPath(); String localNodePath = getMasterPath();
heartBeatExecutor.shutdownNow(); zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); logger.info("master node : {} unRegistry to ZK.", address);
logger.info("master node : {} unRegistry from ZK path {}."
, address, localNodePath);
} }
/** /**
* get master path * get master path
*/ */
private String getMasterPath() { public String getMasterPath() {
String address = getLocalAddress(); String address = getLocalAddress();
return this.zookeeperRegistryCenter.getMasterPath() + "/" + address; return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
} }
@ -139,4 +135,12 @@ public class MasterRegistry {
} }
/**
* get zookeeper registry center
* @return ZookeeperRegistryCenter
*/
public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
return zookeeperRegistryCenter;
}
} }

33
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -29,7 +30,10 @@ import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class HeartBeatTask extends Thread { /**
* Heart beat task
*/
public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
@ -37,23 +41,39 @@ public class HeartBeatTask extends Thread {
private double reservedMemory; private double reservedMemory;
private double maxCpuloadAvg; private double maxCpuloadAvg;
private Set<String> heartBeatPaths; private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter; private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* server stop or not
*/
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime, public HeartBeatTask(String startTime,
double reservedMemory, double reservedMemory,
double maxCpuloadAvg, double maxCpuloadAvg,
Set<String> heartBeatPaths, Set<String> heartBeatPaths,
String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) { ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime; this.startTime = startTime;
this.reservedMemory = reservedMemory; this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg; this.maxCpuloadAvg = maxCpuloadAvg;
this.heartBeatPaths = heartBeatPaths; this.heartBeatPaths = heartBeatPaths;
this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.serverType = serverType;
} }
@Override @Override
public void run() { public void run() {
try { try {
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) {
zookeeperRegistryCenter.getStoppable().stop("i was judged to death, release resources and stop myself");
return;
}
}
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage(); double loadAverage = OSUtils.loadAverage();
@ -79,10 +99,19 @@ public class HeartBeatTask extends Thread {
builder.append(OSUtils.getProcessID()); builder.append(OSUtils.getProcessID());
for (String heartBeatPath : heartBeatPaths) { for (String heartBeatPath : heartBeatPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString());
} }
} catch (Throwable ex) { } catch (Throwable ex) {
logger.error("error write heartbeat info", ex); logger.error("error write heartbeat info", ex);
} }
} }
/**
* for stop server
*
* @param serverStoppable server stoppable interface
*/
public void setStoppable(IStoppable serverStoppable) {
this.stoppable = serverStoppable;
}
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements InitializingBean {
/** /**
* init MasterNodeListener listener * init MasterNodeListener listener
*/ */
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); registryCenter.getRegisterOperator().addListener(new MasterNodeListener());
/** /**
* init WorkerNodeListener listener * init WorkerNodeListener listener
*/ */
registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener()); registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener());
} }
/** /**

84
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -17,17 +17,25 @@
package org.apache.dolphinscheduler.server.registry; package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/** /**
* zookeeper register center * zookeeper register center
*/ */
@ -38,8 +46,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
@Autowired @Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator; protected RegisterOperator registerOperator;
@Autowired @Autowired
private ZookeeperConfig zookeeperConfig; private ZookeeperConfig zookeeperConfig;
@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
public final String EMPTY = ""; public final String EMPTY = "";
private IStoppable stoppable;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
NODES = zookeeperConfig.getDsRoot() + "/nodes"; NODES = zookeeperConfig.getDsRoot() + "/nodes";
@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements InitializingBean {
* init nodes * init nodes
*/ */
private void initNodes() { private void initNodes() {
zookeeperCachedOperator.persist(MASTER_PATH, EMPTY); registerOperator.persist(MASTER_PATH, EMPTY);
zookeeperCachedOperator.persist(WORKER_PATH, EMPTY); registerOperator.persist(WORKER_PATH, EMPTY);
} }
/** /**
* close * close
*/ */
public void close() { public void close() {
if (isStarted.compareAndSet(true, false)) { if (isStarted.compareAndSet(true, false) && registerOperator != null) {
if (zookeeperCachedOperator != null) { registerOperator.close();
zookeeperCachedOperator.close();
}
} }
} }
/** /**
* get master path * get master path
*
* @return master path * @return master path
*/ */
public String getMasterPath() { public String getMasterPath() {
@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get worker path * get worker path
*
* @return worker path * @return worker path
*/ */
public String getWorkerPath() { public String getWorkerPath() {
@ -115,6 +124,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get master nodes directly * get master nodes directly
*
* @return master nodes * @return master nodes
*/ */
public Set<String> getMasterNodesDirectly() { public Set<String> getMasterNodesDirectly() {
@ -124,6 +134,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get worker nodes directly * get worker nodes directly
*
* @return master nodes * @return master nodes
*/ */
public Set<String> getWorkerNodesDirectly() { public Set<String> getWorkerNodesDirectly() {
@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get worker group directly * get worker group directly
*
* @return worker group nodes * @return worker group nodes
*/ */
public Set<String> getWorkerGroupDirectly() { public Set<String> getWorkerGroupDirectly() {
@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get worker group nodes * get worker group nodes
*
* @param workerGroup * @param workerGroup
* @return * @return
*/ */
@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* whether worker path * whether worker path
*
* @param path path * @param path path
* @return result * @return result
*/ */
@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* whether master path * whether master path
*
* @param path path * @param path path
* @return result * @return result
*/ */
@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get worker group path * get worker group path
*
* @param workerGroup workerGroup * @param workerGroup workerGroup
* @return worker group path * @return worker group path
*/ */
@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/** /**
* get children nodes * get children nodes
*
* @param key key * @param key key
* @return children nodes * @return children nodes
*/ */
public List<String> getChildrenKeys(final String key) { public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key); return registerOperator.getChildrenKeys(key);
} }
/** /**
* get zookeeperCachedOperator * @return get dead server node parent path
* @return zookeeperCachedOperator
*/ */
public ZookeeperCachedOperator getZookeeperCachedOperator() { public String getDeadZNodeParentPath() {
return zookeeperCachedOperator; return registerOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
public void setStoppable(IStoppable stoppable) {
this.stoppable = stoppable;
} }
public IStoppable getStoppable() {
return stoppable;
}
/**
* check dead server or not , if dead, stop self
*
* @param zNode node path
* @param serverType master or worker prefix
* @return true if not exists
* @throws Exception errors
*/
protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
//ip_sequenceno
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) {
return true;
}
return false;
}
public RegisterOperator getRegisterOperator() {
return registerOperator;
}
} }

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.worker; package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -33,11 +35,15 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Set;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.WebApplicationType; import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
@ -46,7 +52,7 @@ import org.springframework.context.annotation.ComponentScan;
* worker server * worker server
*/ */
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan("org.apache.dolphinscheduler")
public class WorkerServer { public class WorkerServer implements IStoppable {
/** /**
* logger * logger
@ -105,24 +111,31 @@ public class WorkerServer {
*/ */
@PostConstruct @PostConstruct
public void run() { public void run() {
try {
logger.info("start worker server..."); logger.info("start worker server...");
//alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
//init remoting server //init remoting server
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(workerConfig.getListenPort()); serverConfig.setListenPort(workerConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService)); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
// worker registry // worker registry
this.workerRegistry.registry(); this.workerRegistry.registry();
// retry report task status
this.retryReportTaskStatusThread.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// task execute manager // task execute manager
this.workerManagerThread.start(); this.workerManagerThread.start();
@ -135,8 +148,10 @@ public class WorkerServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
if (Stopper.isRunning()) {
close("shutdownHook"); close("shutdownHook");
} }
}
})); }));
} }
@ -167,8 +182,13 @@ public class WorkerServer {
} catch (Exception e) { } catch (Exception e) {
logger.error("worker server stop exception ", e); logger.error("worker server stop exception ", e);
} finally {
System.exit(-1); System.exit(-1);
} }
} }
@Override
public void stop(String cause) {
close(cause);
}
} }

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -29,9 +30,7 @@ import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date; import java.util.Date;
import java.util.Set; import java.util.Set;
@ -89,6 +88,14 @@ public class WorkerRegistry {
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
} }
/**
* get zookeeper registry center
* @return ZookeeperRegistryCenter
*/
public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
return zookeeperRegistryCenter;
}
/** /**
* registry * registry
*/ */
@ -98,19 +105,17 @@ public class WorkerRegistry {
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
for (String workerZKPath : workerZkPaths) { for (String workerZKPath : workerZkPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
@Override (client,newState) -> {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) { if (newState == ConnectionState.LOST) {
logger.error("worker : {} connection lost from zookeeper", address); logger.error("worker : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) { } else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper", address); logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address); logger.warn("worker : {} connection SUSPENDED ", address);
} }
}
}); });
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
} }
@ -119,6 +124,7 @@ public class WorkerRegistry {
this.workerConfig.getWorkerReservedMemory(), this.workerConfig.getWorkerReservedMemory(),
this.workerConfig.getWorkerMaxCpuloadAvg(), this.workerConfig.getWorkerMaxCpuloadAvg(),
workerZkPaths, workerZkPaths,
Constants.WORKER_PREFIX,
this.zookeeperRegistryCenter); this.zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
@ -132,7 +138,7 @@ public class WorkerRegistry {
String address = getLocalAddress(); String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths(); Set<String> workerZkPaths = getWorkerZkPaths();
for (String workerZkPath : workerZkPaths) { for (String workerZkPath : workerZkPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
} }
this.heartBeatExecutor.shutdownNow(); this.heartBeatExecutor.shutdownNow();
@ -141,7 +147,7 @@ public class WorkerRegistry {
/** /**
* get worker path * get worker path
*/ */
private Set<String> getWorkerZkPaths() { public Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet(); Set<String> workerZkPaths = Sets.newHashSet();
String address = getLocalAddress(); String address = getLocalAddress();

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -244,6 +244,9 @@ public class SqlTask extends AbstractTask {
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet resultSet = null; ResultSet resultSet = null;
try { try {
baseDataSource.setConnParams(sqlParameters.getConnParams());
// create connection // create connection
connection = baseDataSource.getConnection(); connection = baseDataSource.getConnection();
// create temp function // create temp function

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -85,6 +85,9 @@ public class ZKMasterClient extends AbstractZKClient {
// Master registry // Master registry
masterRegistry.registry(); masterRegistry.registry();
String registPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
@ -296,15 +299,6 @@ public class ZKMasterClient extends AbstractZKClient {
return false; return false;
} }
/**
* failover worker tasks
*
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
* @param workerHost worker host
*/
/** /**
* failover worker tasks * failover worker tasks
* <p> * <p>

441
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -14,14 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master; package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.DependentRelation;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.DateInterval; import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -29,200 +35,381 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.util.ArrayList; /**
import java.util.Date; * DependentTaskTest
import java.util.List; */
@RunWith(MockitoJUnitRunner.Silent.class) @RunWith(MockitoJUnitRunner.Silent.class)
public class DependentTaskTest { public class DependentTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); /**
* TaskNode.runFlag : task can be run normally
*/
public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL";
private ProcessService processService; private ProcessService processService;
private ApplicationContext applicationContext;
/**
private MasterConfig config; * the dependent task to be tested
* ProcessDefinition id=1
* Task id=task-10, name=D
* ProcessInstance id=100
* TaskInstance id=1000
* notice: must be initialized by setupTaskInstance() on each test case
*/
private ProcessInstance processInstance;
private TaskInstance taskInstance;
@Before @Before
public void before() throws Exception{ public void before() {
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
config = new MasterConfig(); MasterConfig config = new MasterConfig();
config.setMasterTaskCommitRetryTimes(3); config.setMasterTaskCommitRetryTimes(3);
config.setMasterTaskCommitInterval(1000); config.setMasterTaskCommitInterval(1000);
processService = Mockito.mock(ProcessService.class); Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0);
Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(),
dateInterval.getEndTime()))
.thenReturn(findLastProcessInterval());
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
processInstance = getProcessInstance(100, 1);
// for MasterBaseTaskExecThread.call
// for DependentTaskExecThread.waitTaskQuit
Mockito.when(processService Mockito.when(processService
.getTaskNodeListByDefinitionId(4)) .findProcessInstanceById(100))
.thenReturn(getTaskNodes()); .thenAnswer(i -> processInstance);
Mockito.when(processService
.findValidTaskListByProcessId(11))
.thenReturn(getTaskInstances());
// for MasterBaseTaskExecThread.submit
Mockito.when(processService Mockito.when(processService
.findTaskInstanceById(252612)) .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000)))
.thenReturn(getTaskInstance()); .thenAnswer(i -> taskInstance);
// for DependentTaskExecThread.initTaskParameters
Mockito.when(processService.findProcessInstanceById(10111)) Mockito.when(processService
.thenReturn(getProcessInstance()); .updateTaskInstance(Mockito.any()))
Mockito.when(processService.findProcessDefineById(0)) .thenReturn(true);
.thenReturn(getProcessDefinition()); // for DependentTaskExecThread.updateTaskState
Mockito.when(processService.saveTaskInstance(getTaskInstance())) Mockito.when(processService
.saveTaskInstance(Mockito.any()))
.thenReturn(true); .thenReturn(true);
applicationContext = Mockito.mock(ApplicationContext.class); // for DependentTaskExecThread.waitTaskQuit
SpringApplicationContext springApplicationContext = new SpringApplicationContext(); Mockito.when(processService
springApplicationContext.setApplicationContext(applicationContext); .findTaskInstanceById(1000))
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); .thenAnswer(i -> taskInstance);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
} }
@Test private void testBasicInit() {
public void testDependAll() throws Exception{ TaskNode taskNode = getDependantTaskNode();
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
).collect(Collectors.toList()));
TaskInstance taskInstance = getTaskInstance(); DependentParameters dependentParameters = new DependentParameters();
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; dependentParameters.setRelation(DependentRelation.AND);
taskInstance.setDependency(dependString); dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
Mockito.when(processService.submitTask(taskInstance)) // dependence: AND(AND(2-A-day-today))
.thenReturn(taskInstance); taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
DependentTaskExecThread dependentTask =
new DependentTaskExecThread(taskInstance);
dependentTask.call(); setupTaskInstance(taskNode);
}
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); @Test
public void testBasicSuccess() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); // for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance),
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance)
).collect(Collectors.toList()));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
@Test
public void testBasicFailure() throws Exception {
testBasicInit();
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(), .findValidTaskListByProcessId(200))
dateInterval.getEndTime())) .thenReturn(Stream.of(
.thenReturn(findLastStopProcessInterval()); getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance),
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance)
dependentFailure.call(); ).collect(Collectors.toList()));
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState());
} }
@Test @Test
public void testDependTask() throws Exception{ public void testDependentRelation() throws Exception {
DependentTaskModel dependentTaskModel1 = new DependentTaskModel();
dependentTaskModel1.setRelation(DependentRelation.AND);
dependentTaskModel1.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "B", "today", "day")
).collect(Collectors.toList()));
DependentTaskModel dependentTaskModel2 = new DependentTaskModel();
dependentTaskModel2.setRelation(DependentRelation.OR);
dependentTaskModel2.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day"),
getDependentItemFromTaskNode(3, "C", "today", "day")
).collect(Collectors.toList()));
TaskInstance taskInstance = getTaskInstance(); /*
String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; * OR AND 2-A-day-today 3-B-day-today
taskInstance.setDependency(dependString); * OR 2-A-day-today 3-C-day-today
Mockito.when(processService.submitTask(taskInstance)) */
.thenReturn(taskInstance); DependentParameters dependentParameters = new DependentParameters();
DependentTaskExecThread dependentTask = dependentParameters.setRelation(DependentRelation.OR);
new DependentTaskExecThread(taskInstance); dependentParameters.setDependTaskList(Stream.of(
dependentTaskModel1,
dependentTaskModel2
).collect(Collectors.toList()));
TaskNode taskNode = getDependantTaskNode();
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
ProcessInstance processInstance200 =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE);
ProcessInstance processInstance300 =
getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(processInstance200);
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any()))
.thenReturn(processInstance300);
dependentTask.call(); // for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200)
).collect(Collectors.toList()));
Mockito.when(processService
.findValidTaskListByProcessId(300))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300),
getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300)
).collect(Collectors.toList()));
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
}
Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); /**
* test when dependent on ALL tasks in another process
*/
private void testDependentOnAllInit() {
TaskNode taskNode = getDependantTaskNode();
DependentTaskModel dependentTaskModel = new DependentTaskModel();
dependentTaskModel.setRelation(DependentRelation.AND);
dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.AND);
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
// dependence: AND(AND(2:ALL today day))
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
}
DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); @Test
public void testDependentOnAllSuccess() throws Exception {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService Mockito.when(processService
.findLastRunningProcess(4, dateInterval.getStartTime(), .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
dateInterval.getEndTime())) .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS));
.thenReturn(findLastStopProcessInterval());
Mockito.when(processService DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
.findValidTaskListByProcessId(11)) taskExecThread.call();
.thenReturn(getErrorTaskInstances()); Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState());
DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance);
dependentFailure.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState());
} }
private ProcessInstance findLastStopProcessInterval(){ @Test
ProcessInstance processInstance = new ProcessInstance(); public void testDependentOnAllFailure() throws Exception {
processInstance.setId(11); testDependentOnAllInit();
processInstance.setProcessDefinitionId(4); // for DependentExecute.findLastProcessInterval
processInstance.setState(ExecutionStatus.STOP); Mockito.when(processService
return processInstance; .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE));
DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
dependentTask.call();
Assert.assertEquals(ExecutionStatus.FAILURE, dependentTask.getTaskInstance().getState());
} }
private ProcessInstance findLastProcessInterval(){ /**
ProcessInstance processInstance = new ProcessInstance(); * test whether waitTaskQuit has been well impl
processInstance.setId(11); */
processInstance.setProcessDefinitionId(4); @Test
processInstance.setState(ExecutionStatus.SUCCESS); public void testWaitAndCancel() throws Exception {
return processInstance; // for the poor independence of UT, error on other place may causes the condition happens
if (!Stopper.isRunning()) {
return;
} }
private ProcessDefinition getProcessDefinition(){ TaskNode taskNode = getDependantTaskNode();
ProcessDefinition processDefinition = new ProcessDefinition(); DependentTaskModel dependentTaskModel = new DependentTaskModel();
processDefinition.setId(0); dependentTaskModel.setRelation(DependentRelation.AND);
return processDefinition; dependentTaskModel.setDependItemList(Stream.of(
getDependentItemFromTaskNode(2, "A", "today", "day")
).collect(Collectors.toList()));
DependentParameters dependentParameters = new DependentParameters();
dependentParameters.setRelation(DependentRelation.AND);
dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList()));
// dependence: AND(AND(2:A today day))
taskNode.setDependence(JSONUtils.toJsonString(dependentParameters));
setupTaskInstance(taskNode);
ProcessInstance dependentProcessInstance =
getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any()))
.thenReturn(dependentProcessInstance);
DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.thenAnswer(i -> {
processInstance.setState(ExecutionStatus.READY_STOP);
return Stream.of(
getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance)
).collect(Collectors.toList());
})
.thenThrow(new IllegalStateException("have not been stopped as expected"));
taskExecThread.call();
Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState());
} }
private ProcessInstance getProcessInstance(){ private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) {
ProcessInstance processInstance = new ProcessInstance(); ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(10111); processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(0); processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
return processInstance; return processInstance;
} }
/**
* task that dependent on others (and to be tested here)
* notice: should be filled with setDependence() and be passed to setupTaskInstance()
*/
private TaskNode getDependantTaskNode() {
TaskNode taskNode = new TaskNode();
taskNode.setId("tasks-10");
taskNode.setName("D");
taskNode.setType(TaskType.DEPENDENT.toString());
taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL);
return taskNode;
}
private List<TaskDefinition> getTaskNodes(){ private void setupTaskInstance(TaskNode taskNode) {
List<TaskDefinition> list = new ArrayList<>(); taskInstance = new TaskInstance();
TaskDefinition taskNode = new TaskDefinition(); taskInstance.setId(1000);
taskNode.setCode(1111L); taskInstance.setProcessInstanceId(processInstance.getId());
taskNode.setName("C"); taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskNode.setTaskType(TaskType.SQL); taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
list.add(taskNode); taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode));
return list; taskInstance.setTaskType(taskNode.getType());
taskInstance.setName(taskNode.getName());
} }
private List<TaskInstance> getErrorTaskInstances(){ /**
List<TaskInstance> list = new ArrayList<>(); * DependentItem defines the condition for the dependent
TaskInstance taskInstance = new TaskInstance(); */
taskInstance.setName("C"); private DependentItem getDependentItemFromTaskNode(
taskInstance.setState(ExecutionStatus.SUCCESS); int processDefinitionId, String taskName,
taskInstance.setDependency("1231"); String date, String cycle
list.add(taskInstance); ) {
return list; DependentItem dependentItem = new DependentItem();
dependentItem.setDefinitionId(processDefinitionId);
dependentItem.setDepTasks(taskName);
dependentItem.setDateValue(date);
dependentItem.setCycle(cycle);
// so far, the following fields have no effect
dependentItem.setDependResult(DependResult.SUCCESS);
dependentItem.setStatus(ExecutionStatus.SUCCESS);
return dependentItem;
} }
private List<TaskInstance> getTaskInstances(){ private ProcessInstance getProcessInstanceForFindLastRunningProcess(
List<TaskInstance> list = new ArrayList<>(); int processInstanceId, int processDefinitionId, ExecutionStatus state
TaskInstance taskInstance = new TaskInstance(); ) {
taskInstance.setName("D"); ProcessInstance processInstance = new ProcessInstance();
taskInstance.setState(ExecutionStatus.SUCCESS); processInstance.setId(processInstanceId);
taskInstance.setDependency("1231"); processInstance.setProcessDefinitionId(processDefinitionId);
list.add(taskInstance); processInstance.setState(state);
return list; return processInstance;
} }
private TaskInstance getTaskInstance(){ private TaskInstance getTaskInstanceForValidTaskList(
int taskInstanceId, ExecutionStatus state,
String taskName, ProcessInstance processInstance
) {
TaskInstance taskInstance = new TaskInstance(); TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("DEPENDENT"); taskInstance.setTaskType("DEPENDENT");
taskInstance.setId(252612); taskInstance.setId(taskInstanceId);
taskInstance.setName("C"); taskInstance.setName(taskName);
taskInstance.setProcessInstanceId(10111); taskInstance.setProcessInstanceId(processInstance.getId());
taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId());
taskInstance.setTaskJson("{}"); taskInstance.setTaskJson("{}");
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); taskInstance.setState(state);
return taskInstance; return taskInstance;
} }
} }

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -45,7 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.ArrayList; import java.util.ArrayList;
@ -67,7 +67,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, @ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class, ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class,
CuratorZookeeperClient.class}) CuratorZookeeperClient.class})
public class TaskPriorityQueueConsumerTest { public class TaskPriorityQueueConsumerTest {

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -60,8 +59,8 @@ public class MasterRegistryTest {
masterRegistry.registry(); masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath(); String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort())); String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath); String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry(); masterRegistry.unRegistry();
} }
@ -73,7 +72,7 @@ public class MasterRegistryTest {
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
masterRegistry.unRegistry(); masterRegistry.unRegistry();
String masterPath = zookeeperRegistryCenter.getMasterPath(); String masterPath = zookeeperRegistryCenter.getMasterPath();
List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath); List<String> childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
Assert.assertTrue(childrenKeys.isEmpty()); Assert.assertTrue(childrenKeys.isEmpty());
} }
} }

61
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* zookeeper registry center test
*/
@RunWith(MockitoJUnitRunner.class)
public class ZookeeperRegistryCenterTest {
@InjectMocks
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock
protected RegisterOperator registerOperator;
@Mock
private ZookeeperConfig zookeeperConfig;
private static final String DS_ROOT = "/dolphinscheduler";
@Test
public void testGetDeadZNodeParentPath() {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot(DS_ROOT);
Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
String deadZNodeParentPath = zookeeperRegistryCenter.getDeadZNodeParentPath();
Assert.assertEquals(deadZNodeParentPath, DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS);
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -43,7 +43,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Date; import java.util.Date;
@ -71,7 +71,7 @@ import io.netty.channel.Channel;
ZookeeperRegistryCenter.class, ZookeeperRegistryCenter.class,
MasterConfig.class, MasterConfig.class,
WorkerConfig.class, WorkerConfig.class,
ZookeeperCachedOperator.class, RegisterOperator.class,
ZookeeperConfig.class, ZookeeperConfig.class,
ZookeeperNodeManager.class, ZookeeperNodeManager.class,
TaskCallbackService.class, TaskCallbackService.class,

44
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -61,7 +63,7 @@ public class WorkerRegistryTest {
private ZookeeperRegistryCenter zookeeperRegistryCenter; private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock @Mock
private ZookeeperCachedOperator zookeeperCachedOperator; private RegisterOperator registerOperator;
@Mock @Mock
private CuratorFrameworkImpl zkClient; private CuratorFrameworkImpl zkClient;
@ -69,15 +71,21 @@ public class WorkerRegistryTest {
@Mock @Mock
private WorkerConfig workerConfig; private WorkerConfig workerConfig;
private static final Set<String> workerGroups;
static {
workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
}
@Before @Before
public void before() { public void before() {
Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups); Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker"); Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator); Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient); Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn( Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
new Listenable<ConnectionStateListener>() { new Listenable<ConnectionStateListener>() {
@Override @Override
public void addListener(ConnectionStateListener connectionStateListener) { public void addListener(ConnectionStateListener connectionStateListener) {
@ -114,7 +122,7 @@ public class WorkerRegistryTest {
int i = 0; int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) { for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort())); String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath); String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
if (0 == i) { if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/")); Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
} else { } else {
@ -156,7 +164,7 @@ public class WorkerRegistryTest {
for (String workerGroup : workerConfig.getWorkerGroups()) { for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerGroupPath = workerPath + "/" + workerGroup.trim(); String workerGroupPath = workerPath + "/" + workerGroup.trim();
List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); List<String> childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
Assert.assertTrue(childrenKeys.isEmpty()); Assert.assertTrue(childrenKeys.isEmpty());
} }
@ -168,4 +176,10 @@ public class WorkerRegistryTest {
workerRegistry.unRegistry(); workerRegistry.unRegistry();
} }
@Test
public void testGetWorkerZkPaths() {
workerRegistry.init();
Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size());
}
} }

81
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -17,14 +17,8 @@
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@ -47,57 +41,10 @@ import org.springframework.stereotype.Component;
* abstract zookeeper client * abstract zookeeper client
*/ */
@Component @Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator { public abstract class AbstractZKClient extends RegisterOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
*/
public void removeDeadServerByHost(String host, String serverType) {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
}
/** /**
* get active master num * get active master num
* *
@ -247,12 +194,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
return path; return path;
} }
/**
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/** /**
* @return get master start up lock path * @return get master start up lock path
@ -310,26 +251,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
} }
} }
/**
* get host ip, string format: masterParentPath/ip
*
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
return pathArray[pathArray.length - 1];
}
@Override @Override
public String toString() { public String toString() {
return "AbstractZKClient{" return "AbstractZKClient{"

155
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* register operator
*/
@Component
public class RegisterOperator extends ZookeeperCachedOperator {
private final Logger logger = LoggerFactory.getLogger(RegisterOperator.class);
/**
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
* @throws Exception
*/
public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
/**
* get host ip, string format: masterParentPath/ip
*
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
return pathArray[pathArray.length - 1];
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNodeSet node path set
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
for (String zNode : zNodeSet) {
String host = getHostByEventDataPath(zNode);
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
}
}
}

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -32,6 +32,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/**
* zookeeper cache operator
*/
@Component @Component
public class ZookeeperCachedOperator extends ZookeeperOperator { public class ZookeeperCachedOperator extends ZookeeperOperator {

116
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* register operator test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class RegisterOperatorTest {
private static ZKServer zkServer;
@InjectMocks
private RegisterOperator registerOperator;
@Mock
private ZookeeperConfig zookeeperConfig;
private static final String DS_ROOT = "/dolphinscheduler";
private static final String MASTER_NODE = "127.0.0.1:5678";
@Before
public void before() {
new Thread(() -> {
if (zkServer == null) {
zkServer = new ZKServer();
}
zkServer.startLocalZkServer(2185);
}).start();
}
@Test
public void testAfterPropertiesSet() throws Exception {
TimeUnit.SECONDS.sleep(10);
Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185");
Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
registerOperator.afterPropertiesSet();
Assert.assertNotNull(registerOperator.getZkClient());
}
@After
public void after() {
if (zkServer != null) {
zkServer.stop();
}
}
@Test
public void testGetDeadZNodeParentPath() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
}
@Test
public void testHandleDeadServer() throws Exception {
testAfterPropertiesSet();
registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
@Test
public void testRemoveDeadServerByHost() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX);
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
}

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue

@ -168,7 +168,7 @@
* Download log * Download log
*/ */
_downloadLog () { _downloadLog () {
downloadFile('/log/download-log', { downloadFile('log/download-log', {
taskInstanceId: this.stateId || this.logId taskInstanceId: this.stateId || this.logId
}) })
}, },

34
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue

@ -16,17 +16,17 @@
*/ */
<template> <template>
<div class="dep-list-model"> <div class="dep-list-model">
<div v-for="(el,$index) in dependItemList" :key='$index' @click="itemIndex = $index"> <div v-for="(el,$index) in dependItemList" :key='$index'>
<el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.projectId" @change="_onChangeProjectId" size="small"> <el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.projectId" @change="v => _onChangeProjectId(v, $index)" size="small">
<el-option v-for="item in projectList" :key="item.value" :value="item.value" :label="item.label"></el-option> <el-option v-for="item in projectList" :key="item.value" :value="item.value" :label="item.label"></el-option>
</el-select> </el-select>
<el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.definitionId" @change="_onChangeDefinitionId" size="small"> <el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.definitionId" @change="v => _onChangeDefinitionId(v, $index)" size="small">
<el-option v-for="item in el.definitionList" :key="item.value" :value="item.value" :label="item.label"></el-option> <el-option v-for="item in el.definitionList" :key="item.value" :value="item.value" :label="item.label"></el-option>
</el-select> </el-select>
<el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.depTasks" size="small"> <el-select filterable :disabled="isDetails" style="width: 450px" v-model="el.depTasks" size="small">
<el-option v-for="item in el.depTasksList || []" :key="item" :value="item" :label="item"></el-option> <el-option v-for="item in el.depTasksList || []" :key="item" :value="item" :label="item"></el-option>
</el-select> </el-select>
<el-select v-model="el.cycle" :disabled="isDetails" @change="_onChangeCycle" size="small"> <el-select v-model="el.cycle" :disabled="isDetails" @change="v => _onChangeCycle(v, $index)" size="small">
<el-option v-for="item in cycleList" :key="item.value" :value="item.value" :label="item.label"></el-option> <el-option v-for="item in cycleList" :key="item.value" :value="item.value" :label="item.label"></el-option>
</el-select> </el-select>
<el-select v-model="el.dateValue" :disabled="isDetails" size="small"> <el-select v-model="el.dateValue" :disabled="isDetails" size="small">
@ -62,8 +62,7 @@
list: [], list: [],
projectList: [], projectList: [],
cycleList: cycleList, cycleList: cycleList,
isInstance: false, isInstance: false
itemIndex: null
} }
}, },
mixins: [disabledState], mixins: [disabledState],
@ -105,7 +104,8 @@
* remove task * remove task
*/ */
_remove (i) { _remove (i) {
// this.dependTaskList[this.index].dependItemList.splice(i, 1) // eslint-disable-next-line
this.dependTaskList[this.index].dependItemList.splice(i, 1)
this._removeTip() this._removeTip()
if (!this.dependItemList.length || this.dependItemList.length === 0) { if (!this.dependItemList.length || this.dependItemList.length === 0) {
this.$emit('on-delete-all', { this.$emit('on-delete-all', {
@ -170,33 +170,33 @@
/** /**
* change process get dependItemList * change process get dependItemList
*/ */
_onChangeProjectId (value) { _onChangeProjectId (value, itemIndex) {
this._getProcessByProjectId(value).then(definitionList => { this._getProcessByProjectId(value).then(definitionList => {
/* this.$set(this.dependItemList, this.itemIndex, this._dlOldParams(value, definitionList, item)) */ /* this.$set(this.dependItemList, itemIndex, this._dlOldParams(value, definitionList, item)) */
let definitionId = definitionList[0].value let definitionId = definitionList[0].value
this._getDependItemList(definitionId).then(depTasksList => { this._getDependItemList(definitionId).then(depTasksList => {
let item = this.dependItemList[this.itemIndex] let item = this.dependItemList[itemIndex]
// init set depTasks All // init set depTasks All
item.depTasks = 'ALL' item.depTasks = 'ALL'
// set dependItemList item data // set dependItemList item data
this.$set(this.dependItemList, this.itemIndex, this._cpOldParams(value, definitionId, definitionList, depTasksList, item)) this.$set(this.dependItemList, itemIndex, this._cpOldParams(value, definitionId, definitionList, depTasksList, item))
}) })
}) })
}, },
_onChangeDefinitionId (value) { _onChangeDefinitionId (value, itemIndex) {
// get depItem list data // get depItem list data
this._getDependItemList(value).then(depTasksList => { this._getDependItemList(value).then(depTasksList => {
let item = this.dependItemList[this.itemIndex] let item = this.dependItemList[itemIndex]
// init set depTasks All // init set depTasks All
item.depTasks = 'ALL' item.depTasks = 'ALL'
// set dependItemList item data // set dependItemList item data
this.$set(this.dependItemList, this.itemIndex, this._rtOldParams(value, item.definitionList, depTasksList, item)) this.$set(this.dependItemList, itemIndex, this._rtOldParams(value, item.definitionList, depTasksList, item))
}) })
}, },
_onChangeCycle (value) { _onChangeCycle (value, itemIndex) {
let list = _.cloneDeep(dateValueList[value]) let list = _.cloneDeep(dateValueList[value])
this.$set(this.dependItemList[this.itemIndex], 'dateValue', list[0].value) this.$set(this.dependItemList[itemIndex], 'dateValue', list[0].value)
this.$set(this.dependItemList[this.itemIndex], 'dateValueList', list) this.$set(this.dependItemList[itemIndex], 'dateValueList', list)
}, },
_rtNewParams (value, definitionList, depTasksList, projectId) { _rtNewParams (value, definitionList, depTasksList, projectId) {
return { return {

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/nodeStatus.vue

@ -87,6 +87,8 @@
* remove task * remove task
*/ */
_remove (i) { _remove (i) {
// eslint-disable-next-line
this.dependTaskList[this.index].dependItemList.splice(i, 1)
this._removeTip() this._removeTip()
if (!this.dependItemList.length || this.dependItemList.length === 0) { if (!this.dependItemList.length || this.dependItemList.length === 0) {
this.$emit('on-delete-all', { this.$emit('on-delete-all', {

1
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/conditions.vue

@ -99,7 +99,6 @@
$('body').find('.tooltip.fade.top.in').remove() $('body').find('.tooltip.fade.top.in').remove()
}, },
_onDeleteAll (i) { _onDeleteAll (i) {
this.dependTaskList[this.index].dependItemList.splice(i, 1)
this.dependTaskList.map((item, i) => { this.dependTaskList.map((item, i) => {
if (item.dependItemList.length === 0) { if (item.dependItemList.length === 0) {
this.dependTaskList.splice(i, 1) this.dependTaskList.splice(i, 1)

2
dolphinscheduler-ui/src/js/conf/home/pages/resource/pages/file/pages/details/index.vue

@ -87,7 +87,7 @@
this.$router.go(-1) this.$router.go(-1)
}, },
_downloadFile () { _downloadFile () {
downloadFile('/resources/download', { downloadFile('resources/download', {
id: this.$route.params.id id: this.$route.params.id
}) })
}, },

3
pom.xml

@ -930,6 +930,7 @@
<include>**/server/master/processor/TaskKillResponseProcessorTest.java</include> <include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
<include>**/server/master/processor/queue/TaskResponseServiceTest.java</include> <include>**/server/master/processor/queue/TaskResponseServiceTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include> <include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include> <include>**/server/utils/HostTest.java</include>
@ -961,6 +962,7 @@
<include>**/service/zk/DefaultEnsembleProviderTest.java</include> <include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include> <include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include> <include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include> <include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include> <include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
@ -988,6 +990,7 @@
<include>**/dao/AlertDaoTest.java</include> <include>**/dao/AlertDaoTest.java</include>
<include>**/dao/datasource/OracleDataSourceTest.java</include> <include>**/dao/datasource/OracleDataSourceTest.java</include>
<include>**/dao/datasource/HiveDataSourceTest.java</include> <include>**/dao/datasource/HiveDataSourceTest.java</include>
<include>**/dao/datasource/BaseDataSourceTest.java</include>
<include>**/dao/upgrade/ProcessDefinitionDaoTest.java</include> <include>**/dao/upgrade/ProcessDefinitionDaoTest.java</include>
<include>**/dao/upgrade/WokrerGrouopDaoTest.java</include> <include>**/dao/upgrade/WokrerGrouopDaoTest.java</include>
<include>**/dao/upgrade/UpgradeDaoTest.java</include> <include>**/dao/upgrade/UpgradeDaoTest.java</include>

Loading…
Cancel
Save