diff --git a/README.md b/README.md index c72fc52a26..aee0e3b2e3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -Dolphin Scheduler +Dolphin Scheduler Official Website +[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org) ============ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler) diff --git a/README_zh_CN.md b/README_zh_CN.md index ef7e2956a4..f8855aca6c 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -1,4 +1,5 @@ -Dolphin Scheduler +Dolphin Scheduler Official Website +[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org) ============ [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler) @@ -88,7 +89,7 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g ### 获得帮助 1. Submit an issue -1. Mail list: dev@dolphinscheduler.apache.org. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list. +1. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list. then you can send mail to dev@dolphinscheduler.apache.org. 1. Contact WeChat group manager, ID 510570367. This is for Mandarin(CN) discussion. ### 版权 diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java index 522a1b9513..2aee3d4ef6 100644 --- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java +++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java @@ -41,12 +41,9 @@ public class ExcelUtils { */ public static void genExcelFile(String content,String title,String xlsFilePath){ List itemsList; - try { - itemsList = JSONUtils.toList(content, LinkedHashMap.class); - }catch (Exception e){ - logger.error(String.format("json format incorrect : %s",content),e); - throw new RuntimeException("json format incorrect",e); - } + + //The JSONUtils.toList has been try catch ex + itemsList = JSONUtils.toList(content, LinkedHashMap.class); if (itemsList == null || itemsList.size() == 0){ logger.error("itemsList is null"); diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java new file mode 100644 index 0000000000..3ef43aeef4 --- /dev/null +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.alert.utils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import static org.junit.Assert.assertTrue; + +public class ExcelUtilsTest { + + private static final Logger logger = LoggerFactory.getLogger(ExcelUtilsTest.class); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private String rootPath = null; + + @Before + public void setUp() throws Exception { + + folder.create(); + rootPath = folder.getRoot().getAbsolutePath(); + } + + @After + public void tearDown() throws Exception { + + folder.delete(); + } + + /** + * Test GenExcelFile + */ + @Test + public void testGenExcelFile() { + + //Define dest file path + String xlsFilePath = rootPath + System.getProperty("file.separator"); + logger.info("xlsFilePath: "+xlsFilePath); + + //Define correctContent + String correctContent = "[{\"name\":\"ds name\",\"value\":\"ds value\"}]"; + + //Define incorrectContent + String incorrectContent1 = "{\"name\":\"ds name\",\"value\":\"ds value\"}"; + + //Define title + String title = "test report"; + + //Invoke genExcelFile with correctContent + ExcelUtils.genExcelFile(correctContent, title, xlsFilePath); + + //Test file exists + File xlsFile = new File(xlsFilePath + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS); + assertTrue(xlsFile.exists()); + + //Expected RuntimeException + expectedException.expect(RuntimeException.class); + + //Expected error message + expectedException.expectMessage("itemsList is null"); + + //Invoke genExcelFile with incorrectContent, will cause RuntimeException + ExcelUtils.genExcelFile(incorrectContent1, title, xlsFilePath); + + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 791c0bb558..c675ad55bd 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -439,7 +439,7 @@ public final class Constants { /** * default master commit retry interval */ - public static final int defaultMasterCommitRetryInterval = 100; + public static final int defaultMasterCommitRetryInterval = 3000; /** * time unit secong to minutes diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java index 054e25dd3a..6e937f0c3e 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java @@ -45,7 +45,7 @@ public interface ITaskQueue { * @param key queue name * @param value */ - void add(String key, String value); + boolean add(String key, String value); /** * an element pops out of the queue diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index b0ecc62710..d2096f6cd1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -118,14 +118,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override - public void add(String key, String value) { + public boolean add(String key, String value){ try { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); + return true; } catch (Exception e) { logger.error("add task to tasks queue exception",e); + return false; } } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java new file mode 100644 index 0000000000..0bd4266bb7 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.queue; + +import org.apache.dolphinscheduler.common.zk.ZKServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +/** + * base task queue test for only start zk server once + */ +public class BaseTaskQueueTest { + + protected static ITaskQueue tasksQueue = null; + + @BeforeClass + public static void setup() { + ZKServer.start(); + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); + } + + @AfterClass + public static void tearDown() { + tasksQueue.delete(); + ZKServer.stop(); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java new file mode 100644 index 0000000000..d1a0526309 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * demo for using zkServer + */ +public class TestZkServer { + + @Before + public void before(){ + ZKServer.start(); + } + + @Test + public void test(){ + Assert.assertTrue(ZKServer.isStarted()); + } + + @After + public void after(){ + ZKServer.stop(); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java index 5aba9fd8a1..9d33fe1a1d 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java @@ -80,6 +80,7 @@ public class ZKServer { */ public static void startLocalZkServer(final int port) { startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis()); + startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + "test-" + System.currentTimeMillis()); } /** @@ -137,6 +138,8 @@ public class ZKServer { public static void stop() { try { stopLocalZkServer(true); + logger.info("zk server stopped"); + } catch (Exception e) { logger.error("Failed to stop ZK ",e); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java index 3080efa234..eb97ad75b2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java @@ -758,7 +758,7 @@ public class ProcessDao { } /** - * submit task to mysql and task queue + * submit task to db * submit sub process to command * @param taskInstance taskInstance * @param processInstance processInstance @@ -769,21 +769,18 @@ public class ProcessDao { logger.info("start submit task : {}, instance id:{}, state: {}, ", taskInstance.getName(), processInstance.getId(), processInstance.getState() ); processInstance = this.findProcessInstanceDetailById(processInstance.getId()); - //submit to mysql - TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance); - if(task.isSubProcess() && !task.getState().typeIsFinished()){ - ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task); - - TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); - Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); - Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); - createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task); - }else if(!task.getState().typeIsFinished()){ - //submit to task queue - task.setProcessInstancePriority(processInstance.getProcessInstancePriority()); - submitTaskToQueue(task); - } - logger.info("submit task :{} state:{} complete, instance id:{} state: {} ", + //submit to db + TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance); + if(task == null){ + logger.error("end submit task to db error, task name:{}, process id:{} state: {} ", + taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState()); + return task; + } + if(!task.getState().typeIsFinished()){ + createSubWorkProcessCommand(processInstance, task); + } + + logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ", taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState()); return task; } @@ -845,13 +842,18 @@ public class ProcessDao { /** * create sub work process command * @param parentProcessInstance parentProcessInstance - * @param instanceMap instanceMap - * @param childDefineId instanceMap * @param task task */ private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance, - ProcessInstanceMap instanceMap, - Integer childDefineId, TaskInstance task){ + TaskInstance task){ + if(!task.isSubProcess()){ + return; + } + ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task); + TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class); + Map subProcessParam = JSONUtils.toMap(taskNode.getParams()); + Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID)); + ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId()); CommandType fatherType = parentProcessInstance.getCommandType(); @@ -921,7 +923,7 @@ public class ProcessDao { * @param processInstance processInstance * @return task instance */ - public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){ + public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){ ExecutionStatus processInstanceState = processInstance.getState(); if(taskInstance.getState().typeIsFailure()){ @@ -949,7 +951,10 @@ public class ProcessDao { taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState)); taskInstance.setSubmitTime(new Date()); - saveTaskInstance(taskInstance); + boolean saveResult = saveTaskInstance(taskInstance); + if(!saveResult){ + return null; + } return taskInstance; } @@ -961,6 +966,10 @@ public class ProcessDao { public Boolean submitTaskToQueue(TaskInstance taskInstance) { try{ + if(taskInstance.getState().typeIsFinished()){ + logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString())); + return true; + } // task cannot submit when running if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){ logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName())); @@ -971,14 +980,13 @@ public class ProcessDao { return true; } logger.info("task ready to queue: {}" , taskInstance); - taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); + boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance)); logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) ); - return true; + return insertQueueResult; }catch (Exception e){ logger.error("submit task to queue Exception: ", e); logger.error("task queue error : %s", JSONUtils.toJson(taskInstance)); return false; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java index 4934df1978..9bb5c555fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java @@ -114,21 +114,37 @@ public class MasterBaseTaskExecThread implements Callable { Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval(); int retryTimes = 1; - - while (retryTimes <= commitRetryTimes){ + boolean taskDBFlag = false; + boolean taskQueueFlag = false; + TaskInstance task = null; + while (true){ try { - TaskInstance task = processDao.submitTask(taskInstance, processInstance); - if(task != null){ + if(!taskDBFlag){ + // submit task to db + task = processDao.submitTask(taskInstance, processInstance); + if(task != null && task.getId() != 0){ + taskDBFlag = true; + } + } + if(taskDBFlag && !taskQueueFlag){ + // submit task to queue + taskQueueFlag = processDao.submitTaskToQueue(task); + } + if(taskDBFlag && taskQueueFlag){ return task; } - logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes); + if(!taskDBFlag){ + logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes); + }else if(!taskQueueFlag){ + logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes); + + } Thread.sleep(commitRetryInterval); } catch (Exception e) { logger.error("task commit to mysql and queue failed : " + e.getMessage(),e); } retryTimes += 1; } - return null; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index f617d5f74d..e91deca511 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -91,6 +91,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public Boolean waitTaskQuit(){ // query new state taskInstance = processDao.findTaskInstanceById(taskInstance.getId()); + logger.info("wait task: process id: {}, task id:{}, task name:{} complete", + this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); // task time out Boolean checkTimeout = false; TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter(); diff --git a/dolphinscheduler-ui/build/webpack.config.combined.js b/dolphinscheduler-ui/build/webpack.config.release.js similarity index 100% rename from dolphinscheduler-ui/build/webpack.config.combined.js rename to dolphinscheduler-ui/build/webpack.config.release.js diff --git a/dolphinscheduler-ui/package.json b/dolphinscheduler-ui/package.json index 2aeda8b841..9b02888e00 100644 --- a/dolphinscheduler-ui/package.json +++ b/dolphinscheduler-ui/package.json @@ -11,7 +11,7 @@ "lint:fix": "standard \"**/*.{js,vue}\" --fix", "start": "npm run dev", "combo": "node ./build/combo.js", - "build:combined": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.combined.js" + "build:release": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.release.js" }, "dependencies": { "ans-ui": "1.1.4", diff --git a/pom.xml b/pom.xml index 2413c56f21..a56fe6e892 100644 --- a/pom.xml +++ b/pom.xml @@ -613,10 +613,14 @@ **/common/utils/*.java + **/api/utils/CheckUtilsTest.java + **/api/utils/FileUtilsTest.java + **/common/graph/*.java **/common/queue/*.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java + **/alert/utils/ExcelUtilsTest.java