diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java index e69f9a9f76..9c02111c36 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java @@ -86,21 +86,20 @@ public class CollectionUtils { * @return string to map */ public static Map stringToMap(String str, String separator, String keyPrefix) { - if (null == str || "".equals(str)) { - return null; + Map emptyMap = new HashMap<>(0); + if (StringUtils.isEmpty(str)) { + return emptyMap; } - if (null == separator || "".equals(separator)) { - return null; + if (StringUtils.isEmpty(separator)) { + return emptyMap; } String[] strings = str.split(separator); - int mapLength = strings.length; - if ((strings.length % 2) != 0) { - mapLength = mapLength + 1; - } - - Map map = new HashMap<>(mapLength); + Map map = new HashMap<>(strings.length); for (int i = 0; i < strings.length; i++) { String[] strArray = strings[i].split("="); + if (strArray.length != 2) { + return emptyMap; + } //strArray[0] KEY strArray[1] VALUE if (StringUtils.isEmpty(keyPrefix)) { map.put(strArray[0], strArray[1]); @@ -146,7 +145,7 @@ public class CollectionUtils { * @param obj the object * @return the maximum frequency of the object */ - public final int max(final Object obj) { + private int max(final Object obj) { return Math.max(freqA(obj), freqB(obj)); } @@ -156,7 +155,7 @@ public class CollectionUtils { * @param obj the object * @return the minimum frequency of the object */ - public final int min(final Object obj) { + private int min(final Object obj) { return Math.min(freqA(obj), freqB(obj)); } @@ -180,10 +179,10 @@ public class CollectionUtils { return getFreq(obj, cardinalityB); } - private final int getFreq(final Object obj, final Map freqMap) { + private int getFreq(final Object obj, final Map freqMap) { final Integer count = freqMap.get(obj); if (count != null) { - return count.intValue(); + return count; } return 0; } @@ -203,7 +202,7 @@ public class CollectionUtils { return true; } - if ((a == null && b != null) || a != null && b == null) { + if (a == null || b == null) { return false; } @@ -253,12 +252,7 @@ public class CollectionUtils { public static Map getCardinalityMap(final Iterable coll) { final Map count = new HashMap(); for (final O obj : coll) { - final Integer c = count.get(obj); - if (c == null) { - count.put(obj, Integer.valueOf(1)); - } else { - count.put(obj, Integer.valueOf(c.intValue() + 1)); - } + count.put(obj, count.getOrDefault(obj, 0) + 1); } return count; } @@ -273,6 +267,12 @@ public class CollectionUtils { */ public static List> getListByExclusion(List originList, Set exclusionSet) { List> instanceList = new ArrayList<>(); + if (exclusionSet == null) { + exclusionSet = new HashSet<>(); + } + if (originList == null) { + return instanceList; + } Map instanceMap; for (T instance : originList) { Map dataMap = new BeanMap(instance); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 0f30e7fbe0..3455d5344c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -291,14 +291,14 @@ public class DateUtils { * get some hour of day * * @param date date - * @param hours hours + * @param offsetHour hours * @return some hour of day * */ - public static Date getSomeHourOfDay(Date date, int hours) { + public static Date getSomeHourOfDay(Date date, int offsetHour) { Calendar cal = Calendar.getInstance(); cal.setTime(date); - cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours); + cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java index 574343d0cb..103e75fb61 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java @@ -34,7 +34,7 @@ public class DependentDateUtils { public static List getLastHoursInterval(Date businessDate, int hourNumber){ List dateIntervals = new ArrayList<>(); for(int index = hourNumber; index > 0; index--){ - Date lastHour = DateUtils.getSomeHourOfDay(businessDate, index); + Date lastHour = DateUtils.getSomeHourOfDay(businessDate, -index); Date beginTime = DateUtils.getStartOfHour(lastHour); Date endTime = DateUtils.getEndOfHour(lastHour); dateIntervals.add(new DateInterval(beginTime, endTime)); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java index efee627676..14e90ebcdc 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.common.queue; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.zk.StandaloneZKServerForTest; +import org.apache.dolphinscheduler.common.zk.ZKServer; import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals; /** * task queue test */ -public class TaskQueueImplTest extends StandaloneZKServerForTest { +public class TaskQueueImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); @@ -43,7 +43,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest { @Before public void before(){ - super.before(); + ZKServer.start(); tasksQueue = TaskQueueFactory.getTaskQueueInstance(); @@ -57,6 +57,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest { public void after(){ //clear all data tasksQueue.delete(); + ZKServer.stop(); } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java index 30c11522b6..7321879ab8 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.apache.dolphinscheduler.common.Constants; import org.junit.Assert; import org.junit.Test; @@ -26,19 +27,26 @@ public class CollectionUtilsTest { @Test public void equalLists() { + Assert.assertTrue(CollectionUtils.equalLists(null,null)); + Assert.assertTrue(CollectionUtils.equalLists(new ArrayList(),new ArrayList())); List a = new ArrayList(); a.add(1); a.add(2); - a.add(3); List b = new ArrayList(); - b.add(3); + b.add(1); + b.add(2); + Assert.assertTrue(CollectionUtils.equalLists(a, b)); + a.add(1); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); b.add(2); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); + a.add(2); b.add(1); - Assert.assertTrue(CollectionUtils.equalLists(a,b)); - Assert.assertTrue(CollectionUtils.equalLists(null,null)); - List c = new ArrayList(); - Assert.assertFalse(CollectionUtils.equalLists(c,null)); - Assert.assertFalse(CollectionUtils.equalLists(c,a)); + a.add(4); + b.add(2); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); + Assert.assertFalse(CollectionUtils.equalLists(null, new ArrayList())); + Assert.assertFalse(CollectionUtils.equalLists(new ArrayList(), null)); } @Test @@ -56,7 +64,49 @@ public class CollectionUtilsTest { @Test public void stringToMap() { - Map a = CollectionUtils.stringToMap("a=b;c=d", ";", ""); + Map a = CollectionUtils.stringToMap("a=b;c=d;", ";"); Assert.assertNotNull(a); + Assert.assertTrue(a.size() == 2); + a = CollectionUtils.stringToMap(null, ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("", ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d", ""); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d", null); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d;e=f", ";"); + Assert.assertEquals(a.size(), 3); + a = CollectionUtils.stringToMap("a;b=f", ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d;e=f;", ";", "test"); + Assert.assertEquals(a.size(), 3); + Assert.assertNotNull(a.get("testa")); } + + @Test + public void getListByExclusion() { + Assert.assertNotNull(CollectionUtils.getListByExclusion(null, null)); + List originList = new ArrayList<>(); + originList.add(1); + originList.add(2); + List> ret = CollectionUtils.getListByExclusion(originList, null); + Assert.assertEquals(ret.size(), 2); + ret = CollectionUtils.getListByExclusion(originList, new HashSet<>()); + Assert.assertEquals(ret.size(), 2); + Assert.assertFalse(ret.get(0).isEmpty()); + Set exclusion = new HashSet<>(); + exclusion.add(Constants.CLASS); + ret = CollectionUtils.getListByExclusion(originList, exclusion); + Assert.assertEquals(ret.size(), 2); + Assert.assertTrue(ret.get(0).isEmpty()); + } + + @Test + public void isNotEmpty() { + List list = new ArrayList<>(); + Assert.assertFalse(CollectionUtils.isNotEmpty(list)); + Assert.assertFalse(CollectionUtils.isNotEmpty(null)); + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java index bcaa391042..6800f6b542 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java @@ -18,13 +18,11 @@ package org.apache.dolphinscheduler.common.utils; import org.junit.Assert; import org.junit.Test; - import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtilsTest { - @Test public void format2Readable() throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -54,4 +52,102 @@ public class DateUtilsTest { Assert.assertEquals(sunday, sunday1); } -} \ No newline at end of file + + @Test + public void diffHours(){ + Date d1 = DateUtils.stringToDate("2019-01-28 00:00:00"); + Date d2 = DateUtils.stringToDate("2019-01-28 20:00:00"); + Assert.assertEquals(DateUtils.diffHours(d1, d2), 20); + Date d3 = DateUtils.stringToDate("2019-01-28 20:00:00"); + Assert.assertEquals(DateUtils.diffHours(d3, d2), 0); + Assert.assertEquals(DateUtils.diffHours(d2, d1), 20); + Date d4 = null; + Assert.assertEquals(DateUtils.diffHours(d2, d4), 0); + } + + @Test + public void dateToString() { + Date d1 = DateUtils.stringToDate("2019-01-28"); + Assert.assertNull(d1); + d1 = DateUtils.stringToDate("2019-01-28 00:00:00"); + Assert.assertEquals(DateUtils.dateToString(d1), "2019-01-28 00:00:00"); + } + + @Test + public void getSomeDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00"); + Date curr = DateUtils.getSomeDay(d1, 1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 00:00:00"); + Assert.assertEquals(DateUtils.dateToString(DateUtils.getSomeDay(d1, -31)), "2018-12-31 00:00:00"); + } + + @Test + public void getFirstDayOfMonth() { + Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00"); + Date curr = DateUtils.getFirstDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 00:00:00"); + + d1 = DateUtils.stringToDate("2019-01-31 01:59:00"); + curr = DateUtils.getFirstDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 01:59:00"); + } + + @Test + public void getSomeHourOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getSomeHourOfDay(d1, -1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 10:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 0); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 2); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 13:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 24); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 11:00:00"); + } + + @Test + public void getLastDayOfMonth() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + d1 = DateUtils.stringToDate("2019-01-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + + d1 = DateUtils.stringToDate("2019-02-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-28 11:59:59"); + + d1 = DateUtils.stringToDate("2020-02-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2020-02-29 11:59:59"); + } + + @Test + public void getStartOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getStartOfDay(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 00:00:00"); + } + + @Test + public void getEndOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getEndOfDay(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 23:59:59"); + } + + @Test + public void getStartOfHour() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getStartOfHour(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00"); + } + + @Test + public void getEndOfHour() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getEndOfHour(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java deleted file mode 100644 index fed9ebb18a..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.zk; - -import java.io.File; -import java.util.Properties; - -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.junit.Before; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * just for test - */ -public class StandaloneZKServerForTest { - - private static final Logger logger = LoggerFactory.getLogger(StandaloneZKServerForTest.class); - - private static volatile ZooKeeperServerMain zkServer = null; - - - @Before - public void before() { - logger.info("standalone zookeeper server for test service start "); - - ThreadPoolExecutors.getInstance().execute(new Runnable() { - @Override - public void run() { - - //delete zk data dir ? - File zkFile = new File(System.getProperty("java.io.tmpdir"), "zookeeper"); - - startStandaloneServer("2000", zkFile.getAbsolutePath(), "2181", "10", "5"); - } - }); - - } - - - /** - * start zk server - * @param tickTime zookeeper ticktime - * @param dataDir zookeeper data dir - * @param clientPort zookeeper client port - * @param initLimit zookeeper init limit - * @param syncLimit zookeeper sync limit - */ - private void startStandaloneServer(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) { - Properties props = new Properties(); - props.setProperty("tickTime", tickTime); - props.setProperty("dataDir", dataDir); - props.setProperty("clientPort", clientPort); - props.setProperty("initLimit", initLimit); - props.setProperty("syncLimit", syncLimit); - - QuorumPeerConfig quorumConfig = new QuorumPeerConfig(); - try { - quorumConfig.parseProperties(props); - - if(zkServer == null ){ - - synchronized (StandaloneZKServerForTest.class){ - if(zkServer == null ){ - zkServer = new ZooKeeperServerMain(); - final ServerConfig config = new ServerConfig(); - config.readFrom(quorumConfig); - zkServer.runFromConfig(config); - } - } - - } - - } catch (Exception e) { - logger.error("start standalone server fail!", e); - } - } - - -} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java new file mode 100644 index 0000000000..5c3db2d5d1 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * demo for using zkServer + */ +public class TestZk { + + @Before + public void before(){ + ZKServer.start(); + } + + @Test + public void test(){ + Assert.assertTrue(ZKServer.isStarted()); + } + + @After + public void after(){ + ZKServer.stop(); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java new file mode 100644 index 0000000000..5aba9fd8a1 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * just for test + */ +public class ZKServer { + + private static final Logger logger = LoggerFactory.getLogger(ZKServer.class); + + private static volatile PublicZooKeeperServerMain zkServer = null; + + public static final int DEFAULT_ZK_TEST_PORT = 22181; + + public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT; + + private static String dataDir = null; + + private static final AtomicBoolean isStarted = new AtomicBoolean(false); + + public static void start() { + try { + startLocalZkServer(DEFAULT_ZK_TEST_PORT); + } catch (Exception e) { + logger.error("Failed to start ZK: " + e); + } + } + + public static boolean isStarted(){ + return isStarted.get(); + } + + static class PublicZooKeeperServerMain extends ZooKeeperServerMain { + + @Override + public void initializeAndRun(String[] args) + throws QuorumPeerConfig.ConfigException, IOException { + super.initializeAndRun(args); + } + + @Override + public void shutdown() { + super.shutdown(); + } + } + + /** + * Starts a local Zk instance with a generated empty data directory + * + * @param port The port to listen on + */ + public static void startLocalZkServer(final int port) { + startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis()); + } + + /** + * Starts a local Zk instance + * + * @param port The port to listen on + * @param dataDirPath The path for the Zk data directory + */ + private static synchronized void startLocalZkServer(final int port, final String dataDirPath) { + if (zkServer != null) { + throw new RuntimeException("Zookeeper server is already started!"); + } + try { + zkServer = new PublicZooKeeperServerMain(); + logger.info("Zookeeper data path : {} ", dataDirPath); + dataDir = dataDirPath; + final String[] args = new String[]{Integer.toString(port), dataDirPath}; + Thread init = new Thread(new Runnable() { + @Override + public void run() { + try { + zkServer.initializeAndRun(args); + } catch (QuorumPeerConfig.ConfigException e) { + logger.warn("Caught exception while starting ZK", e); + } catch (IOException e) { + logger.warn("Caught exception while starting ZK", e); + } + } + }, "init-zk-thread"); + init.start(); + } catch (Exception e) { + logger.warn("Caught exception while starting ZK", e); + throw new RuntimeException(e); + } + + CuratorFramework zkClient = CuratorFrameworkFactory.builder() + .connectString(DEFAULT_ZK_STR) + .retryPolicy(new ExponentialBackoffRetry(10,100)) + .sessionTimeoutMs(1000 * 30) + .connectionTimeoutMs(1000 * 30) + .build(); + + try { + zkClient.blockUntilConnected(10, TimeUnit.SECONDS); + zkClient.close(); + } catch (InterruptedException ignore) { + } + isStarted.compareAndSet(false, true); + logger.info("zk server started"); + } + + /** + * Stops a local Zk instance, deleting its data directory + */ + public static void stop() { + try { + stopLocalZkServer(true); + } catch (Exception e) { + logger.error("Failed to stop ZK ",e); + } + } + + /** + * Stops a local Zk instance. + * + * @param deleteDataDir Whether or not to delete the data directory + */ + private static synchronized void stopLocalZkServer(final boolean deleteDataDir) { + if (zkServer != null) { + try { + zkServer.shutdown(); + zkServer = null; + if (deleteDataDir) { + org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir)); + } + isStarted.compareAndSet(true, false); + } catch (Exception e) { + logger.warn("Caught exception while stopping ZK server", e); + throw new RuntimeException(e); + } + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java index a4b8618bfd..da17e14044 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java @@ -154,7 +154,7 @@ public class UserMapperTest { accessToken.setToken("secrettoken"); accessToken.setCreateTime(new Date()); accessToken.setUpdateTime(new Date()); - accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),-1)); + accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),1)); accessTokenMapper.insert(accessToken); return accessToken; } @@ -356,4 +356,4 @@ public class UserMapperTest { accessTokenMapper.deleteById(accessToken.getId()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 776290aafa..b9b3ad6824 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -68,6 +68,7 @@ public abstract class AbstractYarnTask extends AbstractTask { } catch (Exception e) { logger.error("yarn process failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index 30c5a1a5d2..b0bb4c6f4c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -99,7 +99,7 @@ public class DependentTask extends AbstractTask { } @Override - public void handle(){ + public void handle() throws Exception { // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); @@ -135,6 +135,7 @@ public class DependentTask extends AbstractTask { }catch (Exception e){ logger.error(e.getMessage(),e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index a5c7390499..44eef65aba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -113,23 +113,20 @@ public class HttpTask extends AbstractTask { long startTime = System.currentTimeMillis(); String statusCode = null; String body = null; - try(CloseableHttpClient client = createHttpClient()) { - try(CloseableHttpResponse response = sendRequest(client)) { - statusCode = String.valueOf(getStatusCode(response)); - body = getResponseBody(response); - exitStatusCode = validResponse(body, statusCode); - long costTime = System.currentTimeMillis() - startTime; - logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}", - DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output); - }catch (Exception e) { - appendMessage(e.toString()); - exitStatusCode = -1; - logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); - } - } catch (Exception e) { + + try(CloseableHttpClient client = createHttpClient(); + CloseableHttpResponse response = sendRequest(client)) { + statusCode = String.valueOf(getStatusCode(response)); + body = getResponseBody(response); + exitStatusCode = validResponse(body, statusCode); + long costTime = System.currentTimeMillis() - startTime; + logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}", + DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output); + }catch (Exception e){ appendMessage(e.toString()); exitStatusCode = -1; logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index 3418c741f8..59cf8a6e24 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -97,14 +97,13 @@ public class ProcedureTask extends AbstractTask { procedureParameters.getMethod(), procedureParameters.getLocalParams()); - // determine whether there is a data source - if (procedureParameters.getDatasource() == 0){ - logger.error("datasource id not exists"); + DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); + if (dataSource == null){ + logger.error("datasource not exists"); exitStatusCode = -1; - return; + throw new IllegalArgumentException("datasource not found"); } - DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", dataSource.getName(), dataSource.getType(), @@ -112,11 +111,6 @@ public class ProcedureTask extends AbstractTask { dataSource.getUserId(), dataSource.getConnectionParams()); - if (dataSource == null){ - logger.error("datasource not exists"); - exitStatusCode = -1; - return; - } Connection connection = null; CallableStatement stmt = null; try { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index fffd5f080d..f6227b15a4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -98,6 +98,7 @@ public class PythonTask extends AbstractTask { } catch (Exception e) { logger.error("python task failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index fb7d2268e5..438d373775 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -106,6 +106,7 @@ public class ShellTask extends AbstractTask { } catch (Exception e) { logger.error("shell task failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index f0478ac74c..ccfee2efec 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -261,9 +261,7 @@ public class SqlTask extends AbstractTask { Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), SEMICOLON, HIVE_CONF); - if(connParamMap != null){ - paramProp.putAll(connParamMap); - } + paramProp.putAll(connParamMap); connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), paramProp); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 3d428eab89..272fb546da 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -29,7 +29,7 @@ public class DependentTaskTest { @Test - public void testDependInit(){ + public void testDependInit() throws Exception{ TaskProps taskProps = new TaskProps(); diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js index 0a86186933..11f22132c7 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js @@ -91,22 +91,232 @@ Dag.prototype.toolbarEvent = function ({ item, code, is }) { /** * Echo data display */ -Dag.prototype.backfill = function () { - jsPlumb.ready(() => { - JSP.init({ - dag: this.dag, - instance: this.instance +Dag.prototype.backfill = function (arg) { + if(arg) { + let locationsValue = store.state.dag.locations + let locationsValue1 = store.state.dag.locations + let locationsValue2 = store.state.dag.locations + let arr = [] + for (let i in locationsValue1) { + let objs = new Object(); + objs.id = i + arr.push(Object.assign(objs,locationsValue1[i])); //Attributes + } + let tmp = [] + for(let i in locationsValue2) { + if(locationsValue2[i].targetarr !='' && locationsValue2[i].targetarr.split(',').length>1) { + tmp.push(locationsValue2[i]) + } + } + + function copy (array) { + let newArray = [] + for(let item of array) { + newArray.push(item); + } + return newArray; + } + + + let newArr = copy(arr) + function getNewArr() { + for(let i= 0; i1) { + newArr[i].targetarr = newArr[i].targetarr.split(',').shift() + } + } + return newArr + } + getNewArr() + /** + * @description Transform flat data into a tree structure + * @param {Array} arr Flat data + * @param {String} pidStr targetarr key name + * @param {String} idStr id key name + * @param {String} childrenStr children key name + */ + function fommat({arrayList, pidStr = 'targetarr', idStr = 'id', childrenStr = 'children'}) { + let listOjb = {}; // Used to store objects of the form {key: obj} + let treeList = []; // An array to store the final tree structure data + // Transform the data into {key: obj} format, which is convenient for the following data processing + for (let i = 0; i < arrayList.length; i++) { + listOjb[arrayList[i][idStr]] = arrayList[i] + } + // Format data based on pid + for (let j = 0; j < arrayList.length; j++) { + // Determine if the parent exists + // let haveParent = arrayList[j].targetarr.split(',').length>1?listOjb[arrayList[j].targetarr.split(',')[0]]:listOjb[arrayList[j][pidStr]] + let haveParent = listOjb[arrayList[j][pidStr]] + if (haveParent) { + // If there is no parent children field, create a children field + !haveParent[childrenStr] && (haveParent[childrenStr] = []) + // Insert child in parent + haveParent[childrenStr].push(arrayList[j]) + } else { + // If there is no parent, insert directly into the outermost layer + treeList.push(arrayList[j]) + } + } + return treeList + } + let datas = fommat({arrayList: newArr,pidStr: 'targetarr'}) + // Count the number of leaf nodes + function getLeafCountTree(json) { + if(!json.children) { + json.colspan = 1; + return 1; + } else { + let leafCount = 0; + for(let i = 0 ; i < json.children.length ; i++){ + leafCount = leafCount + getLeafCountTree(json.children[i]); + } + json.colspan = leafCount; + return leafCount; + } + } + // Number of tree node levels + let countTree = getLeafCountTree(datas[0]) + function getMaxFloor(treeData) { + let floor = 0 + let v = this + let max = 0 + function each (data, floor) { + data.forEach(e => { + e.floor = floor + e.x=floor*170 + if (floor > max) { + max = floor + } + if (e.children) { + each(e.children, floor + 1) + } + }) + } + each(treeData,1) + return max + } + getMaxFloor(datas) + // The last child of each node + let lastchildren = []; + forxh(datas); + function forxh(list) { + for (let i = 0; i < list.length; i++) { + let chlist = list[i]; + if (chlist.children) { + forxh(chlist.children); + } else { + lastchildren.push(chlist); + } + } + } + // Get all parent nodes above the leaf node + function treeFindPath (tree, func, path,n) { + if (!tree) return [] + for (const data of tree) { + path.push(data.name) + if (func(data)) return path + if (data.children) { + const findChildren = treeFindPath(data.children, func, path,n) + if (findChildren.length) return findChildren + } + path.pop() + } + return [] + } + function toLine(data){ + return data.reduce((arr, {id, name, targetarr, x, y, children = []}) => + arr.concat([{id, name, targetarr, x, y}], toLine(children)), []) + return result; + } + let listarr = toLine(datas); + let listarrs = toLine(datas) + let dataObject = {} + for(let i = 0; i value2) { + return 1; + } else { + return 0; + } + }; + } + + lastchildren = lastchildren.sort(createComparisonFunction('x')) + + // Coordinate value of each leaf node + for(let a = 0; a data.targetarr===lastchildren[i].targetarr,[],i+1) + for(let j = 0; j1) { + dataObject[Object.keys(locationsValue1)[0]].y = (countTree/2)*120+50 + } + + locationsValue = dataObject + jsPlumb.ready(() => { + JSP.init({ + dag: this.dag, + instance: this.instance + }) + // Backfill + JSP.jspBackfill({ + // connects + connects: _.cloneDeep(store.state.dag.connects), + // Node location information + locations: _.cloneDeep(locationsValue), + // Node data + largeJson: _.cloneDeep(store.state.dag.tasks) + }) }) - // Backfill - JSP.jspBackfill({ - // connects - connects: _.cloneDeep(store.state.dag.connects), - // Node location information - locations: _.cloneDeep(store.state.dag.locations), - // Node data - largeJson: _.cloneDeep(store.state.dag.tasks) + } else { + jsPlumb.ready(() => { + JSP.init({ + dag: this.dag, + instance: this.instance + }) + // Backfill + JSP.jspBackfill({ + // connects + connects: _.cloneDeep(store.state.dag.connects), + // Node location information + locations: _.cloneDeep(store.state.dag.locations), + // Node data + largeJson: _.cloneDeep(store.state.dag.tasks) + }) }) - }) + } } /** diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index 0c8cd2e861..94dc2651be 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -22,6 +22,7 @@
@@ -65,10 +66,12 @@ v-for="(item,$index) in toolOperList" :class="_operationClass(item)" :id="item.code" + :key="$index" @click="_ckOperation(item,$event)">
+ {}) @@ -513,7 +552,7 @@ }) }, mounted () { - this.init() + this.init(this.arg) }, beforeDestroy () { this.resetParams() diff --git a/pom.xml b/pom.xml index 61c2297621..2524461575 100644 --- a/pom.xml +++ b/pom.xml @@ -612,10 +612,10 @@ ${maven-surefire-plugin.version} + **/common/utils/*.java + **/common/graph/*.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java - **/common/graph/*.java - **/*CollectionUtilsTest.java