Browse Source

Merge pull request #8 from apache/dev

update
pull/2/head
samz406 5 years ago committed by GitHub
parent
commit
da7b531537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      README.md
  2. 5
      README_zh_CN.md
  3. 7
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java
  4. 92
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java
  5. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  6. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java
  7. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
  8. 43
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java
  9. 43
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java
  10. 3
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java
  11. 56
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  12. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java
  14. 0
      dolphinscheduler-ui/build/webpack.config.release.js
  15. 2
      dolphinscheduler-ui/package.json
  16. 4
      pom.xml

3
README.md

@ -1,4 +1,5 @@
Dolphin Scheduler
Dolphin Scheduler Official Website
[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org)
============
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler)

5
README_zh_CN.md

@ -1,4 +1,5 @@
Dolphin Scheduler
Dolphin Scheduler Official Website
[dolphinscheduler.apache.org](https://dolphinscheduler.apache.org)
============
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![Total Lines](https://tokei.rs/b1/github/apache/Incubator-DolphinScheduler?category=lines)](https://github.com/apache/Incubator-DolphinScheduler)
@ -88,7 +89,7 @@ Dolphin Scheduler使用了很多优秀的开源项目,比如google的guava、g
### 获得帮助
1. Submit an issue
1. Mail list: dev@dolphinscheduler.apache.org. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list.
1. Mail to dev-subscribe@dolphinscheduler.apache.org, follow the reply to subscribe the mail list. then you can send mail to dev@dolphinscheduler.apache.org.
1. Contact WeChat group manager, ID 510570367. This is for Mandarin(CN) discussion.
### 版权

7
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/ExcelUtils.java

@ -41,12 +41,9 @@ public class ExcelUtils {
*/
public static void genExcelFile(String content,String title,String xlsFilePath){
List<LinkedHashMap> itemsList;
try {
//The JSONUtils.toList has been try catch ex
itemsList = JSONUtils.toList(content, LinkedHashMap.class);
}catch (Exception e){
logger.error(String.format("json format incorrect : %s",content),e);
throw new RuntimeException("json format incorrect",e);
}
if (itemsList == null || itemsList.size() == 0){
logger.error("itemsList is null");

92
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/ExcelUtilsTest.java

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import static org.junit.Assert.assertTrue;
public class ExcelUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ExcelUtilsTest.class);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private String rootPath = null;
@Before
public void setUp() throws Exception {
folder.create();
rootPath = folder.getRoot().getAbsolutePath();
}
@After
public void tearDown() throws Exception {
folder.delete();
}
/**
* Test GenExcelFile
*/
@Test
public void testGenExcelFile() {
//Define dest file path
String xlsFilePath = rootPath + System.getProperty("file.separator");
logger.info("xlsFilePath: "+xlsFilePath);
//Define correctContent
String correctContent = "[{\"name\":\"ds name\",\"value\":\"ds value\"}]";
//Define incorrectContent
String incorrectContent1 = "{\"name\":\"ds name\",\"value\":\"ds value\"}";
//Define title
String title = "test report";
//Invoke genExcelFile with correctContent
ExcelUtils.genExcelFile(correctContent, title, xlsFilePath);
//Test file exists
File xlsFile = new File(xlsFilePath + Constants.SINGLE_SLASH + title + Constants.EXCEL_SUFFIX_XLS);
assertTrue(xlsFile.exists());
//Expected RuntimeException
expectedException.expect(RuntimeException.class);
//Expected error message
expectedException.expectMessage("itemsList is null");
//Invoke genExcelFile with incorrectContent, will cause RuntimeException
ExcelUtils.genExcelFile(incorrectContent1, title, xlsFilePath);
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -439,7 +439,7 @@ public final class Constants {
/**
* default master commit retry interval
*/
public static final int defaultMasterCommitRetryInterval = 100;
public static final int defaultMasterCommitRetryInterval = 3000;
/**
* time unit secong to minutes

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java

@ -45,7 +45,7 @@ public interface ITaskQueue {
* @param key queue name
* @param value
*/
void add(String key, String value);
boolean add(String key, String value);
/**
* an element pops out of the queue

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@ -118,14 +118,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {
public boolean add(String key, String value){
try {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
return true;
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
return false;
}
}

43
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/BaseTaskQueueTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.queue;
import org.apache.dolphinscheduler.common.zk.ZKServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* base task queue test for only start zk server once
*/
public class BaseTaskQueueTest {
protected static ITaskQueue tasksQueue = null;
@BeforeClass
public static void setup() {
ZKServer.start();
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
}
@AfterClass
public static void tearDown() {
tasksQueue.delete();
ZKServer.stop();
}
}

43
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/TestZkServer.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* demo for using zkServer
*/
public class TestZkServer {
@Before
public void before(){
ZKServer.start();
}
@Test
public void test(){
Assert.assertTrue(ZKServer.isStarted());
}
@After
public void after(){
ZKServer.stop();
}
}

3
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/zk/ZKServer.java

@ -80,6 +80,7 @@ public class ZKServer {
*/
public static void startLocalZkServer(final int port) {
startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + "test-" + System.currentTimeMillis());
}
/**
@ -137,6 +138,8 @@ public class ZKServer {
public static void stop() {
try {
stopLocalZkServer(true);
logger.info("zk server stopped");
} catch (Exception e) {
logger.error("Failed to stop ZK ",e);
}

56
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -758,7 +758,7 @@ public class ProcessDao {
}
/**
* submit task to mysql and task queue
* submit task to db
* submit sub process to command
* @param taskInstance taskInstance
* @param processInstance processInstance
@ -769,21 +769,18 @@ public class ProcessDao {
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(), processInstance.getState() );
processInstance = this.findProcessInstanceDetailById(processInstance.getId());
//submit to mysql
TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
if(task.isSubProcess() && !task.getState().typeIsFinished()){
ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
//submit to db
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if(task == null){
logger.error("end submit task to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(), taskInstance.getProcessInstance(), processInstance.getState());
return task;
}
if(!task.getState().typeIsFinished()){
createSubWorkProcessCommand(processInstance, task);
}
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
}else if(!task.getState().typeIsFinished()){
//submit to task queue
task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
submitTaskToQueue(task);
}
logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
return task;
}
@ -845,13 +842,18 @@ public class ProcessDao {
/**
* create sub work process command
* @param parentProcessInstance parentProcessInstance
* @param instanceMap instanceMap
* @param childDefineId instanceMap
* @param task task
*/
private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstanceMap instanceMap,
Integer childDefineId, TaskInstance task){
TaskInstance task){
if(!task.isSubProcess()){
return;
}
ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());
CommandType fatherType = parentProcessInstance.getCommandType();
@ -921,7 +923,7 @@ public class ProcessDao {
* @param processInstance processInstance
* @return task instance
*/
public TaskInstance submitTaskInstanceToMysql(TaskInstance taskInstance, ProcessInstance processInstance){
public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance){
ExecutionStatus processInstanceState = processInstance.getState();
if(taskInstance.getState().typeIsFailure()){
@ -949,7 +951,10 @@ public class ProcessDao {
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance, processInstanceState));
taskInstance.setSubmitTime(new Date());
saveTaskInstance(taskInstance);
boolean saveResult = saveTaskInstance(taskInstance);
if(!saveResult){
return null;
}
return taskInstance;
}
@ -961,6 +966,10 @@ public class ProcessDao {
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
@ -971,14 +980,13 @@ public class ProcessDao {
return true;
}
logger.info("task ready to queue: {}" , taskInstance);
taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
boolean insertQueueResult = taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
return true;
return insertQueueResult;
}catch (Exception e){
logger.error("submit task to queue Exception: ", e);
logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -114,21 +114,37 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
int retryTimes = 1;
while (retryTimes <= commitRetryTimes){
boolean taskDBFlag = false;
boolean taskQueueFlag = false;
TaskInstance task = null;
while (true){
try {
TaskInstance task = processDao.submitTask(taskInstance, processInstance);
if(task != null){
if(!taskDBFlag){
// submit task to db
task = processDao.submitTask(taskInstance, processInstance);
if(task != null && task.getId() != 0){
taskDBFlag = true;
}
}
if(taskDBFlag && !taskQueueFlag){
// submit task to queue
taskQueueFlag = processDao.submitTaskToQueue(task);
}
if(taskDBFlag && taskQueueFlag){
return task;
}
logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
if(!taskDBFlag){
logger.error("task commit to db failed , task has already retry {} times, please check the database", retryTimes);
}else if(!taskQueueFlag){
logger.error("task commit to queue failed , task has already retry {} times, please check the database", retryTimes);
}
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
}
retryTimes += 1;
}
return null;
}
/**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java

@ -91,6 +91,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public Boolean waitTaskQuit(){
// query new state
taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
Boolean checkTimeout = false;
TaskTimeoutParameter taskTimeoutParameter = getTaskTimeoutParameter();

0
dolphinscheduler-ui/build/webpack.config.combined.js → dolphinscheduler-ui/build/webpack.config.release.js

2
dolphinscheduler-ui/package.json

@ -11,7 +11,7 @@
"lint:fix": "standard \"**/*.{js,vue}\" --fix",
"start": "npm run dev",
"combo": "node ./build/combo.js",
"build:combined": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.combined.js"
"build:release": "npm run clean && cross-env NODE_ENV=production PUBLIC_PATH=/dolphinscheduler/ui webpack --config ./build/webpack.config.release.js"
},
"dependencies": {
"ans-ui": "1.1.4",

4
pom.xml

@ -613,10 +613,14 @@
<configuration>
<includes>
<include>**/common/utils/*.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
<include>**/common/graph/*.java</include>
<include>**/common/queue/*.java</include>
<include>**/api/utils/CheckUtilsTest.java</include>
<include>**/api/utils/FileUtilsTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
</includes>
<!-- <skip>true</skip> -->
</configuration>

Loading…
Cancel
Save