Browse Source

Merge pull request #7 from apache/dev

update
pull/2/head
samz406 5 years ago committed by GitHub
parent
commit
19f1538000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java
  2. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
  3. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java
  4. 7
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java
  5. 66
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java
  6. 100
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java
  7. 98
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java
  8. 43
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java
  9. 165
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
  10. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java
  11. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
  12. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
  13. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  14. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
  15. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  16. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  17. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  18. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java
  19. 212
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js
  20. 47
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
  21. 4
      pom.xml

42
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java

@ -86,21 +86,20 @@ public class CollectionUtils {
* @return string to map * @return string to map
*/ */
public static Map<String, String> stringToMap(String str, String separator, String keyPrefix) { public static Map<String, String> stringToMap(String str, String separator, String keyPrefix) {
if (null == str || "".equals(str)) { Map<String, String> emptyMap = new HashMap<>(0);
return null; if (StringUtils.isEmpty(str)) {
return emptyMap;
} }
if (null == separator || "".equals(separator)) { if (StringUtils.isEmpty(separator)) {
return null; return emptyMap;
} }
String[] strings = str.split(separator); String[] strings = str.split(separator);
int mapLength = strings.length; Map<String, String> map = new HashMap<>(strings.length);
if ((strings.length % 2) != 0) {
mapLength = mapLength + 1;
}
Map<String, String> map = new HashMap<>(mapLength);
for (int i = 0; i < strings.length; i++) { for (int i = 0; i < strings.length; i++) {
String[] strArray = strings[i].split("="); String[] strArray = strings[i].split("=");
if (strArray.length != 2) {
return emptyMap;
}
//strArray[0] KEY strArray[1] VALUE //strArray[0] KEY strArray[1] VALUE
if (StringUtils.isEmpty(keyPrefix)) { if (StringUtils.isEmpty(keyPrefix)) {
map.put(strArray[0], strArray[1]); map.put(strArray[0], strArray[1]);
@ -146,7 +145,7 @@ public class CollectionUtils {
* @param obj the object * @param obj the object
* @return the maximum frequency of the object * @return the maximum frequency of the object
*/ */
public final int max(final Object obj) { private int max(final Object obj) {
return Math.max(freqA(obj), freqB(obj)); return Math.max(freqA(obj), freqB(obj));
} }
@ -156,7 +155,7 @@ public class CollectionUtils {
* @param obj the object * @param obj the object
* @return the minimum frequency of the object * @return the minimum frequency of the object
*/ */
public final int min(final Object obj) { private int min(final Object obj) {
return Math.min(freqA(obj), freqB(obj)); return Math.min(freqA(obj), freqB(obj));
} }
@ -180,10 +179,10 @@ public class CollectionUtils {
return getFreq(obj, cardinalityB); return getFreq(obj, cardinalityB);
} }
private final int getFreq(final Object obj, final Map<?, Integer> freqMap) { private int getFreq(final Object obj, final Map<?, Integer> freqMap) {
final Integer count = freqMap.get(obj); final Integer count = freqMap.get(obj);
if (count != null) { if (count != null) {
return count.intValue(); return count;
} }
return 0; return 0;
} }
@ -203,7 +202,7 @@ public class CollectionUtils {
return true; return true;
} }
if ((a == null && b != null) || a != null && b == null) { if (a == null || b == null) {
return false; return false;
} }
@ -253,12 +252,7 @@ public class CollectionUtils {
public static <O> Map<O, Integer> getCardinalityMap(final Iterable<? extends O> coll) { public static <O> Map<O, Integer> getCardinalityMap(final Iterable<? extends O> coll) {
final Map<O, Integer> count = new HashMap<O, Integer>(); final Map<O, Integer> count = new HashMap<O, Integer>();
for (final O obj : coll) { for (final O obj : coll) {
final Integer c = count.get(obj); count.put(obj, count.getOrDefault(obj, 0) + 1);
if (c == null) {
count.put(obj, Integer.valueOf(1));
} else {
count.put(obj, Integer.valueOf(c.intValue() + 1));
}
} }
return count; return count;
} }
@ -273,6 +267,12 @@ public class CollectionUtils {
*/ */
public static <T extends Object> List<Map<String, Object>> getListByExclusion(List<T> originList, Set<String> exclusionSet) { public static <T extends Object> List<Map<String, Object>> getListByExclusion(List<T> originList, Set<String> exclusionSet) {
List<Map<String, Object>> instanceList = new ArrayList<>(); List<Map<String, Object>> instanceList = new ArrayList<>();
if (exclusionSet == null) {
exclusionSet = new HashSet<>();
}
if (originList == null) {
return instanceList;
}
Map<String, Object> instanceMap; Map<String, Object> instanceMap;
for (T instance : originList) { for (T instance : originList) {
Map<String, Object> dataMap = new BeanMap(instance); Map<String, Object> dataMap = new BeanMap(instance);

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java

@ -291,14 +291,14 @@ public class DateUtils {
* get some hour of day * get some hour of day
* *
* @param date date * @param date date
* @param hours hours * @param offsetHour hours
* @return some hour of day * @return some hour of day
* */ * */
public static Date getSomeHourOfDay(Date date, int hours) { public static Date getSomeHourOfDay(Date date, int offsetHour) {
Calendar cal = Calendar.getInstance(); Calendar cal = Calendar.getInstance();
cal.setTime(date); cal.setTime(date);
cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours); cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour);
cal.set(Calendar.MINUTE, 0); cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0); cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0); cal.set(Calendar.MILLISECOND, 0);

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java

@ -34,7 +34,7 @@ public class DependentDateUtils {
public static List<DateInterval> getLastHoursInterval(Date businessDate, int hourNumber){ public static List<DateInterval> getLastHoursInterval(Date businessDate, int hourNumber){
List<DateInterval> dateIntervals = new ArrayList<>(); List<DateInterval> dateIntervals = new ArrayList<>();
for(int index = hourNumber; index > 0; index--){ for(int index = hourNumber; index > 0; index--){
Date lastHour = DateUtils.getSomeHourOfDay(businessDate, index); Date lastHour = DateUtils.getSomeHourOfDay(businessDate, -index);
Date beginTime = DateUtils.getStartOfHour(lastHour); Date beginTime = DateUtils.getStartOfHour(lastHour);
Date endTime = DateUtils.getEndOfHour(lastHour); Date endTime = DateUtils.getEndOfHour(lastHour);
dateIntervals.add(new DateInterval(beginTime, endTime)); dateIntervals.add(new DateInterval(beginTime, endTime));

7
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java

@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.common.queue;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.StandaloneZKServerForTest; import org.apache.dolphinscheduler.common.zk.ZKServer;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
@ -35,7 +35,7 @@ import static org.junit.Assert.assertEquals;
/** /**
* task queue test * task queue test
*/ */
public class TaskQueueImplTest extends StandaloneZKServerForTest { public class TaskQueueImplTest {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);
@ -43,7 +43,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest {
@Before @Before
public void before(){ public void before(){
super.before(); ZKServer.start();
tasksQueue = TaskQueueFactory.getTaskQueueInstance(); tasksQueue = TaskQueueFactory.getTaskQueueInstance();
@ -57,6 +57,7 @@ public class TaskQueueImplTest extends StandaloneZKServerForTest {
public void after(){ public void after(){
//clear all data //clear all data
tasksQueue.delete(); tasksQueue.delete();
ZKServer.stop();
} }

66
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java

@ -16,6 +16,7 @@
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -26,19 +27,26 @@ public class CollectionUtilsTest {
@Test @Test
public void equalLists() { public void equalLists() {
Assert.assertTrue(CollectionUtils.equalLists(null,null));
Assert.assertTrue(CollectionUtils.equalLists(new ArrayList<Integer>(),new ArrayList<Integer>()));
List<Integer> a = new ArrayList<Integer>(); List<Integer> a = new ArrayList<Integer>();
a.add(1); a.add(1);
a.add(2); a.add(2);
a.add(3);
List<Integer> b = new ArrayList<Integer>(); List<Integer> b = new ArrayList<Integer>();
b.add(3);
b.add(2);
b.add(1); b.add(1);
b.add(2);
Assert.assertTrue(CollectionUtils.equalLists(a, b)); Assert.assertTrue(CollectionUtils.equalLists(a, b));
Assert.assertTrue(CollectionUtils.equalLists(null,null)); a.add(1);
List<Integer> c = new ArrayList<Integer>(); Assert.assertFalse(CollectionUtils.equalLists(a, b));
Assert.assertFalse(CollectionUtils.equalLists(c,null)); b.add(2);
Assert.assertFalse(CollectionUtils.equalLists(c,a)); Assert.assertFalse(CollectionUtils.equalLists(a, b));
a.add(2);
b.add(1);
a.add(4);
b.add(2);
Assert.assertFalse(CollectionUtils.equalLists(a, b));
Assert.assertFalse(CollectionUtils.equalLists(null, new ArrayList<Integer>()));
Assert.assertFalse(CollectionUtils.equalLists(new ArrayList<Integer>(), null));
} }
@Test @Test
@ -56,7 +64,49 @@ public class CollectionUtilsTest {
@Test @Test
public void stringToMap() { public void stringToMap() {
Map<String, String> a = CollectionUtils.stringToMap("a=b;c=d", ";", ""); Map<String, String> a = CollectionUtils.stringToMap("a=b;c=d;", ";");
Assert.assertNotNull(a); Assert.assertNotNull(a);
Assert.assertTrue(a.size() == 2);
a = CollectionUtils.stringToMap(null, ";");
Assert.assertTrue(a.isEmpty());
a = CollectionUtils.stringToMap("", ";");
Assert.assertTrue(a.isEmpty());
a = CollectionUtils.stringToMap("a=b;c=d", "");
Assert.assertTrue(a.isEmpty());
a = CollectionUtils.stringToMap("a=b;c=d", null);
Assert.assertTrue(a.isEmpty());
a = CollectionUtils.stringToMap("a=b;c=d;e=f", ";");
Assert.assertEquals(a.size(), 3);
a = CollectionUtils.stringToMap("a;b=f", ";");
Assert.assertTrue(a.isEmpty());
a = CollectionUtils.stringToMap("a=b;c=d;e=f;", ";", "test");
Assert.assertEquals(a.size(), 3);
Assert.assertNotNull(a.get("testa"));
}
@Test
public void getListByExclusion() {
Assert.assertNotNull(CollectionUtils.getListByExclusion(null, null));
List<Integer> originList = new ArrayList<>();
originList.add(1);
originList.add(2);
List<Map<String, Object>> ret = CollectionUtils.getListByExclusion(originList, null);
Assert.assertEquals(ret.size(), 2);
ret = CollectionUtils.getListByExclusion(originList, new HashSet<>());
Assert.assertEquals(ret.size(), 2);
Assert.assertFalse(ret.get(0).isEmpty());
Set<String> exclusion = new HashSet<>();
exclusion.add(Constants.CLASS);
ret = CollectionUtils.getListByExclusion(originList, exclusion);
Assert.assertEquals(ret.size(), 2);
Assert.assertTrue(ret.get(0).isEmpty());
} }
@Test
public void isNotEmpty() {
List<Integer> list = new ArrayList<>();
Assert.assertFalse(CollectionUtils.isNotEmpty(list));
Assert.assertFalse(CollectionUtils.isNotEmpty(null));
}
} }

100
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java

@ -18,13 +18,11 @@ package org.apache.dolphinscheduler.common.utils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
public class DateUtilsTest { public class DateUtilsTest {
@Test @Test
public void format2Readable() throws ParseException { public void format2Readable() throws ParseException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -54,4 +52,102 @@ public class DateUtilsTest {
Assert.assertEquals(sunday, sunday1); Assert.assertEquals(sunday, sunday1);
} }
@Test
public void diffHours(){
Date d1 = DateUtils.stringToDate("2019-01-28 00:00:00");
Date d2 = DateUtils.stringToDate("2019-01-28 20:00:00");
Assert.assertEquals(DateUtils.diffHours(d1, d2), 20);
Date d3 = DateUtils.stringToDate("2019-01-28 20:00:00");
Assert.assertEquals(DateUtils.diffHours(d3, d2), 0);
Assert.assertEquals(DateUtils.diffHours(d2, d1), 20);
Date d4 = null;
Assert.assertEquals(DateUtils.diffHours(d2, d4), 0);
}
@Test
public void dateToString() {
Date d1 = DateUtils.stringToDate("2019-01-28");
Assert.assertNull(d1);
d1 = DateUtils.stringToDate("2019-01-28 00:00:00");
Assert.assertEquals(DateUtils.dateToString(d1), "2019-01-28 00:00:00");
}
@Test
public void getSomeDay() {
Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00");
Date curr = DateUtils.getSomeDay(d1, 1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 00:00:00");
Assert.assertEquals(DateUtils.dateToString(DateUtils.getSomeDay(d1, -31)), "2018-12-31 00:00:00");
}
@Test
public void getFirstDayOfMonth() {
Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00");
Date curr = DateUtils.getFirstDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 00:00:00");
d1 = DateUtils.stringToDate("2019-01-31 01:59:00");
curr = DateUtils.getFirstDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 01:59:00");
}
@Test
public void getSomeHourOfDay() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59");
Date curr = DateUtils.getSomeHourOfDay(d1, -1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 10:00:00");
curr = DateUtils.getSomeHourOfDay(d1, 0);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00");
curr = DateUtils.getSomeHourOfDay(d1, 2);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 13:00:00");
curr = DateUtils.getSomeHourOfDay(d1, 24);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 11:00:00");
}
@Test
public void getLastDayOfMonth() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59");
Date curr = DateUtils.getLastDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59");
d1 = DateUtils.stringToDate("2019-01-02 11:59:59");
curr = DateUtils.getLastDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59");
d1 = DateUtils.stringToDate("2019-02-02 11:59:59");
curr = DateUtils.getLastDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-28 11:59:59");
d1 = DateUtils.stringToDate("2020-02-02 11:59:59");
curr = DateUtils.getLastDayOfMonth(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2020-02-29 11:59:59");
}
@Test
public void getStartOfDay() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59");
Date curr = DateUtils.getStartOfDay(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 00:00:00");
}
@Test
public void getEndOfDay() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59");
Date curr = DateUtils.getEndOfDay(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 23:59:59");
}
@Test
public void getStartOfHour() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59");
Date curr = DateUtils.getStartOfHour(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00");
}
@Test
public void getEndOfHour() {
Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59");
Date curr = DateUtils.getEndOfHour(d1);
Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59");
}
} }

98
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/StandaloneZKServerForTest.java

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import java.io.File;
import java.util.Properties;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* just for test
*/
public class StandaloneZKServerForTest {
private static final Logger logger = LoggerFactory.getLogger(StandaloneZKServerForTest.class);
private static volatile ZooKeeperServerMain zkServer = null;
@Before
public void before() {
logger.info("standalone zookeeper server for test service start ");
ThreadPoolExecutors.getInstance().execute(new Runnable() {
@Override
public void run() {
//delete zk data dir ?
File zkFile = new File(System.getProperty("java.io.tmpdir"), "zookeeper");
startStandaloneServer("2000", zkFile.getAbsolutePath(), "2181", "10", "5");
}
});
}
/**
* start zk server
* @param tickTime zookeeper ticktime
* @param dataDir zookeeper data dir
* @param clientPort zookeeper client port
* @param initLimit zookeeper init limit
* @param syncLimit zookeeper sync limit
*/
private void startStandaloneServer(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) {
Properties props = new Properties();
props.setProperty("tickTime", tickTime);
props.setProperty("dataDir", dataDir);
props.setProperty("clientPort", clientPort);
props.setProperty("initLimit", initLimit);
props.setProperty("syncLimit", syncLimit);
QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
try {
quorumConfig.parseProperties(props);
if(zkServer == null ){
synchronized (StandaloneZKServerForTest.class){
if(zkServer == null ){
zkServer = new ZooKeeperServerMain();
final ServerConfig config = new ServerConfig();
config.readFrom(quorumConfig);
zkServer.runFromConfig(config);
}
}
}
} catch (Exception e) {
logger.error("start standalone server fail!", e);
}
}
}

43
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* demo for using zkServer
*/
public class TestZk {
@Before
public void before(){
ZKServer.start();
}
@Test
public void test(){
Assert.assertTrue(ZKServer.isStarted());
}
@After
public void after(){
ZKServer.stop();
}
}

165
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* just for test
*/
public class ZKServer {
private static final Logger logger = LoggerFactory.getLogger(ZKServer.class);
private static volatile PublicZooKeeperServerMain zkServer = null;
public static final int DEFAULT_ZK_TEST_PORT = 22181;
public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
private static String dataDir = null;
private static final AtomicBoolean isStarted = new AtomicBoolean(false);
public static void start() {
try {
startLocalZkServer(DEFAULT_ZK_TEST_PORT);
} catch (Exception e) {
logger.error("Failed to start ZK: " + e);
}
}
public static boolean isStarted(){
return isStarted.get();
}
static class PublicZooKeeperServerMain extends ZooKeeperServerMain {
@Override
public void initializeAndRun(String[] args)
throws QuorumPeerConfig.ConfigException, IOException {
super.initializeAndRun(args);
}
@Override
public void shutdown() {
super.shutdown();
}
}
/**
* Starts a local Zk instance with a generated empty data directory
*
* @param port The port to listen on
*/
public static void startLocalZkServer(final int port) {
startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
}
/**
* Starts a local Zk instance
*
* @param port The port to listen on
* @param dataDirPath The path for the Zk data directory
*/
private static synchronized void startLocalZkServer(final int port, final String dataDirPath) {
if (zkServer != null) {
throw new RuntimeException("Zookeeper server is already started!");
}
try {
zkServer = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath);
dataDir = dataDirPath;
final String[] args = new String[]{Integer.toString(port), dataDirPath};
Thread init = new Thread(new Runnable() {
@Override
public void run() {
try {
zkServer.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException e) {
logger.warn("Caught exception while starting ZK", e);
} catch (IOException e) {
logger.warn("Caught exception while starting ZK", e);
}
}
}, "init-zk-thread");
init.start();
} catch (Exception e) {
logger.warn("Caught exception while starting ZK", e);
throw new RuntimeException(e);
}
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(DEFAULT_ZK_STR)
.retryPolicy(new ExponentialBackoffRetry(10,100))
.sessionTimeoutMs(1000 * 30)
.connectionTimeoutMs(1000 * 30)
.build();
try {
zkClient.blockUntilConnected(10, TimeUnit.SECONDS);
zkClient.close();
} catch (InterruptedException ignore) {
}
isStarted.compareAndSet(false, true);
logger.info("zk server started");
}
/**
* Stops a local Zk instance, deleting its data directory
*/
public static void stop() {
try {
stopLocalZkServer(true);
} catch (Exception e) {
logger.error("Failed to stop ZK ",e);
}
}
/**
* Stops a local Zk instance.
*
* @param deleteDataDir Whether or not to delete the data directory
*/
private static synchronized void stopLocalZkServer(final boolean deleteDataDir) {
if (zkServer != null) {
try {
zkServer.shutdown();
zkServer = null;
if (deleteDataDir) {
org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
}
isStarted.compareAndSet(true, false);
} catch (Exception e) {
logger.warn("Caught exception while stopping ZK server", e);
throw new RuntimeException(e);
}
}
}
}

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java

@ -154,7 +154,7 @@ public class UserMapperTest {
accessToken.setToken("secrettoken"); accessToken.setToken("secrettoken");
accessToken.setCreateTime(new Date()); accessToken.setCreateTime(new Date());
accessToken.setUpdateTime(new Date()); accessToken.setUpdateTime(new Date());
accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),-1)); accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),1));
accessTokenMapper.insert(accessToken); accessTokenMapper.insert(accessToken);
return accessToken; return accessToken;
} }

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java

@ -68,6 +68,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
} catch (Exception e) { } catch (Exception e) {
logger.error("yarn process failure", e); logger.error("yarn process failure", e);
exitStatusCode = -1; exitStatusCode = -1;
throw e;
} }
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java

@ -99,7 +99,7 @@ public class DependentTask extends AbstractTask {
} }
@Override @Override
public void handle(){ public void handle() throws Exception {
// set the name of the current thread // set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
@ -135,6 +135,7 @@ public class DependentTask extends AbstractTask {
}catch (Exception e){ }catch (Exception e){
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
exitStatusCode = -1; exitStatusCode = -1;
throw e;
} }
} }

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -113,8 +113,9 @@ public class HttpTask extends AbstractTask {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
String statusCode = null; String statusCode = null;
String body = null; String body = null;
try(CloseableHttpClient client = createHttpClient()) {
try(CloseableHttpResponse response = sendRequest(client)) { try(CloseableHttpClient client = createHttpClient();
CloseableHttpResponse response = sendRequest(client)) {
statusCode = String.valueOf(getStatusCode(response)); statusCode = String.valueOf(getStatusCode(response));
body = getResponseBody(response); body = getResponseBody(response);
exitStatusCode = validResponse(body, statusCode); exitStatusCode = validResponse(body, statusCode);
@ -125,11 +126,7 @@ public class HttpTask extends AbstractTask {
appendMessage(e.toString()); appendMessage(e.toString());
exitStatusCode = -1; exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e);
} throw e;
} catch (Exception e) {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e);
} }
} }

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

@ -97,14 +97,13 @@ public class ProcedureTask extends AbstractTask {
procedureParameters.getMethod(), procedureParameters.getMethod(),
procedureParameters.getLocalParams()); procedureParameters.getLocalParams());
// determine whether there is a data source DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
if (procedureParameters.getDatasource() == 0){ if (dataSource == null){
logger.error("datasource id not exists"); logger.error("datasource not exists");
exitStatusCode = -1; exitStatusCode = -1;
return; throw new IllegalArgumentException("datasource not found");
} }
DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(), dataSource.getName(),
dataSource.getType(), dataSource.getType(),
@ -112,11 +111,6 @@ public class ProcedureTask extends AbstractTask {
dataSource.getUserId(), dataSource.getUserId(),
dataSource.getConnectionParams()); dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection connection = null; Connection connection = null;
CallableStatement stmt = null; CallableStatement stmt = null;
try { try {

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -98,6 +98,7 @@ public class PythonTask extends AbstractTask {
} catch (Exception e) { } catch (Exception e) {
logger.error("python task failure", e); logger.error("python task failure", e);
exitStatusCode = -1; exitStatusCode = -1;
throw e;
} }
} }

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -106,6 +106,7 @@ public class ShellTask extends AbstractTask {
} catch (Exception e) { } catch (Exception e) {
logger.error("shell task failure", e); logger.error("shell task failure", e);
exitStatusCode = -1; exitStatusCode = -1;
throw e;
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -261,9 +261,7 @@ public class SqlTask extends AbstractTask {
Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
SEMICOLON, SEMICOLON,
HIVE_CONF); HIVE_CONF);
if(connParamMap != null){
paramProp.putAll(connParamMap); paramProp.putAll(connParamMap);
}
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
paramProp); paramProp);

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java

@ -29,7 +29,7 @@ public class DependentTaskTest {
@Test @Test
public void testDependInit(){ public void testDependInit() throws Exception{
TaskProps taskProps = new TaskProps(); TaskProps taskProps = new TaskProps();

212
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js

@ -91,7 +91,216 @@ Dag.prototype.toolbarEvent = function ({ item, code, is }) {
/** /**
* Echo data display * Echo data display
*/ */
Dag.prototype.backfill = function () { Dag.prototype.backfill = function (arg) {
if(arg) {
let locationsValue = store.state.dag.locations
let locationsValue1 = store.state.dag.locations
let locationsValue2 = store.state.dag.locations
let arr = []
for (let i in locationsValue1) {
let objs = new Object();
objs.id = i
arr.push(Object.assign(objs,locationsValue1[i])); //Attributes
}
let tmp = []
for(let i in locationsValue2) {
if(locationsValue2[i].targetarr !='' && locationsValue2[i].targetarr.split(',').length>1) {
tmp.push(locationsValue2[i])
}
}
function copy (array) {
let newArray = []
for(let item of array) {
newArray.push(item);
}
return newArray;
}
let newArr = copy(arr)
function getNewArr() {
for(let i= 0; i<newArr.length; i++) {
if(newArr[i].targetarr !='' && newArr[i].targetarr.split(',').length>1) {
newArr[i].targetarr = newArr[i].targetarr.split(',').shift()
}
}
return newArr
}
getNewArr()
/**
* @description Transform flat data into a tree structure
* @param {Array} arr Flat data
* @param {String} pidStr targetarr key name
* @param {String} idStr id key name
* @param {String} childrenStr children key name
*/
function fommat({arrayList, pidStr = 'targetarr', idStr = 'id', childrenStr = 'children'}) {
let listOjb = {}; // Used to store objects of the form {key: obj}
let treeList = []; // An array to store the final tree structure data
// Transform the data into {key: obj} format, which is convenient for the following data processing
for (let i = 0; i < arrayList.length; i++) {
listOjb[arrayList[i][idStr]] = arrayList[i]
}
// Format data based on pid
for (let j = 0; j < arrayList.length; j++) {
// Determine if the parent exists
// let haveParent = arrayList[j].targetarr.split(',').length>1?listOjb[arrayList[j].targetarr.split(',')[0]]:listOjb[arrayList[j][pidStr]]
let haveParent = listOjb[arrayList[j][pidStr]]
if (haveParent) {
// If there is no parent children field, create a children field
!haveParent[childrenStr] && (haveParent[childrenStr] = [])
// Insert child in parent
haveParent[childrenStr].push(arrayList[j])
} else {
// If there is no parent, insert directly into the outermost layer
treeList.push(arrayList[j])
}
}
return treeList
}
let datas = fommat({arrayList: newArr,pidStr: 'targetarr'})
// Count the number of leaf nodes
function getLeafCountTree(json) {
if(!json.children) {
json.colspan = 1;
return 1;
} else {
let leafCount = 0;
for(let i = 0 ; i < json.children.length ; i++){
leafCount = leafCount + getLeafCountTree(json.children[i]);
}
json.colspan = leafCount;
return leafCount;
}
}
// Number of tree node levels
let countTree = getLeafCountTree(datas[0])
function getMaxFloor(treeData) {
let floor = 0
let v = this
let max = 0
function each (data, floor) {
data.forEach(e => {
e.floor = floor
e.x=floor*170
if (floor > max) {
max = floor
}
if (e.children) {
each(e.children, floor + 1)
}
})
}
each(treeData,1)
return max
}
getMaxFloor(datas)
// The last child of each node
let lastchildren = [];
forxh(datas);
function forxh(list) {
for (let i = 0; i < list.length; i++) {
let chlist = list[i];
if (chlist.children) {
forxh(chlist.children);
} else {
lastchildren.push(chlist);
}
}
}
// Get all parent nodes above the leaf node
function treeFindPath (tree, func, path,n) {
if (!tree) return []
for (const data of tree) {
path.push(data.name)
if (func(data)) return path
if (data.children) {
const findChildren = treeFindPath(data.children, func, path,n)
if (findChildren.length) return findChildren
}
path.pop()
}
return []
}
function toLine(data){
return data.reduce((arr, {id, name, targetarr, x, y, children = []}) =>
arr.concat([{id, name, targetarr, x, y}], toLine(children)), [])
return result;
}
let listarr = toLine(datas);
let listarrs = toLine(datas)
let dataObject = {}
for(let i = 0; i<listarrs.length; i++) {
delete(listarrs[i].id)
}
for(let a = 0; a<listarr.length; a++) {
dataObject[listarr[a].id] = listarrs[a]
}
// Comparison function
function createComparisonFunction(propertyName) {
return function (object1,object2) {
let value1 = object1[propertyName];
let value2 = object2[propertyName];
if (value1 < value2) {
return -1;
} else if (value1 > value2) {
return 1;
} else {
return 0;
}
};
}
lastchildren = lastchildren.sort(createComparisonFunction('x'))
// Coordinate value of each leaf node
for(let a = 0; a<lastchildren.length; a++) {
dataObject[lastchildren[a].id].y = (a+1)*120
}
for(let i =0 ; i<lastchildren.length; i++) {
let node = treeFindPath(datas, data=> data.targetarr===lastchildren[i].targetarr,[],i+1)
for(let j = 0; j<node.length; j++) {
for(let k= 0; k<listarrs.length; k++) {
if(node[j] == listarrs[k].name) {
listarrs[k].y = (i+1)*120
}
}
}
}
for(let i = 0; i<tmp.length; i++) {
for(let objs in dataObject) {
if(tmp[i].name == dataObject[objs].name) {
dataObject[objs].targetarr = tmp[i].targetarr
}
}
}
for(let a = 0; a<lastchildren.length; a++) {
dataObject[lastchildren[a].id].y = (a+1)*120
}
if(countTree>1) {
dataObject[Object.keys(locationsValue1)[0]].y = (countTree/2)*120+50
}
locationsValue = dataObject
jsPlumb.ready(() => {
JSP.init({
dag: this.dag,
instance: this.instance
})
// Backfill
JSP.jspBackfill({
// connects
connects: _.cloneDeep(store.state.dag.connects),
// Node location information
locations: _.cloneDeep(locationsValue),
// Node data
largeJson: _.cloneDeep(store.state.dag.tasks)
})
})
} else {
jsPlumb.ready(() => { jsPlumb.ready(() => {
JSP.init({ JSP.init({
dag: this.dag, dag: this.dag,
@ -108,6 +317,7 @@ Dag.prototype.backfill = function () {
}) })
}) })
} }
}
/** /**
* Get dag storage format data * Get dag storage format data

47
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue

@ -22,6 +22,7 @@
<div class="bar-box roundedRect jtk-draggable jtk-droppable jtk-endpoint-anchor jtk-connected" <div class="bar-box roundedRect jtk-draggable jtk-droppable jtk-endpoint-anchor jtk-connected"
:class="v === dagBarId ? 'active' : ''" :class="v === dagBarId ? 'active' : ''"
:id="v" :id="v"
:key="v"
v-for="(item,v) in tasksTypeList" v-for="(item,v) in tasksTypeList"
@mousedown="_getDagId(v)"> @mousedown="_getDagId(v)">
<div data-toggle="tooltip" :title="item.description"> <div data-toggle="tooltip" :title="item.description">
@ -65,10 +66,12 @@
v-for="(item,$index) in toolOperList" v-for="(item,$index) in toolOperList"
:class="_operationClass(item)" :class="_operationClass(item)"
:id="item.code" :id="item.code"
:key="$index"
@click="_ckOperation(item,$event)"> @click="_ckOperation(item,$event)">
<i class="iconfont" v-html="item.icon" data-toggle="tooltip" :title="item.description" ></i> <i class="iconfont" v-html="item.icon" data-toggle="tooltip" :title="item.description" ></i>
</a> </a>
</div> </div>
<x-button type="text" icon="fa fa-play" @click="dagAutomaticLayout"></x-button>
<x-button <x-button
data-toggle="tooltip" data-toggle="tooltip"
:title="$t('Refresh DAG status')" :title="$t('Refresh DAG status')"
@ -142,7 +145,8 @@
isRtTasks: false, isRtTasks: false,
isRefresh: false, isRefresh: false,
isLoading: false, isLoading: false,
taskId: null taskId: null,
arg: false,
} }
}, },
mixins: [disabledState], mixins: [disabledState],
@ -153,9 +157,44 @@
methods: { methods: {
...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState']), ...mapActions('dag', ['saveDAGchart', 'updateInstance', 'updateDefinition', 'getTaskState']),
...mapMutations('dag', ['addTasks', 'resetParams', 'setIsEditDag', 'setName']), ...mapMutations('dag', ['addTasks', 'resetParams', 'setIsEditDag', 'setName']),
init () {
// DAG automatic layout
dagAutomaticLayout() {
$('#canvas').html('')
// Destroy round robin
Dag.init({
dag: this,
instance: jsPlumb.getInstance({
Endpoint: [
'Dot', { radius: 1, cssClass: 'dot-style' }
],
Connector: 'Straight',
PaintStyle: { lineWidth: 2, stroke: '#456' }, // Connection style
ConnectionOverlays: [
[
'Arrow',
{
location: 1,
id: 'arrow',
length: 12,
foldback: 0.8
}
]
],
Container: 'canvas'
})
})
if (this.tasks.length) {
Dag.backfill(true)
} else {
Dag.create()
}
},
init (args) {
if (this.tasks.length) { if (this.tasks.length) {
Dag.backfill() Dag.backfill(args)
// Process instances can view status // Process instances can view status
if (this.type === 'instance') { if (this.type === 'instance') {
this._getTaskState(false).then(res => {}) this._getTaskState(false).then(res => {})
@ -513,7 +552,7 @@
}) })
}, },
mounted () { mounted () {
this.init() this.init(this.arg)
}, },
beforeDestroy () { beforeDestroy () {
this.resetParams() this.resetParams()

4
pom.xml

@ -612,10 +612,10 @@
<version>${maven-surefire-plugin.version}</version> <version>${maven-surefire-plugin.version}</version>
<configuration> <configuration>
<includes> <includes>
<include>**/common/utils/*.java</include>
<include>**/common/graph/*.java</include>
<include>**/api/utils/CheckUtilsTest.java</include> <include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include> <include>**/api/utils/FileUtilsTest.java</include>
<include>**/common/graph/*.java</include>
<include>**/*CollectionUtilsTest.java</include><!--run test classes-->
</includes> </includes>
<!-- <skip>true</skip> --> <!-- <skip>true</skip> -->
</configuration> </configuration>

Loading…
Cancel
Save