diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java index fcff6024ec..09d0a362f3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.utils.CheckUtils.checkDesc; @@ -105,7 +106,7 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic } if (projectMapper.insert(project) > 0) { - result.put(Constants.DATA_LIST, project); + result.put(Constants.DATA_LIST, project.getId()); putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.CREATE_PROJECT_ERROR); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java index 9bbbf32a6f..e26a3751fb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java @@ -17,12 +17,16 @@ package org.apache.dolphinscheduler.dao.datasource; +import static org.apache.dolphinscheduler.common.Constants.PASSWORD; +import static org.apache.dolphinscheduler.common.Constants.USER; + import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import java.sql.Connection; import java.sql.DriverManager; +import java.util.Properties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -157,6 +161,11 @@ public abstract class BaseDataSource { separator = ":"; break; case HIVE: + if ("?".equals(otherParams.substring(0, 1))) { + break; + } + separator = ";"; + break; case SPARK: case SQLSERVER: separator = ";"; @@ -178,6 +187,19 @@ public abstract class BaseDataSource { return DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword()); } + /** + * the data source test connection + * @param info Properties + * @return Connection Connection + * @throws Exception Exception + */ + public Connection getConnection(Properties info) throws Exception { + Class.forName(driverClassSelector()); + info.setProperty(USER, getUser()); + info.setProperty(PASSWORD, getPassword()); + return DriverManager.getConnection(getJdbcUrl(), info); + } + protected String filterOther(String otherParams) { return otherParams; } @@ -226,6 +248,10 @@ public abstract class BaseDataSource { this.other = other; } + public void setConnParams(String connParams) { + + } + public String getJavaSecurityKrb5Conf() { return javaSecurityKrb5Conf; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java index bcf1cdf3d2..8fccb08bb7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java @@ -17,13 +17,17 @@ package org.apache.dolphinscheduler.dao.datasource; +import static org.apache.dolphinscheduler.common.Constants.SEMICOLON; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.HiveConfUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import java.sql.Connection; +import java.util.Map; /** * data source of hive @@ -100,4 +104,17 @@ public class HiveDataSource extends BaseDataSource { return super.getConnection(); } + @Override + public void setConnParams(String connParams) { + // Verification parameters + Map connParamMap = CollectionUtils.stringToMap(connParams, SEMICOLON); + if (connParamMap.isEmpty()) { + return; + } + + StringBuilder otherSb = new StringBuilder(); + connParamMap.forEach((k, v) -> otherSb.append(String.format("%s=%s%s", k, v, SEMICOLON))); + StringBuilder otherAppend = StringUtils.isNotBlank(getOther()) ? otherSb.append(getOther()) : otherSb.deleteCharAt(otherSb.length() - 1); + super.setOther(otherAppend.toString()); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java index 7932be5750..7a58e361a7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java @@ -230,7 +230,7 @@ public class UdfFunc { if (StringUtils.isBlank(key)) { return null; } - return JSONUtils.parseObject(key); + return JSONUtils.parseObject(key, UdfFunc.class); } } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java index 1d02fd4fe3..31c963d5d7 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java @@ -158,4 +158,38 @@ public class BaseDataSourceTest { } + @Test + public void testSetConnParams() { + + BaseDataSource hiveDataSource = new HiveDataSource(); + hiveDataSource.setAddress("jdbc:hive2://127.0.0.1:10000"); + hiveDataSource.setDatabase("test"); + hiveDataSource.setPassword("123456"); + hiveDataSource.setUser("test"); + hiveDataSource.setConnParams(""); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test", hiveDataSource.getJdbcUrl()); + + //set fake other + hiveDataSource.setConnParams("hive.tez.container.size=20000;"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther(null); + hiveDataSource.setConnParams("hive.tez.container.size=20000"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther(null); + hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther("charset=UTF-8"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;charset=UTF-8", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100;charset=UTF-8?hive.tez.container.size=20000", hiveDataSource.getJdbcUrl()); + + hiveDataSource.setOther("charset=UTF-8;hive.exec.stagingdir=/tmp"); + hiveDataSource.setConnParams("hive.tez.container.size=20000;hive.zzz=100"); + Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;hive.zzz=100;charset=UTF-8?hive.tez.container.size=20000;hive.exec.stagingdir=/tmp", hiveDataSource.getJdbcUrl()); + } + } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java index 3920fd39b2..145b7f38d2 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSourceTest.java @@ -81,7 +81,7 @@ public class HiveDataSourceTest { hiveDataSource.setOther("hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2"); Assert.assertEquals( - "jdbc:hive2://127.0.0.1:10000/test;?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2", + "jdbc:hive2://127.0.0.1:10000/test?hive.mapred.mode=strict;hive.server2.thrift.http.path=hs2", hiveDataSource.getJdbcUrl()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 18882a2fb5..c2ea2c4073 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -14,9 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; @@ -42,13 +45,10 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; - - - @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) }) -public class MasterServer { +public class MasterServer implements IStoppable { /** * logger of MasterServer @@ -62,8 +62,8 @@ public class MasterServer { private MasterConfig masterConfig; /** - * spring application context - * only use it for initialization + * spring application context + * only use it for initialization */ @Autowired private SpringApplicationContext springApplicationContext; @@ -73,6 +73,12 @@ public class MasterServer { */ private NettyRemotingServer nettyRemotingServer; + /** + * master registry + */ + @Autowired + private MasterRegistry masterRegistry; + /** * zk master client */ @@ -87,8 +93,9 @@ public class MasterServer { /** * master server startup - * + *

* master server not use web service + * * @param args arguments */ public static void main(String[] args) { @@ -100,16 +107,23 @@ public class MasterServer { * run master server */ @PostConstruct - public void run(){ + public void run() { + try { + //init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(masterConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.start(); - //init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(masterConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.start(); + this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this); + + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } // self tolerant this.zkMasterClient.start(); @@ -137,7 +151,9 @@ public class MasterServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - close("shutdownHook"); + if (Stopper.isRunning()) { + close("shutdownHook"); + } } })); @@ -145,13 +161,14 @@ public class MasterServer { /** * gracefully close + * * @param cause close cause */ public void close(String cause) { try { //execute only once - if(Stopper.isStopped()){ + if (Stopper.isStopped()) { return; } @@ -163,24 +180,32 @@ public class MasterServer { try { //thread sleep 3 seconds for thread quietly stop Thread.sleep(3000L); - }catch (Exception e){ + } catch (Exception e) { logger.warn("thread sleep exception ", e); } // this.masterSchedulerService.close(); this.nettyRemotingServer.close(); + this.masterRegistry.unRegistry(); this.zkMasterClient.close(); //close quartz - try{ + try { QuartzExecutors.getInstance().shutdown(); logger.info("Quartz service stopped"); - }catch (Exception e){ - logger.warn("Quartz service stopped exception:{}",e.getMessage()); + } catch (Exception e) { + logger.warn("Quartz service stopped exception:{}", e.getMessage()); } + } catch (Exception e) { logger.error("master server stop exception ", e); + } finally { System.exit(-1); } } + + @Override + public void stop(String cause) { + close(cause); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 1872ae0a6e..ac7d8b0ffc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -149,7 +149,7 @@ public class LowerWeightHostManager extends CommonHostManager { String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); Set hostWeights = new HashSet<>(nodes.size()); for(String node : nodes){ - String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); + String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); if(StringUtils.isNotEmpty(heartbeat) && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ String[] parts = heartbeat.split(COMMA); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 37d6e72243..b492395a0c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.registry; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; @@ -24,9 +25,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import java.util.Date; import java.util.concurrent.Executors; @@ -84,30 +83,29 @@ public class MasterRegistry { public void registry() { String address = NetUtils.getHost(); String localNodePath = getMasterPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); + zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( + (client, newState) -> { if (newState == ConnectionState.LOST) { logger.error("master : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("master : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("master : {} connection SUSPENDED ", address); + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } - } - }); + }); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), Sets.newHashSet(getMasterPath()), + Constants.MASTER_PREFIX, zookeeperRegistryCenter); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s" - , address, localNodePath, masterHeartbeatInterval); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } /** @@ -116,16 +114,14 @@ public class MasterRegistry { public void unRegistry() { String address = getLocalAddress(); String localNodePath = getMasterPath(); - heartBeatExecutor.shutdownNow(); - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); - logger.info("master node : {} unRegistry from ZK path {}." - , address, localNodePath); + zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath); + logger.info("master node : {} unRegistry to ZK.", address); } /** * get master path */ - private String getMasterPath() { + public String getMasterPath() { String address = getLocalAddress(); return this.zookeeperRegistryCenter.getMasterPath() + "/" + address; } @@ -139,4 +135,12 @@ public class MasterRegistry { } + /** + * get zookeeper registry center + * @return ZookeeperRegistryCenter + */ + public ZookeeperRegistryCenter getZookeeperRegistryCenter() { + return zookeeperRegistryCenter; + } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index b89d85126f..a12583b535 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.registry; import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; @@ -29,7 +30,10 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HeartBeatTask extends Thread { +/** + * Heart beat task + */ +public class HeartBeatTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); @@ -37,23 +41,39 @@ public class HeartBeatTask extends Thread { private double reservedMemory; private double maxCpuloadAvg; private Set heartBeatPaths; + private String serverType; private ZookeeperRegistryCenter zookeeperRegistryCenter; + /** + * server stop or not + */ + protected IStoppable stoppable = null; public HeartBeatTask(String startTime, double reservedMemory, double maxCpuloadAvg, Set heartBeatPaths, + String serverType, ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; this.heartBeatPaths = heartBeatPaths; this.zookeeperRegistryCenter = zookeeperRegistryCenter; + this.serverType = serverType; } @Override public void run() { try { + + // check dead or not in zookeeper + for (String heartBeatPath : heartBeatPaths) { + if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) { + zookeeperRegistryCenter.getStoppable().stop("i was judged to death, release resources and stop myself"); + return; + } + } + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); @@ -79,10 +99,19 @@ public class HeartBeatTask extends Thread { builder.append(OSUtils.getProcessID()); for (String heartBeatPath : heartBeatPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); + zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString()); } } catch (Throwable ex) { logger.error("error write heartbeat info", ex); } } + + /** + * for stop server + * + * @param serverStoppable server stoppable interface + */ + public void setStoppable(IStoppable serverStoppable) { + this.stoppable = serverStoppable; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index b1a5edee38..4dfdb80e52 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements InitializingBean { /** * init MasterNodeListener listener */ - registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); + registryCenter.getRegisterOperator().addListener(new MasterNodeListener()); /** * init WorkerNodeListener listener */ - registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener()); + registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 3ca62bee6a..9017a13a65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -17,19 +17,27 @@ package org.apache.dolphinscheduler.server.registry; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + /** - * zookeeper register center + * zookeeper register center */ @Service public class ZookeeperRegistryCenter implements InitializingBean { @@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements InitializingBean { @Autowired - protected ZookeeperCachedOperator zookeeperCachedOperator; - + protected RegisterOperator registerOperator; @Autowired - private ZookeeperConfig zookeeperConfig; + private ZookeeperConfig zookeeperConfig; /** * nodes namespace @@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { public final String EMPTY = ""; + private IStoppable stoppable; + @Override public void afterPropertiesSet() throws Exception { NODES = zookeeperConfig.getDsRoot() + "/nodes"; @@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements InitializingBean { * init nodes */ private void initNodes() { - zookeeperCachedOperator.persist(MASTER_PATH, EMPTY); - zookeeperCachedOperator.persist(WORKER_PATH, EMPTY); + registerOperator.persist(MASTER_PATH, EMPTY); + registerOperator.persist(WORKER_PATH, EMPTY); } /** * close */ public void close() { - if (isStarted.compareAndSet(true, false)) { - if (zookeeperCachedOperator != null) { - zookeeperCachedOperator.close(); - } + if (isStarted.compareAndSet(true, false) && registerOperator != null) { + registerOperator.close(); } } /** * get master path + * * @return master path */ public String getMasterPath() { @@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker path + * * @return worker path */ public String getWorkerPath() { @@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { } /** - * get master nodes directly + * get master nodes directly + * * @return master nodes */ public Set getMasterNodesDirectly() { @@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { } /** - * get worker nodes directly + * get worker nodes directly + * * @return master nodes */ public Set getWorkerNodesDirectly() { @@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group directly + * * @return worker group nodes */ public Set getWorkerGroupDirectly() { @@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group nodes + * * @param workerGroup * @return */ @@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * whether worker path + * * @param path path * @return result */ @@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * whether master path + * * @param path path * @return result */ @@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group path + * * @param workerGroup workerGroup * @return worker group path */ @@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get children nodes + * * @param key key * @return children nodes */ public List getChildrenKeys(final String key) { - return zookeeperCachedOperator.getChildrenKeys(key); + return registerOperator.getChildrenKeys(key); + } + + /** + * @return get dead server node parent path + */ + public String getDeadZNodeParentPath() { + return registerOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; + } + + public void setStoppable(IStoppable stoppable) { + this.stoppable = stoppable; + } + + public IStoppable getStoppable() { + return stoppable; } /** - * get zookeeperCachedOperator - * @return zookeeperCachedOperator + * check dead server or not , if dead, stop self + * + * @param zNode node path + * @param serverType master or worker prefix + * @return true if not exists + * @throws Exception errors */ - public ZookeeperCachedOperator getZookeeperCachedOperator() { - return zookeeperCachedOperator; + protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception { + //ip_sequenceno + String[] zNodesPath = zNode.split("\\/"); + String ipSeqNo = zNodesPath[zNodesPath.length - 1]; + + String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; + + if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) { + return true; + } + + return false; } + public RegisterOperator getRegisterOperator() { + return registerOperator; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index a267b5bf32..10880bf94f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -33,20 +35,24 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import java.util.Set; + import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; /** - * worker server + * worker server */ @ComponentScan("org.apache.dolphinscheduler") -public class WorkerServer { +public class WorkerServer implements IStoppable { /** * logger @@ -54,31 +60,31 @@ public class WorkerServer { private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); /** - * netty remote server + * netty remote server */ private NettyRemotingServer nettyRemotingServer; /** - * worker registry + * worker registry */ @Autowired private WorkerRegistry workerRegistry; /** - * worker config + * worker config */ @Autowired private WorkerConfig workerConfig; /** - * spring application context - * only use it for initialization + * spring application context + * only use it for initialization */ @Autowired private SpringApplicationContext springApplicationContext; /** - * alert model netty remote server + * alert model netty remote server */ private AlertClientService alertClientService; @@ -105,24 +111,31 @@ public class WorkerServer { */ @PostConstruct public void run() { - logger.info("start worker server..."); - - //alert-server client registry - alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT); - - //init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(workerConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService)); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); - this.nettyRemotingServer.start(); - - // worker registry - this.workerRegistry.registry(); - + try { + logger.info("start worker server..."); + + //init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(workerConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); + this.nettyRemotingServer.start(); + + this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this); + Set workerZkPaths = this.workerRegistry.getWorkerZkPaths(); + this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP); + // worker registry + this.workerRegistry.registry(); + + // retry report task status + this.retryReportTaskStatusThread.start(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } // task execute manager this.workerManagerThread.start(); @@ -135,7 +148,9 @@ public class WorkerServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - close("shutdownHook"); + if (Stopper.isRunning()) { + close("shutdownHook"); + } } })); } @@ -167,8 +182,13 @@ public class WorkerServer { } catch (Exception e) { logger.error("worker server stop exception ", e); + } finally { System.exit(-1); } } + @Override + public void stop(String cause) { + close(cause); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 3d4d73f51a..b763497a04 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SLASH; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -29,9 +30,7 @@ import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import java.util.Date; import java.util.Set; @@ -89,6 +88,14 @@ public class WorkerRegistry { this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } + /** + * get zookeeper registry center + * @return ZookeeperRegistryCenter + */ + public ZookeeperRegistryCenter getZookeeperRegistryCenter() { + return zookeeperRegistryCenter; + } + /** * registry */ @@ -98,28 +105,27 @@ public class WorkerRegistry { int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); for (String workerZKPath : workerZkPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( + (client,newState) -> { if (newState == ConnectionState.LOST) { logger.error("worker : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("worker : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("worker : {} connection SUSPENDED ", address); } - } - }); + }); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, - this.workerConfig.getWorkerReservedMemory(), - this.workerConfig.getWorkerMaxCpuloadAvg(), - workerZkPaths, - this.zookeeperRegistryCenter); + this.workerConfig.getWorkerReservedMemory(), + this.workerConfig.getWorkerMaxCpuloadAvg(), + workerZkPaths, + Constants.WORKER_PREFIX, + this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); @@ -132,7 +138,7 @@ public class WorkerRegistry { String address = getLocalAddress(); Set workerZkPaths = getWorkerZkPaths(); for (String workerZkPath : workerZkPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); + zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); } this.heartBeatExecutor.shutdownNow(); @@ -141,7 +147,7 @@ public class WorkerRegistry { /** * get worker path */ - private Set getWorkerZkPaths() { + public Set getWorkerZkPaths() { Set workerZkPaths = Sets.newHashSet(); String address = getLocalAddress(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index 5a16194b3b..8f21d5e95c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -244,6 +244,9 @@ public class SqlTask extends AbstractTask { PreparedStatement stmt = null; ResultSet resultSet = null; try { + + baseDataSource.setConnParams(sqlParameters.getConnParams()); + // create connection connection = baseDataSource.getConnection(); // create temp function diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 1f0926ba0c..665a62c26d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -85,6 +85,9 @@ public class ZKMasterClient extends AbstractZKClient { // Master registry masterRegistry.registry(); + String registPath = this.masterRegistry.getMasterPath(); + masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); + // init system znode this.initSystemZNode(); @@ -296,15 +299,6 @@ public class ZKMasterClient extends AbstractZKClient { return false; } - /** - * failover worker tasks - * - * 1. kill yarn job if there are yarn jobs in tasks. - * 2. change task state from running to need failover. - * 3. failover all tasks when workerHost is null - * @param workerHost worker host - */ - /** * failover worker tasks *

diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java index 52ed4a96c5..7609c4549c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java @@ -14,14 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.DependentRelation; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.model.DateInterval; +import org.apache.dolphinscheduler.common.model.DependentItem; +import org.apache.dolphinscheduler.common.model.DependentTaskModel; import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -29,200 +35,381 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.DependentTaskExecThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - +/** + * DependentTaskTest + */ @RunWith(MockitoJUnitRunner.Silent.class) public class DependentTaskTest { - private static final Logger logger = LoggerFactory.getLogger(DependentTaskTest.class); + /** + * TaskNode.runFlag : task can be run normally + */ + public static final String FLOWNODE_RUN_FLAG_NORMAL = "NORMAL"; private ProcessService processService; - private ApplicationContext applicationContext; - - private MasterConfig config; + /** + * the dependent task to be tested + * ProcessDefinition id=1 + * Task id=task-10, name=D + * ProcessInstance id=100 + * TaskInstance id=1000 + * notice: must be initialized by setupTaskInstance() on each test case + */ + private ProcessInstance processInstance; + private TaskInstance taskInstance; @Before - public void before() throws Exception{ + public void before() { + ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); - config = new MasterConfig(); + MasterConfig config = new MasterConfig(); config.setMasterTaskCommitRetryTimes(3); config.setMasterTaskCommitInterval(1000); - processService = Mockito.mock(ProcessService.class); - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); - Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastProcessInterval()); + Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + processService = Mockito.mock(ProcessService.class); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + processInstance = getProcessInstance(100, 1); + // for MasterBaseTaskExecThread.call + // for DependentTaskExecThread.waitTaskQuit Mockito.when(processService - .getTaskNodeListByDefinitionId(4)) - .thenReturn(getTaskNodes()); - Mockito.when(processService - .findValidTaskListByProcessId(11)) - .thenReturn(getTaskInstances()); + .findProcessInstanceById(100)) + .thenAnswer(i -> processInstance); + // for MasterBaseTaskExecThread.submit Mockito.when(processService - .findTaskInstanceById(252612)) - .thenReturn(getTaskInstance()); - + .submitTask(Mockito.argThat(taskInstance -> taskInstance.getId() == 1000))) + .thenAnswer(i -> taskInstance); - Mockito.when(processService.findProcessInstanceById(10111)) - .thenReturn(getProcessInstance()); - Mockito.when(processService.findProcessDefineById(0)) - .thenReturn(getProcessDefinition()); - Mockito.when(processService.saveTaskInstance(getTaskInstance())) + // for DependentTaskExecThread.initTaskParameters + Mockito.when(processService + .updateTaskInstance(Mockito.any())) + .thenReturn(true); + // for DependentTaskExecThread.updateTaskState + Mockito.when(processService + .saveTaskInstance(Mockito.any())) .thenReturn(true); - applicationContext = Mockito.mock(ApplicationContext.class); - SpringApplicationContext springApplicationContext = new SpringApplicationContext(); - springApplicationContext.setApplicationContext(applicationContext); - Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config); + // for DependentTaskExecThread.waitTaskQuit + Mockito.when(processService + .findTaskInstanceById(1000)) + .thenAnswer(i -> taskInstance); } - @Test - public void testDependAll() throws Exception{ + private void testBasicInit() { + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day") + ).collect(Collectors.toList())); - TaskInstance taskInstance = getTaskInstance(); - String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskInstance.setDependency(dependString); + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); - Mockito.when(processService.submitTask(taskInstance)) - .thenReturn(taskInstance); - DependentTaskExecThread dependentTask = - new DependentTaskExecThread(taskInstance); + // dependence: AND(AND(2-A-day-today)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); - dependentTask.call(); + setupTaskInstance(taskNode); + } - Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + @Test + public void testBasicSuccess() throws Exception { + testBasicInit(); + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "A", dependentProcessInstance), + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "B", dependentProcessInstance) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } + @Test + public void testBasicFailure() throws Exception { + testBasicInit(); + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); + // for DependentExecute.getDependTaskResult Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastStopProcessInterval()); - DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); - dependentFailure.call(); - Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", dependentProcessInstance), + getTaskInstanceForValidTaskList(2000, ExecutionStatus.SUCCESS, "B", dependentProcessInstance) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, taskExecThread.getTaskInstance().getState()); } @Test - public void testDependTask() throws Exception{ - - TaskInstance taskInstance = getTaskInstance(); - String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"D\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskInstance.setDependency(dependString); - Mockito.when(processService.submitTask(taskInstance)) - .thenReturn(taskInstance); - DependentTaskExecThread dependentTask = - new DependentTaskExecThread(taskInstance); + public void testDependentRelation() throws Exception { + DependentTaskModel dependentTaskModel1 = new DependentTaskModel(); + dependentTaskModel1.setRelation(DependentRelation.AND); + dependentTaskModel1.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day"), + getDependentItemFromTaskNode(3, "B", "today", "day") + ).collect(Collectors.toList())); + + DependentTaskModel dependentTaskModel2 = new DependentTaskModel(); + dependentTaskModel2.setRelation(DependentRelation.OR); + dependentTaskModel2.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day"), + getDependentItemFromTaskNode(3, "C", "today", "day") + ).collect(Collectors.toList())); + + /* + * OR AND 2-A-day-today 3-B-day-today + * OR 2-A-day-today 3-C-day-today + */ + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.OR); + dependentParameters.setDependTaskList(Stream.of( + dependentTaskModel1, + dependentTaskModel2 + ).collect(Collectors.toList())); + + TaskNode taskNode = getDependantTaskNode(); + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + setupTaskInstance(taskNode); + + ProcessInstance processInstance200 = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE); + ProcessInstance processInstance300 = + getProcessInstanceForFindLastRunningProcess(300, 3, ExecutionStatus.SUCCESS); + + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(processInstance200); + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(3), Mockito.any(), Mockito.any())) + .thenReturn(processInstance300); - dependentTask.call(); + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.FAILURE, "A", processInstance200) + ).collect(Collectors.toList())); + Mockito.when(processService + .findValidTaskListByProcessId(300)) + .thenReturn(Stream.of( + getTaskInstanceForValidTaskList(3000, ExecutionStatus.SUCCESS, "B", processInstance300), + getTaskInstanceForValidTaskList(3001, ExecutionStatus.SUCCESS, "C", processInstance300) + ).collect(Collectors.toList())); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); + } - Assert.assertEquals(ExecutionStatus.SUCCESS, dependentTask.getTaskInstance().getState()); + /** + * test when dependent on ALL tasks in another process + */ + private void testDependentOnAllInit() { + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, Constants.DEPENDENT_ALL, "today", "day") + ).collect(Collectors.toList())); + + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + + // dependence: AND(AND(2:ALL today day)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + + setupTaskInstance(taskNode); + } - DateInterval dateInterval =DependentDateUtils.getTodayInterval(new Date()).get(0); + @Test + public void testDependentOnAllSuccess() throws Exception { + testDependentOnAllInit(); + // for DependentExecute.findLastProcessInterval Mockito.when(processService - .findLastRunningProcess(4, dateInterval.getStartTime(), - dateInterval.getEndTime())) - .thenReturn(findLastStopProcessInterval()); + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.SUCCESS)); - Mockito.when(processService - .findValidTaskListByProcessId(11)) - .thenReturn(getErrorTaskInstances()); - DependentTaskExecThread dependentFailure = new DependentTaskExecThread(taskInstance); - dependentFailure.call(); - Assert.assertEquals(ExecutionStatus.FAILURE, dependentFailure.getTaskInstance().getState()); + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecThread.getTaskInstance().getState()); } - private ProcessInstance findLastStopProcessInterval(){ - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(11); - processInstance.setProcessDefinitionId(4); - processInstance.setState(ExecutionStatus.STOP); - return processInstance; - } + @Test + public void testDependentOnAllFailure() throws Exception { + testDependentOnAllInit(); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.FAILURE)); - private ProcessInstance findLastProcessInterval(){ - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(11); - processInstance.setProcessDefinitionId(4); - processInstance.setState(ExecutionStatus.SUCCESS); - return processInstance; + DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance); + dependentTask.call(); + Assert.assertEquals(ExecutionStatus.FAILURE, dependentTask.getTaskInstance().getState()); } - private ProcessDefinition getProcessDefinition(){ - ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setId(0); - return processDefinition; + /** + * test whether waitTaskQuit has been well impl + */ + @Test + public void testWaitAndCancel() throws Exception { + // for the poor independence of UT, error on other place may causes the condition happens + if (!Stopper.isRunning()) { + return; + } + + TaskNode taskNode = getDependantTaskNode(); + DependentTaskModel dependentTaskModel = new DependentTaskModel(); + dependentTaskModel.setRelation(DependentRelation.AND); + dependentTaskModel.setDependItemList(Stream.of( + getDependentItemFromTaskNode(2, "A", "today", "day") + ).collect(Collectors.toList())); + + DependentParameters dependentParameters = new DependentParameters(); + dependentParameters.setRelation(DependentRelation.AND); + dependentParameters.setDependTaskList(Stream.of(dependentTaskModel).collect(Collectors.toList())); + + // dependence: AND(AND(2:A today day)) + taskNode.setDependence(JSONUtils.toJsonString(dependentParameters)); + + setupTaskInstance(taskNode); + + ProcessInstance dependentProcessInstance = + getProcessInstanceForFindLastRunningProcess(200, 2, ExecutionStatus.RUNNING_EXECUTION); + // for DependentExecute.findLastProcessInterval + Mockito.when(processService + .findLastRunningProcess(Mockito.eq(2), Mockito.any(), Mockito.any())) + .thenReturn(dependentProcessInstance); + + DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance); + + // for DependentExecute.getDependTaskResult + Mockito.when(processService + .findValidTaskListByProcessId(200)) + .thenAnswer(i -> { + processInstance.setState(ExecutionStatus.READY_STOP); + return Stream.of( + getTaskInstanceForValidTaskList(2000, ExecutionStatus.RUNNING_EXECUTION, "A", dependentProcessInstance) + ).collect(Collectors.toList()); + }) + .thenThrow(new IllegalStateException("have not been stopped as expected")); + + taskExecThread.call(); + Assert.assertEquals(ExecutionStatus.KILL, taskExecThread.getTaskInstance().getState()); } - private ProcessInstance getProcessInstance(){ + private ProcessInstance getProcessInstance(int processInstanceId, int processDefinitionId) { ProcessInstance processInstance = new ProcessInstance(); - processInstance.setId(10111); - processInstance.setProcessDefinitionId(0); + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); processInstance.setState(ExecutionStatus.RUNNING_EXECUTION); - return processInstance; } + /** + * task that dependent on others (and to be tested here) + * notice: should be filled with setDependence() and be passed to setupTaskInstance() + */ + private TaskNode getDependantTaskNode() { + TaskNode taskNode = new TaskNode(); + taskNode.setId("tasks-10"); + taskNode.setName("D"); + taskNode.setType(TaskType.DEPENDENT.toString()); + taskNode.setRunFlag(FLOWNODE_RUN_FLAG_NORMAL); + return taskNode; + } - private List getTaskNodes(){ - List list = new ArrayList<>(); - TaskDefinition taskNode = new TaskDefinition(); - taskNode.setCode(1111L); - taskNode.setName("C"); - taskNode.setTaskType(TaskType.SQL); - list.add(taskNode); - return list; + private void setupTaskInstance(TaskNode taskNode) { + taskInstance = new TaskInstance(); + taskInstance.setId(1000); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + taskInstance.setTaskJson(JSONUtils.toJsonString(taskNode)); + taskInstance.setTaskType(taskNode.getType()); + taskInstance.setName(taskNode.getName()); } - private List getErrorTaskInstances(){ - List list = new ArrayList<>(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setName("C"); - taskInstance.setState(ExecutionStatus.SUCCESS); - taskInstance.setDependency("1231"); - list.add(taskInstance); - return list; + /** + * DependentItem defines the condition for the dependent + */ + private DependentItem getDependentItemFromTaskNode( + int processDefinitionId, String taskName, + String date, String cycle + ) { + DependentItem dependentItem = new DependentItem(); + dependentItem.setDefinitionId(processDefinitionId); + dependentItem.setDepTasks(taskName); + dependentItem.setDateValue(date); + dependentItem.setCycle(cycle); + // so far, the following fields have no effect + dependentItem.setDependResult(DependResult.SUCCESS); + dependentItem.setStatus(ExecutionStatus.SUCCESS); + return dependentItem; } - private List getTaskInstances(){ - List list = new ArrayList<>(); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setName("D"); - taskInstance.setState(ExecutionStatus.SUCCESS); - taskInstance.setDependency("1231"); - list.add(taskInstance); - return list; + private ProcessInstance getProcessInstanceForFindLastRunningProcess( + int processInstanceId, int processDefinitionId, ExecutionStatus state + ) { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(processInstanceId); + processInstance.setProcessDefinitionId(processDefinitionId); + processInstance.setState(state); + return processInstance; } - private TaskInstance getTaskInstance(){ + private TaskInstance getTaskInstanceForValidTaskList( + int taskInstanceId, ExecutionStatus state, + String taskName, ProcessInstance processInstance + ) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setTaskType("DEPENDENT"); - taskInstance.setId(252612); - taskInstance.setName("C"); - taskInstance.setProcessInstanceId(10111); + taskInstance.setId(taskInstanceId); + taskInstance.setName(taskName); + taskInstance.setProcessInstanceId(processInstance.getId()); + taskInstance.setProcessDefinitionId(processInstance.getProcessDefinitionId()); taskInstance.setTaskJson("{}"); - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + taskInstance.setState(state); return taskInstance; } - -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 8c2321dd8e..74cd2da6c8 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -45,7 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import java.util.ArrayList; @@ -67,7 +67,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class, + ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, CuratorZookeeperClient.class}) public class TaskPriorityQueueConsumerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index a180f51576..9b62473930 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -60,8 +59,8 @@ public class MasterRegistryTest { masterRegistry.registry(); String masterPath = zookeeperRegistryCenter.getMasterPath(); TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node - String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort())); - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath); + String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort()); + String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); masterRegistry.unRegistry(); } @@ -73,7 +72,7 @@ public class MasterRegistryTest { TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node masterRegistry.unRegistry(); String masterPath = zookeeperRegistryCenter.getMasterPath(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath); + List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath); Assert.assertTrue(childrenKeys.isEmpty()); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java new file mode 100644 index 0000000000..24bb25c97f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.registry; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * zookeeper registry center test + */ +@RunWith(MockitoJUnitRunner.class) +public class ZookeeperRegistryCenterTest { + + @InjectMocks + private ZookeeperRegistryCenter zookeeperRegistryCenter; + + @Mock + protected RegisterOperator registerOperator; + + @Mock + private ZookeeperConfig zookeeperConfig; + + private static final String DS_ROOT = "/dolphinscheduler"; + + @Test + public void testGetDeadZNodeParentPath() { + ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); + zookeeperConfig.setDsRoot(DS_ROOT); + Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig); + + String deadZNodeParentPath = zookeeperRegistryCenter.getDeadZNodeParentPath(); + + Assert.assertEquals(deadZNodeParentPath, DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS); + + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index 8938f49773..bf04f1f569 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -43,7 +43,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import java.util.Date; @@ -71,7 +71,7 @@ import io.netty.channel.Channel; ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, + RegisterOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java index a71e48030d..d7066c0d40 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java @@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executor; - -import org.apache.curator.framework.imps.CuratorFrameworkImpl; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; + +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.state.ConnectionStateListener; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,7 +63,7 @@ public class WorkerRegistryTest { private ZookeeperRegistryCenter zookeeperRegistryCenter; @Mock - private ZookeeperCachedOperator zookeeperCachedOperator; + private RegisterOperator registerOperator; @Mock private CuratorFrameworkImpl zkClient; @@ -69,15 +71,21 @@ public class WorkerRegistryTest { @Mock private WorkerConfig workerConfig; + private static final Set workerGroups; + + static { + workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + } + @Before public void before() { - Set workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups); Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker"); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn( + Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator); + Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient); + Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn( new Listenable() { @Override public void addListener(ConnectionStateListener connectionStateListener) { @@ -114,7 +122,7 @@ public class WorkerRegistryTest { int i = 0; for (String workerGroup : workerConfig.getWorkerGroups()) { String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort())); - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath); + String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath); if (0 == i) { Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/")); } else { @@ -156,7 +164,7 @@ public class WorkerRegistryTest { for (String workerGroup : workerConfig.getWorkerGroups()) { String workerGroupPath = workerPath + "/" + workerGroup.trim(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); + List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath); Assert.assertTrue(childrenKeys.isEmpty()); } @@ -168,4 +176,10 @@ public class WorkerRegistryTest { workerRegistry.unRegistry(); } + + @Test + public void testGetWorkerZkPaths() { + workerRegistry.init(); + Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size()); + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 37d8f10c93..7cdf680090 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -17,14 +17,8 @@ package org.apache.dolphinscheduler.service.zk; -import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.COLON; -import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -47,57 +41,10 @@ import org.springframework.stereotype.Component; * abstract zookeeper client */ @Component -public abstract class AbstractZKClient extends ZookeeperCachedOperator { +public abstract class AbstractZKClient extends RegisterOperator { private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - /** - * remove dead server by host - * - * @param host host - * @param serverType serverType - */ - public void removeDeadServerByHost(String host, String serverType) { - List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); - for (String serverPath : deadServers) { - if (serverPath.startsWith(serverType + UNDERLINE + host)) { - String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; - super.remove(server); - logger.info("{} server {} deleted from zk dead server path success", serverType, host); - } - } - } - - /** - * opType(add): if find dead server , then add to zk deadServerPath - * opType(delete): delete path from zk - * - * @param zNode node path - * @param zkNodeType master or worker - * @param opType delete or add - */ - public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) { - String host = getHostByEventDataPath(zNode); - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; - - //check server restart, if restart , dead server path in zk should be delete - if (opType.equals(DELETE_ZK_OP)) { - removeDeadServerByHost(host, type); - - } else if (opType.equals(ADD_ZK_OP)) { - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; - if (!super.isExisted(deadServerPath)) { - //add dead server info to zk dead server path : /dead-servers/ - - super.persist(deadServerPath, (type + UNDERLINE + host)); - - logger.info("{} server dead , and {} added to zk dead server path success", - zkNodeType, zNode); - } - } - - } - /** * get active master num * @@ -187,7 +134,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { /** * check the zookeeper node already exists * - * @param host host + * @param host host * @param zkNodeType zookeeper node type * @return true if exists */ @@ -247,12 +194,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { return path; } - /** - * @return get dead server node parent path - */ - protected String getDeadZNodeParentPath() { - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; - } /** * @return get master start up lock path @@ -310,26 +251,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { } } - /** - * get host ip, string format: masterParentPath/ip - * - * @param path path - * @return host ip, string format: masterParentPath/ip - */ - protected String getHostByEventDataPath(String path) { - if (StringUtils.isEmpty(path)) { - logger.error("empty path!"); - return ""; - } - String[] pathArray = path.split(SINGLE_SLASH); - if (pathArray.length < 1) { - logger.error("parse ip error: {}", path); - return ""; - } - return pathArray[pathArray.length - 1]; - - } - @Override public String toString() { return "AbstractZKClient{" diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java new file mode 100644 index 0000000000..0fd4a4fa92 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * register operator + */ +@Component +public class RegisterOperator extends ZookeeperCachedOperator { + + private final Logger logger = LoggerFactory.getLogger(RegisterOperator.class); + + /** + * @return get dead server node parent path + */ + protected String getDeadZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; + } + + /** + * remove dead server by host + * + * @param host host + * @param serverType serverType + * @throws Exception + */ + public void removeDeadServerByHost(String host, String serverType) throws Exception { + List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); + for (String serverPath : deadServers) { + if (serverPath.startsWith(serverType + UNDERLINE + host)) { + String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; + super.remove(server); + logger.info("{} server {} deleted from zk dead server path success", serverType, host); + } + } + } + + /** + * get host ip, string format: masterParentPath/ip + * + * @param path path + * @return host ip, string format: masterParentPath/ip + */ + protected String getHostByEventDataPath(String path) { + if (StringUtils.isEmpty(path)) { + logger.error("empty path!"); + return ""; + } + String[] pathArray = path.split(SINGLE_SLASH); + if (pathArray.length < 1) { + logger.error("parse ip error: {}", path); + return ""; + } + return pathArray[pathArray.length - 1]; + + } + + /** + * opType(add): if find dead server , then add to zk deadServerPath + * opType(delete): delete path from zk + * + * @param zNode node path + * @param zkNodeType master or worker + * @param opType delete or add + * @throws Exception errors + */ + public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { + String host = getHostByEventDataPath(zNode); + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + + //check server restart, if restart , dead server path in zk should be delete + if (opType.equals(DELETE_ZK_OP)) { + removeDeadServerByHost(host, type); + + } else if (opType.equals(ADD_ZK_OP)) { + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; + if (!super.isExisted(deadServerPath)) { + //add dead server info to zk dead server path : /dead-servers/ + + super.persist(deadServerPath, (type + UNDERLINE + host)); + + logger.info("{} server dead , and {} added to zk dead server path success", + zkNodeType, zNode); + } + } + + } + + /** + * opType(add): if find dead server , then add to zk deadServerPath + * opType(delete): delete path from zk + * + * @param zNodeSet node path set + * @param zkNodeType master or worker + * @param opType delete or add + * @throws Exception errors + */ + public void handleDeadServer(Set zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception { + + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + for (String zNode : zNodeSet) { + String host = getHostByEventDataPath(zNode); + //check server restart, if restart , dead server path in zk should be delete + if (opType.equals(DELETE_ZK_OP)) { + removeDeadServerByHost(host, type); + + } else if (opType.equals(ADD_ZK_OP)) { + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; + if (!super.isExisted(deadServerPath)) { + //add dead server info to zk dead server path : /dead-servers/ + + super.persist(deadServerPath, (type + UNDERLINE + host)); + + logger.info("{} server dead , and {} added to zk dead server path success", + zkNodeType, zNode); + } + } + + } + + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index 88c339b045..54913cf915 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -32,6 +32,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +/** + * zookeeper cache operator + */ @Component public class ZookeeperCachedOperator extends ZookeeperOperator { diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java new file mode 100644 index 0000000000..f828c0772f --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; + +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * register operator test + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class RegisterOperatorTest { + + private static ZKServer zkServer; + + @InjectMocks + private RegisterOperator registerOperator; + + @Mock + private ZookeeperConfig zookeeperConfig; + + private static final String DS_ROOT = "/dolphinscheduler"; + private static final String MASTER_NODE = "127.0.0.1:5678"; + + @Before + public void before() { + new Thread(() -> { + if (zkServer == null) { + zkServer = new ZKServer(); + } + zkServer.startLocalZkServer(2185); + }).start(); + } + + @Test + public void testAfterPropertiesSet() throws Exception { + TimeUnit.SECONDS.sleep(10); + Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185"); + Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100); + Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10); + Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000); + Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000); + Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000); + Mockito.when(zookeeperConfig.getDigest()).thenReturn(""); + Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT); + Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000); + + registerOperator.afterPropertiesSet(); + Assert.assertNotNull(registerOperator.getZkClient()); + } + + @After + public void after() { + if (zkServer != null) { + zkServer.stop(); + } + } + + @Test + public void testGetDeadZNodeParentPath() throws Exception { + + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + + Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path); + } + + @Test + public void testHandleDeadServer() throws Exception { + testAfterPropertiesSet(); + registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); + String path = registerOperator.getDeadZNodeParentPath(); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + + } + + @Test + public void testRemoveDeadServerByHost() throws Exception { + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + + registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + + registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX); + Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue index 5eb0a109a2..81d59d8b95 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/log.vue @@ -168,7 +168,7 @@ * Download log */ _downloadLog () { - downloadFile('/log/download-log', { + downloadFile('log/download-log', { taskInstanceId: this.stateId || this.logId }) }, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue index fd8a2b8955..b664904de7 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/dependItemList.vue @@ -16,17 +16,17 @@ */