From 095408434450bf17b331b942d136c423c2d69565 Mon Sep 17 00:00:00 2001 From: Tboy Date: Wed, 18 Dec 2019 10:52:59 +0800 Subject: [PATCH] merge db (#2) * Add FileUtilsTest.java , the unit test for FileUtils (#1493) * #839 enhancement : add Spark Task Component can switch Spark Version (#1494) * add Spark Version in Spark Component add Spark Version in Spark Component * add license for SparkVersion.class add license * 1 add spark task UT 2 add spark version param check * add assert check for sparkTaskTest * fix AbstractTask's handle method exception (#1490) * fix AbstractTask's handle method exception * update ut * add ZkServer for UT (#1499) * add ZkServer for UT * Add FileUtilsTest.java , the unit test for FileUtils (#1493) (#1) * updates for reference ZkServer * DAG automatic layout (#1497) * Password verification and v-for add key * DAG automatic layout * Add common utils CollectionUtils.java DateUtils.java unit test (#1496) * dateutil test * pom.xml * Update README_zh_CN.md * Update README.md * Add ExcelUtilsTest.java , the unit test for ExcelUtils (#1500) * fix issue:1477 some tasks would be running all the time when db delayed(#1477) (#1501) * fix issue:1477 some tasks would be running all the time when db delayed * fix issue:1477 some tasks would be running all the time when db delayed * fix issue:1477 some tasks would be running all the time when db delayed * change npm run build:combined to npm burn build:release (#1504) --- README.md | 3 +- README_zh_CN.md | 5 +- .../alert/utils/ExcelUtils.java | 9 +- .../alert/utils/ExcelUtilsTest.java | 92 +++++++ .../dolphinscheduler/common/Constants.java | 2 +- .../common/enums/SparkVersion.java | 40 +++ .../common/queue/ITaskQueue.java | 2 +- .../common/queue/TaskQueueZkImpl.java | 4 +- .../common/task/spark/SparkParameters.java | 17 +- .../common/utils/CollectionUtils.java | 42 ++-- .../common/utils/DateUtils.java | 6 +- .../utils/dependent/DependentDateUtils.java | 2 +- .../common/utils/CollectionUtilsTest.java | 66 ++++- .../common/utils/DateUtilsTest.java | 102 +++++++- .../dolphinscheduler/common/zk/TestZk.java | 43 ++++ .../dolphinscheduler/common/zk/ZKServer.java | 2 + .../dolphinscheduler/dao/ProcessDao.java | 58 +++-- .../dao/mapper/UserMapperTest.java | 4 +- .../runner/MasterBaseTaskExecThread.java | 28 ++- .../master/runner/MasterTaskExecThread.java | 2 + .../server/worker/task/AbstractYarnTask.java | 1 + .../worker/task/dependent/DependentTask.java | 3 +- .../server/worker/task/http/HttpTask.java | 25 +- .../task/processdure/ProcedureTask.java | 14 +- .../server/worker/task/python/PythonTask.java | 1 + .../server/worker/task/shell/ShellTask.java | 1 + .../server/worker/task/spark/SparkTask.java | 21 +- .../server/worker/task/sql/SqlTask.java | 4 +- .../task/dependent/DependentTaskTest.java | 2 +- .../worker/task/spark/SparkTaskTest.java | 141 +++++++++++ ....combined.js => webpack.config.release.js} | 0 dolphinscheduler-ui/package.json | 2 +- .../src/js/conf/home/pages/dag/_source/dag.js | 238 ++++++++++++++++-- .../js/conf/home/pages/dag/_source/dag.vue | 47 +++- .../dag/_source/formModel/tasks/spark.vue | 26 +- .../src/js/module/i18n/locale/en_US.js | 4 +- .../src/js/module/i18n/locale/zh_CN.js | 3 +- pom.xml | 6 +- 38 files changed, 927 insertions(+), 141 deletions(-) create mode 100644 dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java rename dolphinscheduler-ui/build/{webpack.config.combined.js => webpack.config.release.js} (100%) diff --git a/README.md b/README.md index c72fc52a26..aee0e3b2e3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -Dolphin Scheduler +Dolphin Scheduler Official Website +[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org) ============ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler) diff --git a/README_zh_CN.md b/README_zh_CN.md index ef7e2956a4..f8855aca6c 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -1,4 +1,5 @@ -Dolphin Scheduler +Dolphin Scheduler Official Website +[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org) ============ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler) @@ -88,7 +89,7 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g ### 获得帮助 1. Submit an issue -1. Mail list: dev@dolphinscheduler.apache.org. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list. +1. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list. then you can send mail to dev@dolphinscheduler.apache.org. 1. Contact WeChat group manager, ID 510570367. This is for Mandarin(CN) discussion. ### 版权 diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java index 522a1b9513..2aee3d4ef6 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java @@ -41,12 +41,9 @@ public class ExcelUtils { */ public static void genExcelFile(String content,String title,String xlsFilePath){ List itemsList; - try { - itemsList = JSONUtils.toList(content, LinkedHashMap.class); - }catch (Exception e){ - logger.error(String.format("json format incorrect : %s",content),e); - throw new RuntimeException("json format incorrect",e); - } + + //The JSONUtils.toList has been try catch ex + itemsList = JSONUtils.toList(content, LinkedHashMap.class); if (itemsList == null || itemsList.size() == 0){ logger.error("itemsList is null"); diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java new file mode 100644 index 0000000000..3ef43aeef4 --- /dev/null +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.utils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import static org.junit.Assert.assertTrue; + +public class ExcelUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(ExcelUtilsTest.class); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private String rootPath = null; + + @Before + public void setUp() throws Exception { + + folder.create(); + rootPath = folder.getRoot().getAbsolutePath(); + } + + @After + public void tearDown() throws Exception { + + folder.delete(); + } + + /** + * Test GenExcelFile + */ + @Test + public void testGenExcelFile() { + + //Define dest file path + String xlsFilePath = rootPath + System.getProperty("file.separator"); + logger.info("xlsFilePath: "+xlsFilePath); + + //Define correctContent + String correctContent = "[{\"name\":\"ds name\",\"value\":\"ds value\"}]"; + + //Define incorrectContent + String incorrectContent1 = "{\"name\":\"ds name\",\"value\":\"ds value\"}"; + + //Define title + String title = "test report"; + + //Invoke genExcelFile with correctContent + ExcelUtils.genExcelFile(correctContent, title, xlsFilePath); + + //Test file exists + File xlsFile = new File(xlsFilePath + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS); + assertTrue(xlsFile.exists()); + + //Expected RuntimeException + expectedException.expect(RuntimeException.class); + + //Expected error message + expectedException.expectMessage("itemsList is null"); + + //Invoke genExcelFile with incorrectContent, will cause RuntimeException + ExcelUtils.genExcelFile(incorrectContent1, title, xlsFilePath); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 791c0bb558..c675ad55bd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -439,7 +439,7 @@ public final class Constants { /** * default master commit retry interval */ - public static final int defaultMasterCommitRetryInterval = 100; + public static final int defaultMasterCommitRetryInterval = 3000; /** * time unit secong to minutes diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java new file mode 100644 index 0000000000..e3f7c73797 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; +import lombok.Getter; + +@Getter +public enum SparkVersion { + + /** + * 0 SPARK1 + * 1 SPARK2 + */ + SPARK1(0, "SPARK1"), + SPARK2(1, "SPARK2"); + + SparkVersion(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java index 054e25dd3a..6e937f0c3e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java @@ -45,7 +45,7 @@ public interface ITaskQueue { * @param key queue name * @param value */ - void add(String key, String value); + boolean add(String key, String value); /** * an element pops out of the queue diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 76d88868f2..3fd012dd30 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -118,14 +118,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override - public void add(String key, String value) { + public boolean add(String key, String value){ try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); + return true; } catch (Exception e) { logger.error("add task to tasks queue exception",e); + return false; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java index 41263f0a74..dbafddfddd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/spark/SparkParameters.java @@ -95,6 +95,11 @@ public class SparkParameters extends AbstractParameters { */ private ProgramType programType; + /** + * spark version + */ + private String sparkVersion; + public ResourceInfo getMainJar() { return mainJar; } @@ -200,9 +205,17 @@ public class SparkParameters extends AbstractParameters { this.programType = programType; } + public String getSparkVersion() { + return sparkVersion; + } + + public void setSparkVersion(String sparkVersion) { + this.sparkVersion = sparkVersion; + } + @Override public boolean checkParameters() { - return mainJar != null && programType != null; + return mainJar != null && programType != null && sparkVersion != null; } @@ -211,7 +224,7 @@ public class SparkParameters extends AbstractParameters { if(resourceList !=null ) { this.resourceList.add(mainJar); return resourceList.stream() - .map(p -> p.getRes()).collect(Collectors.toList()); + .map(ResourceInfo::getRes).collect(Collectors.toList()); } return null; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java index e69f9a9f76..9c02111c36 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java @@ -86,21 +86,20 @@ public class CollectionUtils { * @return string to map */ public static Map stringToMap(String str, String separator, String keyPrefix) { - if (null == str || "".equals(str)) { - return null; + Map emptyMap = new HashMap<>(0); + if (StringUtils.isEmpty(str)) { + return emptyMap; } - if (null == separator || "".equals(separator)) { - return null; + if (StringUtils.isEmpty(separator)) { + return emptyMap; } String[] strings = str.split(separator); - int mapLength = strings.length; - if ((strings.length % 2) != 0) { - mapLength = mapLength + 1; - } - - Map map = new HashMap<>(mapLength); + Map map = new HashMap<>(strings.length); for (int i = 0; i < strings.length; i++) { String[] strArray = strings[i].split("="); + if (strArray.length != 2) { + return emptyMap; + } //strArray[0] KEY strArray[1] VALUE if (StringUtils.isEmpty(keyPrefix)) { map.put(strArray[0], strArray[1]); @@ -146,7 +145,7 @@ public class CollectionUtils { * @param obj the object * @return the maximum frequency of the object */ - public final int max(final Object obj) { + private int max(final Object obj) { return Math.max(freqA(obj), freqB(obj)); } @@ -156,7 +155,7 @@ public class CollectionUtils { * @param obj the object * @return the minimum frequency of the object */ - public final int min(final Object obj) { + private int min(final Object obj) { return Math.min(freqA(obj), freqB(obj)); } @@ -180,10 +179,10 @@ public class CollectionUtils { return getFreq(obj, cardinalityB); } - private final int getFreq(final Object obj, final Map freqMap) { + private int getFreq(final Object obj, final Map freqMap) { final Integer count = freqMap.get(obj); if (count != null) { - return count.intValue(); + return count; } return 0; } @@ -203,7 +202,7 @@ public class CollectionUtils { return true; } - if ((a == null && b != null) || a != null && b == null) { + if (a == null || b == null) { return false; } @@ -253,12 +252,7 @@ public class CollectionUtils { public static Map getCardinalityMap(final Iterable coll) { final Map count = new HashMap(); for (final O obj : coll) { - final Integer c = count.get(obj); - if (c == null) { - count.put(obj, Integer.valueOf(1)); - } else { - count.put(obj, Integer.valueOf(c.intValue() + 1)); - } + count.put(obj, count.getOrDefault(obj, 0) + 1); } return count; } @@ -273,6 +267,12 @@ public class CollectionUtils { */ public static List> getListByExclusion(List originList, Set exclusionSet) { List> instanceList = new ArrayList<>(); + if (exclusionSet == null) { + exclusionSet = new HashSet<>(); + } + if (originList == null) { + return instanceList; + } Map instanceMap; for (T instance : originList) { Map dataMap = new BeanMap(instance); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java index 0f30e7fbe0..3455d5344c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java @@ -291,14 +291,14 @@ public class DateUtils { * get some hour of day * * @param date date - * @param hours hours + * @param offsetHour hours * @return some hour of day * */ - public static Date getSomeHourOfDay(Date date, int hours) { + public static Date getSomeHourOfDay(Date date, int offsetHour) { Calendar cal = Calendar.getInstance(); cal.setTime(date); - cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) - hours); + cal.set(Calendar.HOUR_OF_DAY, cal.get(Calendar.HOUR_OF_DAY) + offsetHour); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java index 574343d0cb..103e75fb61 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/dependent/DependentDateUtils.java @@ -34,7 +34,7 @@ public class DependentDateUtils { public static List getLastHoursInterval(Date businessDate, int hourNumber){ List dateIntervals = new ArrayList<>(); for(int index = hourNumber; index > 0; index--){ - Date lastHour = DateUtils.getSomeHourOfDay(businessDate, index); + Date lastHour = DateUtils.getSomeHourOfDay(businessDate, -index); Date beginTime = DateUtils.getStartOfHour(lastHour); Date endTime = DateUtils.getEndOfHour(lastHour); dateIntervals.add(new DateInterval(beginTime, endTime)); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java index 30c11522b6..7321879ab8 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.common.utils; +import org.apache.dolphinscheduler.common.Constants; import org.junit.Assert; import org.junit.Test; @@ -26,19 +27,26 @@ public class CollectionUtilsTest { @Test public void equalLists() { + Assert.assertTrue(CollectionUtils.equalLists(null,null)); + Assert.assertTrue(CollectionUtils.equalLists(new ArrayList(),new ArrayList())); List a = new ArrayList(); a.add(1); a.add(2); - a.add(3); List b = new ArrayList(); - b.add(3); + b.add(1); + b.add(2); + Assert.assertTrue(CollectionUtils.equalLists(a, b)); + a.add(1); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); b.add(2); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); + a.add(2); b.add(1); - Assert.assertTrue(CollectionUtils.equalLists(a,b)); - Assert.assertTrue(CollectionUtils.equalLists(null,null)); - List c = new ArrayList(); - Assert.assertFalse(CollectionUtils.equalLists(c,null)); - Assert.assertFalse(CollectionUtils.equalLists(c,a)); + a.add(4); + b.add(2); + Assert.assertFalse(CollectionUtils.equalLists(a, b)); + Assert.assertFalse(CollectionUtils.equalLists(null, new ArrayList())); + Assert.assertFalse(CollectionUtils.equalLists(new ArrayList(), null)); } @Test @@ -56,7 +64,49 @@ public class CollectionUtilsTest { @Test public void stringToMap() { - Map a = CollectionUtils.stringToMap("a=b;c=d", ";", ""); + Map a = CollectionUtils.stringToMap("a=b;c=d;", ";"); Assert.assertNotNull(a); + Assert.assertTrue(a.size() == 2); + a = CollectionUtils.stringToMap(null, ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("", ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d", ""); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d", null); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d;e=f", ";"); + Assert.assertEquals(a.size(), 3); + a = CollectionUtils.stringToMap("a;b=f", ";"); + Assert.assertTrue(a.isEmpty()); + a = CollectionUtils.stringToMap("a=b;c=d;e=f;", ";", "test"); + Assert.assertEquals(a.size(), 3); + Assert.assertNotNull(a.get("testa")); } + + @Test + public void getListByExclusion() { + Assert.assertNotNull(CollectionUtils.getListByExclusion(null, null)); + List originList = new ArrayList<>(); + originList.add(1); + originList.add(2); + List> ret = CollectionUtils.getListByExclusion(originList, null); + Assert.assertEquals(ret.size(), 2); + ret = CollectionUtils.getListByExclusion(originList, new HashSet<>()); + Assert.assertEquals(ret.size(), 2); + Assert.assertFalse(ret.get(0).isEmpty()); + Set exclusion = new HashSet<>(); + exclusion.add(Constants.CLASS); + ret = CollectionUtils.getListByExclusion(originList, exclusion); + Assert.assertEquals(ret.size(), 2); + Assert.assertTrue(ret.get(0).isEmpty()); + } + + @Test + public void isNotEmpty() { + List list = new ArrayList<>(); + Assert.assertFalse(CollectionUtils.isNotEmpty(list)); + Assert.assertFalse(CollectionUtils.isNotEmpty(null)); + } + } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java index bcaa391042..6800f6b542 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java @@ -18,13 +18,11 @@ package org.apache.dolphinscheduler.common.utils; import org.junit.Assert; import org.junit.Test; - import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtilsTest { - @Test public void format2Readable() throws ParseException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -54,4 +52,102 @@ public class DateUtilsTest { Assert.assertEquals(sunday, sunday1); } -} \ No newline at end of file + + @Test + public void diffHours(){ + Date d1 = DateUtils.stringToDate("2019-01-28 00:00:00"); + Date d2 = DateUtils.stringToDate("2019-01-28 20:00:00"); + Assert.assertEquals(DateUtils.diffHours(d1, d2), 20); + Date d3 = DateUtils.stringToDate("2019-01-28 20:00:00"); + Assert.assertEquals(DateUtils.diffHours(d3, d2), 0); + Assert.assertEquals(DateUtils.diffHours(d2, d1), 20); + Date d4 = null; + Assert.assertEquals(DateUtils.diffHours(d2, d4), 0); + } + + @Test + public void dateToString() { + Date d1 = DateUtils.stringToDate("2019-01-28"); + Assert.assertNull(d1); + d1 = DateUtils.stringToDate("2019-01-28 00:00:00"); + Assert.assertEquals(DateUtils.dateToString(d1), "2019-01-28 00:00:00"); + } + + @Test + public void getSomeDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00"); + Date curr = DateUtils.getSomeDay(d1, 1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 00:00:00"); + Assert.assertEquals(DateUtils.dateToString(DateUtils.getSomeDay(d1, -31)), "2018-12-31 00:00:00"); + } + + @Test + public void getFirstDayOfMonth() { + Date d1 = DateUtils.stringToDate("2019-01-31 00:00:00"); + Date curr = DateUtils.getFirstDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 00:00:00"); + + d1 = DateUtils.stringToDate("2019-01-31 01:59:00"); + curr = DateUtils.getFirstDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-01 01:59:00"); + } + + @Test + public void getSomeHourOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getSomeHourOfDay(d1, -1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 10:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 0); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 2); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 13:00:00"); + curr = DateUtils.getSomeHourOfDay(d1, 24); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-01 11:00:00"); + } + + @Test + public void getLastDayOfMonth() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + d1 = DateUtils.stringToDate("2019-01-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + + d1 = DateUtils.stringToDate("2019-02-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-02-28 11:59:59"); + + d1 = DateUtils.stringToDate("2020-02-02 11:59:59"); + curr = DateUtils.getLastDayOfMonth(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2020-02-29 11:59:59"); + } + + @Test + public void getStartOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:59:59"); + Date curr = DateUtils.getStartOfDay(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 00:00:00"); + } + + @Test + public void getEndOfDay() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getEndOfDay(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 23:59:59"); + } + + @Test + public void getStartOfHour() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getStartOfHour(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:00:00"); + } + + @Test + public void getEndOfHour() { + Date d1 = DateUtils.stringToDate("2019-01-31 11:00:59"); + Date curr = DateUtils.getEndOfHour(d1); + Assert.assertEquals(DateUtils.dateToString(curr), "2019-01-31 11:59:59"); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java new file mode 100644 index 0000000000..5c3db2d5d1 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZk.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * demo for using zkServer + */ +public class TestZk { + + @Before + public void before(){ + ZKServer.start(); + } + + @Test + public void test(){ + Assert.assertTrue(ZKServer.isStarted()); + } + + @After + public void after(){ + ZKServer.stop(); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java index 34c1807cf5..9d33fe1a1d 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java @@ -79,6 +79,7 @@ public class ZKServer { * @param port The port to listen on */ public static void startLocalZkServer(final int port) { + startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis()); startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + "test-" + System.currentTimeMillis()); } @@ -138,6 +139,7 @@ public class ZKServer { try { stopLocalZkServer(true); logger.info("zk server stopped"); + } catch (Exception e) { logger.error("Failed to stop ZK ",e); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 3080efa234..eb97ad75b2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -758,7 +758,7 @@ public class ProcessDao { } /** - * submit task to mysql and task queue + * submit task to db * submit sub process to command * @param taskInstance taskInstance * @param processInstance processInstance @@ -769,21 +769,18 @@ public class ProcessDao { logger.info("start submit task : {}, instance id:{}, state: {}, ", taskInstance.getName(), processInstance.getId(), processInstance.getState() ); processInstance = this.findProcessInstanceDetailById(processInstance.getId()); - //submit to mysql - TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance); - if(task.isSubProcess() && !task.getState().typeIsFinished()){ - ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task); - - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); - createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task); - }else if(!task.getState().typeIsFinished()){ - //submit to task queue - task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - submitTaskToQueue(task); - } - logger.info("submit task :{} state:{} complete, instance id:{} state: {} ", + //submit to db + TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); + if(task == null){ + logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + return task; + } + if(!task.getState().typeIsFinished()){ + createSubWorkProcessCommand(processInstance, task); + } + + logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -845,13 +842,18 @@ public class ProcessDao { /** * create sub work process command * @param parentProcessInstance parentProcessInstance - * @param instanceMap instanceMap - * @param childDefineId instanceMap * @param task task */ private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - ProcessInstanceMap instanceMap, - Integer childDefineId, TaskInstance task){ + TaskInstance task){ + if(!task.isSubProcess()){ + return; + } + ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); CommandType fatherType = parentProcessInstance.getCommandType(); @@ -921,7 +923,7 @@ public class ProcessDao { * @param processInstance processInstance * @return task instance */ - public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){ + public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){ ExecutionStatus processInstanceState = processInstance.getState(); if(taskInstance.getState().typeIsFailure()){ @@ -949,7 +951,10 @@ public class ProcessDao { taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); taskInstance.setSubmitTime(new Date()); - saveTaskInstance(taskInstance); + boolean saveResult = saveTaskInstance(taskInstance); + if(!saveResult){ + return null; + } return taskInstance; } @@ -961,6 +966,10 @@ public class ProcessDao { public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ + if(taskInstance.getState().typeIsFinished()){ + logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); + return true; + } // task cannot submit when running if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName())); @@ -971,14 +980,13 @@ public class ProcessDao { return true; } logger.info("task ready to queue: {}" , taskInstance); - taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); + boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) ); - return true; + return insertQueueResult; }catch (Exception e){ logger.error("submit task to queue Exception: ", e); logger.error("task queue error : %s", JSONUtils.toJson(taskInstance)); return false; - } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java index a4b8618bfd..da17e14044 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java @@ -154,7 +154,7 @@ public class UserMapperTest { accessToken.setToken("secrettoken"); accessToken.setCreateTime(new Date()); accessToken.setUpdateTime(new Date()); - accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),-1)); + accessToken.setExpireTime(DateUtils.getSomeHourOfDay(new Date(),1)); accessTokenMapper.insert(accessToken); return accessToken; } @@ -356,4 +356,4 @@ public class UserMapperTest { accessTokenMapper.deleteById(accessToken.getId()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 4934df1978..9bb5c555fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -114,21 +114,37 @@ public class MasterBaseTaskExecThread implements Callable { Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); int retryTimes = 1; - - while (retryTimes <= commitRetryTimes){ + boolean taskDBFlag = false; + boolean taskQueueFlag = false; + TaskInstance task = null; + while (true){ try { - TaskInstance task = processDao.submitTask(taskInstance, processInstance); - if(task != null){ + if(!taskDBFlag){ + // submit task to db + task = processDao.submitTask(taskInstance, processInstance); + if(task != null && task.getId() != 0){ + taskDBFlag = true; + } + } + if(taskDBFlag && !taskQueueFlag){ + // submit task to queue + taskQueueFlag = processDao.submitTaskToQueue(task); + } + if(taskDBFlag && taskQueueFlag){ return task; } - logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes); + if(!taskDBFlag){ + logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); + }else if(!taskQueueFlag){ + logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); + + } Thread.sleep(commitRetryInterval); } catch (Exception e) { logger.error("task commit to mysql and queue failed : " + e.getMessage(),e); } retryTimes += 1; } - return null; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index f617d5f74d..e91deca511 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -91,6 +91,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public Boolean waitTaskQuit(){ // query new state taskInstance = processDao.findTaskInstanceById(taskInstance.getId()); + logger.info("wait task: process id: {}, task id:{}, task name:{} complete", + this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); // task time out Boolean checkTimeout = false; TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java index 776290aafa..b9b3ad6824 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java @@ -68,6 +68,7 @@ public abstract class AbstractYarnTask extends AbstractTask { } catch (Exception e) { logger.error("yarn process failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index 30c5a1a5d2..b0bb4c6f4c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -99,7 +99,7 @@ public class DependentTask extends AbstractTask { } @Override - public void handle(){ + public void handle() throws Exception { // set the name of the current thread String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); @@ -135,6 +135,7 @@ public class DependentTask extends AbstractTask { }catch (Exception e){ logger.error(e.getMessage(),e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index a5c7390499..44eef65aba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -113,23 +113,20 @@ public class HttpTask extends AbstractTask { long startTime = System.currentTimeMillis(); String statusCode = null; String body = null; - try(CloseableHttpClient client = createHttpClient()) { - try(CloseableHttpResponse response = sendRequest(client)) { - statusCode = String.valueOf(getStatusCode(response)); - body = getResponseBody(response); - exitStatusCode = validResponse(body, statusCode); - long costTime = System.currentTimeMillis() - startTime; - logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}", - DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output); - }catch (Exception e) { - appendMessage(e.toString()); - exitStatusCode = -1; - logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); - } - } catch (Exception e) { + + try(CloseableHttpClient client = createHttpClient(); + CloseableHttpResponse response = sendRequest(client)) { + statusCode = String.valueOf(getStatusCode(response)); + body = getResponseBody(response); + exitStatusCode = validResponse(body, statusCode); + long costTime = System.currentTimeMillis() - startTime; + logger.info("startTime: {}, httpUrl: {}, httpMethod: {}, costTime : {}Millisecond, statusCode : {}, body : {}, log : {}", + DateUtils.format2Readable(startTime), httpParameters.getUrl(),httpParameters.getHttpMethod(), costTime, statusCode, body, output); + }catch (Exception e){ appendMessage(e.toString()); exitStatusCode = -1; logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:"+output, e); + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java index 3418c741f8..59cf8a6e24 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java @@ -97,14 +97,13 @@ public class ProcedureTask extends AbstractTask { procedureParameters.getMethod(), procedureParameters.getLocalParams()); - // determine whether there is a data source - if (procedureParameters.getDatasource() == 0){ - logger.error("datasource id not exists"); + DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); + if (dataSource == null){ + logger.error("datasource not exists"); exitStatusCode = -1; - return; + throw new IllegalArgumentException("datasource not found"); } - DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", dataSource.getName(), dataSource.getType(), @@ -112,11 +111,6 @@ public class ProcedureTask extends AbstractTask { dataSource.getUserId(), dataSource.getConnectionParams()); - if (dataSource == null){ - logger.error("datasource not exists"); - exitStatusCode = -1; - return; - } Connection connection = null; CallableStatement stmt = null; try { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index fffd5f080d..f6227b15a4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -98,6 +98,7 @@ public class PythonTask extends AbstractTask { } catch (Exception e) { logger.error("python task failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index fb7d2268e5..438d373775 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -106,6 +106,7 @@ public class ShellTask extends AbstractTask { } catch (Exception e) { logger.error("shell task failure", e); exitStatusCode = -1; + throw e; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index 1fd54785d1..34f7d13ca8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.server.worker.task.spark; +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.common.enums.SparkVersion; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; @@ -25,7 +27,6 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; import org.apache.dolphinscheduler.server.worker.task.TaskProps; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import java.util.ArrayList; @@ -38,9 +39,14 @@ import java.util.Map; public class SparkTask extends AbstractYarnTask { /** - * spark command + * spark1 command + */ + private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit"; + + /** + * spark2 command */ - private static final String SPARK_COMMAND = "spark-submit"; + private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit"; /** * spark parameters @@ -89,7 +95,14 @@ public class SparkTask extends AbstractYarnTask { protected String buildCommand() { List args = new ArrayList<>(); - args.add(SPARK_COMMAND); + //spark version + String sparkCommand = SPARK2_COMMAND; + + if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { + sparkCommand = SPARK1_COMMAND; + } + + args.add(sparkCommand); // other parameters args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index f0478ac74c..ccfee2efec 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -261,9 +261,7 @@ public class SqlTask extends AbstractTask { Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), SEMICOLON, HIVE_CONF); - if(connParamMap != null){ - paramProp.putAll(connParamMap); - } + paramProp.putAll(connParamMap); connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), paramProp); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index 3d428eab89..272fb546da 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -29,7 +29,7 @@ public class DependentTaskTest { @Test - public void testDependInit(){ + public void testDependInit() throws Exception{ TaskProps taskProps = new TaskProps(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java new file mode 100644 index 0000000000..b502e13bc6 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTaskTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.spark; + +import org.apache.commons.lang3.StringUtils; +import org.apache.dolphinscheduler.common.enums.SparkVersion; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.spark.SparkParameters; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.server.utils.ParamUtils; +import org.apache.dolphinscheduler.server.utils.SparkArgsUtils; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class SparkTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(SparkTaskTest.class); + + /** + * spark1 command + */ + private static final String SPARK1_COMMAND = "${SPARK_HOME1}/bin/spark-submit"; + + /** + * spark2 command + */ + private static final String SPARK2_COMMAND = "${SPARK_HOME2}/bin/spark-submit"; + + @Test + public void testSparkTaskInit() { + + TaskProps taskProps = new TaskProps(); + + String spark1Params = "{" + + "\"mainArgs\":\"\", " + + "\"driverMemory\":\"1G\", " + + "\"executorMemory\":\"2G\", " + + "\"programType\":\"SCALA\", " + + "\"mainClass\":\"basicetl.GlobalUserCar\", " + + "\"driverCores\":\"2\", " + + "\"deployMode\":\"cluster\", " + + "\"executorCores\":2, " + + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + + "\"sparkVersion\":\"SPARK1\", " + + "\"numExecutors\":\"10\", " + + "\"localParams\":[], " + + "\"others\":\"\", " + + "\"resourceList\":[]" + + "}"; + + String spark2Params = "{" + + "\"mainArgs\":\"\", " + + "\"driverMemory\":\"1G\", " + + "\"executorMemory\":\"2G\", " + + "\"programType\":\"SCALA\", " + + "\"mainClass\":\"basicetl.GlobalUserCar\", " + + "\"driverCores\":\"2\", " + + "\"deployMode\":\"cluster\", " + + "\"executorCores\":2, " + + "\"mainJar\":{\"res\":\"test-1.0-SNAPSHOT.jar\"}, " + + "\"sparkVersion\":\"SPARK2\", " + + "\"numExecutors\":\"10\", " + + "\"localParams\":[], " + + "\"others\":\"\", " + + "\"resourceList\":[]" + + "}"; + + taskProps.setTaskParams(spark2Params); + + logger.info("spark task params {}", taskProps.getTaskParams()); + + SparkParameters sparkParameters = JSONUtils.parseObject(taskProps.getTaskParams(), SparkParameters.class); + + assert sparkParameters != null; + if (!sparkParameters.checkParameters()) { + throw new RuntimeException("spark task params is not valid"); + } + sparkParameters.setQueue(taskProps.getQueue()); + + if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) { + String args = sparkParameters.getMainArgs(); + + /** + * combining local and global parameters + */ + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + sparkParameters.getLocalParametersMap(), + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); + if (paramsMap != null) { + args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); + } + sparkParameters.setMainArgs(args); + } + + List args = new ArrayList<>(); + + //spark version + String sparkCommand = SPARK2_COMMAND; + + if (SparkVersion.SPARK1.name().equals(sparkParameters.getSparkVersion())) { + sparkCommand = SPARK1_COMMAND; + } + + args.add(sparkCommand); + + // other parameters + args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); + + String sparkArgs = String.join(" ", args); + + logger.info("spark task command : {}", sparkArgs); + + Assert.assertEquals(sparkArgs.split(" ")[0], SPARK2_COMMAND ); + + } +} diff --git a/dolphinscheduler-ui/build/webpack.config.combined.js b/dolphinscheduler-ui/build/webpack.config.release.js similarity index 100% rename from dolphinscheduler-ui/build/webpack.config.combined.js rename to dolphinscheduler-ui/build/webpack.config.release.js diff --git a/dolphinscheduler-ui/package.json b/dolphinscheduler-ui/package.json index 2aeda8b841..9b02888e00 100644 --- a/dolphinscheduler-ui/package.json +++ b/dolphinscheduler-ui/package.json @@ -11,7 +11,7 @@ "lint:fix": "standard \"**/*.{js,vue}\" --fix", "start": "npm run dev", "combo": "node ./build/combo.js", - "build:combined": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.combined.js" + "build:release": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.release.js" }, "dependencies": { "ans-ui": "1.1.4", diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js index 0a86186933..11f22132c7 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.js @@ -91,22 +91,232 @@ Dag.prototype.toolbarEvent = function ({ item, code, is }) { /** * Echo data display */ -Dag.prototype.backfill = function () { - jsPlumb.ready(() => { - JSP.init({ - dag: this.dag, - instance: this.instance +Dag.prototype.backfill = function (arg) { + if(arg) { + let locationsValue = store.state.dag.locations + let locationsValue1 = store.state.dag.locations + let locationsValue2 = store.state.dag.locations + let arr = [] + for (let i in locationsValue1) { + let objs = new Object(); + objs.id = i + arr.push(Object.assign(objs,locationsValue1[i])); //Attributes + } + let tmp = [] + for(let i in locationsValue2) { + if(locationsValue2[i].targetarr !='' && locationsValue2[i].targetarr.split(',').length>1) { + tmp.push(locationsValue2[i]) + } + } + + function copy (array) { + let newArray = [] + for(let item of array) { + newArray.push(item); + } + return newArray; + } + + + let newArr = copy(arr) + function getNewArr() { + for(let i= 0; i1) { + newArr[i].targetarr = newArr[i].targetarr.split(',').shift() + } + } + return newArr + } + getNewArr() + /** + * @description Transform flat data into a tree structure + * @param {Array} arr Flat data + * @param {String} pidStr targetarr key name + * @param {String} idStr id key name + * @param {String} childrenStr children key name + */ + function fommat({arrayList, pidStr = 'targetarr', idStr = 'id', childrenStr = 'children'}) { + let listOjb = {}; // Used to store objects of the form {key: obj} + let treeList = []; // An array to store the final tree structure data + // Transform the data into {key: obj} format, which is convenient for the following data processing + for (let i = 0; i < arrayList.length; i++) { + listOjb[arrayList[i][idStr]] = arrayList[i] + } + // Format data based on pid + for (let j = 0; j < arrayList.length; j++) { + // Determine if the parent exists + // let haveParent = arrayList[j].targetarr.split(',').length>1?listOjb[arrayList[j].targetarr.split(',')[0]]:listOjb[arrayList[j][pidStr]] + let haveParent = listOjb[arrayList[j][pidStr]] + if (haveParent) { + // If there is no parent children field, create a children field + !haveParent[childrenStr] && (haveParent[childrenStr] = []) + // Insert child in parent + haveParent[childrenStr].push(arrayList[j]) + } else { + // If there is no parent, insert directly into the outermost layer + treeList.push(arrayList[j]) + } + } + return treeList + } + let datas = fommat({arrayList: newArr,pidStr: 'targetarr'}) + // Count the number of leaf nodes + function getLeafCountTree(json) { + if(!json.children) { + json.colspan = 1; + return 1; + } else { + let leafCount = 0; + for(let i = 0 ; i < json.children.length ; i++){ + leafCount = leafCount + getLeafCountTree(json.children[i]); + } + json.colspan = leafCount; + return leafCount; + } + } + // Number of tree node levels + let countTree = getLeafCountTree(datas[0]) + function getMaxFloor(treeData) { + let floor = 0 + let v = this + let max = 0 + function each (data, floor) { + data.forEach(e => { + e.floor = floor + e.x=floor*170 + if (floor > max) { + max = floor + } + if (e.children) { + each(e.children, floor + 1) + } + }) + } + each(treeData,1) + return max + } + getMaxFloor(datas) + // The last child of each node + let lastchildren = []; + forxh(datas); + function forxh(list) { + for (let i = 0; i < list.length; i++) { + let chlist = list[i]; + if (chlist.children) { + forxh(chlist.children); + } else { + lastchildren.push(chlist); + } + } + } + // Get all parent nodes above the leaf node + function treeFindPath (tree, func, path,n) { + if (!tree) return [] + for (const data of tree) { + path.push(data.name) + if (func(data)) return path + if (data.children) { + const findChildren = treeFindPath(data.children, func, path,n) + if (findChildren.length) return findChildren + } + path.pop() + } + return [] + } + function toLine(data){ + return data.reduce((arr, {id, name, targetarr, x, y, children = []}) => + arr.concat([{id, name, targetarr, x, y}], toLine(children)), []) + return result; + } + let listarr = toLine(datas); + let listarrs = toLine(datas) + let dataObject = {} + for(let i = 0; i value2) { + return 1; + } else { + return 0; + } + }; + } + + lastchildren = lastchildren.sort(createComparisonFunction('x')) + + // Coordinate value of each leaf node + for(let a = 0; a data.targetarr===lastchildren[i].targetarr,[],i+1) + for(let j = 0; j1) { + dataObject[Object.keys(locationsValue1)[0]].y = (countTree/2)*120+50 + } + + locationsValue = dataObject + jsPlumb.ready(() => { + JSP.init({ + dag: this.dag, + instance: this.instance + }) + // Backfill + JSP.jspBackfill({ + // connects + connects: _.cloneDeep(store.state.dag.connects), + // Node location information + locations: _.cloneDeep(locationsValue), + // Node data + largeJson: _.cloneDeep(store.state.dag.tasks) + }) }) - // Backfill - JSP.jspBackfill({ - // connects - connects: _.cloneDeep(store.state.dag.connects), - // Node location information - locations: _.cloneDeep(store.state.dag.locations), - // Node data - largeJson: _.cloneDeep(store.state.dag.tasks) + } else { + jsPlumb.ready(() => { + JSP.init({ + dag: this.dag, + instance: this.instance + }) + // Backfill + JSP.jspBackfill({ + // connects + connects: _.cloneDeep(store.state.dag.connects), + // Node location information + locations: _.cloneDeep(store.state.dag.locations), + // Node data + largeJson: _.cloneDeep(store.state.dag.tasks) + }) }) - }) + } } /** diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue index 0c8cd2e861..94dc2651be 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue @@ -22,6 +22,7 @@
@@ -65,10 +66,12 @@ v-for="(item,$index) in toolOperList" :class="_operationClass(item)" :id="item.code" + :key="$index" @click="_ckOperation(item,$event)">
+ {}) @@ -513,7 +552,7 @@ }) }, mounted () { - this.init() + this.init(this.arg) }, beforeDestroy () { this.resetParams() diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue index 939dccb0f5..2457a23faa 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/spark.vue @@ -32,6 +32,22 @@
+ +
{{$t('Spark Version')}}
+
+ + + + +
+
{{$t('Main class')}}
@@ -224,7 +240,11 @@ // Program type programType: 'SCALA', // Program type(List) - programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }] + programTypeList: [{ code: 'JAVA' }, { code: 'SCALA' }, { code: 'PYTHON' }], + // Spark version + sparkVersion: 'SPARK2', + // Spark version(LIst) + sparkVersionList: [{ code: 'SPARK2' }, { code: 'SPARK1' }] } }, props: { @@ -318,7 +338,8 @@ executorCores: this.executorCores, mainArgs: this.mainArgs, others: this.others, - programType: this.programType + programType: this.programType, + sparkVersion: this.sparkVersion }) return true }, @@ -366,6 +387,7 @@ this.mainArgs = o.params.mainArgs || '' this.others = o.params.others this.programType = o.params.programType || 'SCALA' + this.sparkVersion = o.params.sparkVersion || 'SPARK2' // backfill resourceList let resourceList = o.params.resourceList || [] diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index fb87146dff..dfc7c72c49 100644 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -82,7 +82,6 @@ export default { 'Please enter a positive integer': 'Please enter a positive integer', 'Program Type': 'Program Type', 'Main class': 'Main class', - 'Please enter main class': 'Please enter main class', 'Main jar package': 'Main jar package', 'Please enter main jar package': 'Please enter main jar package', 'Command-line parameters': 'Command-line parameters', @@ -506,5 +505,6 @@ export default { 'There is no data for this period of time': 'There is no data for this period of time', 'IP address cannot be empty': 'IP address cannot be empty', 'Please enter the correct IP': 'Please enter the correct IP', - 'Please generate token': 'Please generate token' + 'Please generate token': 'Please generate token', + 'Spark Version': 'Spark Version' } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 11fd0940bf..7df7cdd230 100644 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -505,5 +505,6 @@ export default { 'There is no data for this period of time': '该时间段无数据', 'IP address cannot be empty': 'IP地址不能为空', 'Please enter the correct IP': '请输入正确的IP', - 'Please generate token': '请生成Token' + 'Please generate token': '请生成Token', + 'Spark Version': 'Spark版本' } diff --git a/pom.xml b/pom.xml index 61c2297621..53607c3b60 100644 --- a/pom.xml +++ b/pom.xml @@ -612,10 +612,14 @@ ${maven-surefire-plugin.version} + **/common/utils/*.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java + **/common/graph/*.java - **/*CollectionUtilsTest.java + **/api/utils/CheckUtilsTest.java + **/api/utils/FileUtilsTest.java + **/alert/utils/ExcelUtilsTest.java