Browse Source

Merge pull request #15 from apache/dev

update
pull/2/head
samz406 5 years ago committed by GitHub
parent
commit
c6150e3824
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  3. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertStatus.java
  4. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java
  5. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java
  6. 60
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
  7. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java
  8. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/FailureStrategy.java
  9. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java
  10. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Priority.java
  11. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ReleaseState.java
  12. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java
  13. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/RunMode.java
  14. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ShowType.java
  15. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java
  16. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskDependType.java
  17. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  18. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java
  19. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UserType.java
  20. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WarningType.java
  21. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java
  22. 163
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
  23. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SpringApplicationContext.java
  24. 35
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java
  25. 48
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
  26. 11
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
  27. 4
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java
  28. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  29. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/Application.java
  30. 59
      dolphinscheduler-dist/pom.xml
  31. 2
      dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml
  32. 236
      dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml
  33. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  34. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  35. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  36. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java
  37. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  38. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  39. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java
  40. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java
  41. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java
  42. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java
  43. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java
  44. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
  45. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
  46. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  47. 96
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  48. 61
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
  49. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
  50. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java
  51. 165
      dolphinscheduler-ui/pom.xml

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/AccessTokenService.java

@ -154,8 +154,13 @@ public class AccessTokenService extends BaseService {
*/
public Map<String, Object> updateToken(int id,int userId, String expireTime, String token) {
Map<String, Object> result = new HashMap<>(5);
AccessToken accessToken = new AccessToken();
accessToken.setId(id);
AccessToken accessToken = accessTokenMapper.selectById(id);
if (accessToken == null) {
logger.error("access token not exist, access token id {}", id);
putMsg(result, Status.ACCESS_TOKEN_NOT_EXIST);
return result;
}
accessToken.setUserId(userId);
accessToken.setExpireTime(DateUtils.stringToDate(expireTime));
accessToken.setToken(token);

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

@ -210,7 +210,6 @@ public class ResourcesService extends BaseService {
}
Resource resource = resourcesMapper.selectById(resourceId);
String originResourceName = resource.getAlias();
if (resource == null) {
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
@ -236,6 +235,7 @@ public class ResourcesService extends BaseService {
}
//get the file suffix
String originResourceName = resource.getAlias();
String suffix = originResourceName.substring(originResourceName.lastIndexOf("."));
//if the name without suffix then add it ,else use the origin name

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertStatus.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* alert status
*/
@Getter
public enum AlertStatus {
/**
* 0 waiting executed; 1 execute successfully2 execute failed
@ -40,4 +38,12 @@ public enum AlertStatus {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* warning message notification method
*/
@Getter
public enum AlertType {
/**
* 0 email; 1 SMS
@ -39,4 +37,12 @@ public enum AlertType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CommandType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* command types
*/
@Getter
public enum CommandType {
/**
@ -59,4 +57,12 @@ public enum CommandType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

60
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java

@ -17,38 +17,44 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* data base types
*/
@Getter
public enum DbType {
/**
* 0 mysql
* 1 postgresql
* 2 hive
* 3 spark
* 4 clickhouse
* 5 oracle
* 6 sqlserver
* 7 db2
*/
MYSQL(0, "mysql"),
POSTGRESQL(1, "postgresql"),
HIVE(2, "hive"),
SPARK(3, "spark"),
CLICKHOUSE(4, "clickhouse"),
ORACLE(5, "oracle"),
SQLSERVER(6, "sqlserver"),
DB2(7, "db2");
/**
* 0 mysql
* 1 postgresql
* 2 hive
* 3 spark
* 4 clickhouse
* 5 oracle
* 6 sqlserver
* 7 db2
*/
MYSQL(0, "mysql"),
POSTGRESQL(1, "postgresql"),
HIVE(2, "hive"),
SPARK(3, "spark"),
CLICKHOUSE(4, "clickhouse"),
ORACLE(5, "oracle"),
SQLSERVER(6, "sqlserver"),
DB2(7, "db2");
DbType(int code, String descp){
this.code = code;
this.descp = descp;
}
DbType(int code, String descp) {
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionStatus.java

@ -18,13 +18,11 @@ package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* running status for workflow and task nodes
*
*/
@Getter
public enum ExecutionStatus {
/**
@ -123,5 +121,11 @@ public enum ExecutionStatus {
return this == KILL || this == STOP ;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/FailureStrategy.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* failure policy when some task node failed.
*/
@Getter
public enum FailureStrategy {
/**
@ -40,4 +38,12 @@ public enum FailureStrategy {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Flag.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* have_script
@ -27,7 +26,6 @@ import lombok.Getter;
* have_map_variables
* have_alert
*/
@Getter
public enum Flag {
/**
* 0 no
@ -45,4 +43,12 @@ public enum Flag {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Priority.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* define process and task priority
*/
@Getter
public enum Priority {
/**
* 0 highest priority
@ -46,4 +44,11 @@ public enum Priority {
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ReleaseState.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* process define release state
*/
@Getter
public enum ReleaseState {
/**
@ -50,4 +48,12 @@ public enum ReleaseState {
//For values out of enum scope
return null;
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResourceType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* resource type
*/
@Getter
public enum ResourceType {
/**
* 0 file, 1 udf
@ -39,4 +37,12 @@ public enum ResourceType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/RunMode.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* complement data run mode
*/
@Getter
public enum RunMode {
/**
* 0 serial run
@ -39,4 +37,12 @@ public enum RunMode {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ShowType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* show type for email
*/
@Getter
public enum ShowType {
/**
* 0 TABLE;
@ -44,4 +42,12 @@ public enum ShowType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SparkVersion.java

@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
@Getter
public enum SparkVersion {
/**
@ -37,4 +35,12 @@ public enum SparkVersion {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskDependType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* task node depend type
*/
@Getter
public enum TaskDependType {
/**
* 0 run current tasks only
@ -41,4 +39,12 @@ public enum TaskDependType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* task node type
*/
@Getter
public enum TaskType {
/**
* 0 SHELL
@ -61,4 +59,11 @@ public enum TaskType {
return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
}
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UdfType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* UDF type
*/
@Getter
public enum UdfType {
/**
* 0 hive; 1 spark
@ -38,4 +36,12 @@ public enum UdfType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/UserType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* user type
*/
@Getter
public enum UserType {
/**
* 0 admin user; 1 general user
@ -39,5 +37,13 @@ public enum UserType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WarningType.java

@ -17,12 +17,10 @@
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.Getter;
/**
* types for whether to send warning when process ending;
*/
@Getter
public enum WarningType {
/**
* 0 do not send warning;
@ -44,4 +42,12 @@ public enum WarningType {
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueFactory.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.queue;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,7 +44,7 @@ public class TaskQueueFactory {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
return TaskQueueZkImpl.getInstance();
return SpringApplicationContext.getBean(TaskQueueZkImpl.class);
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
System.exit(-1);

163
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@ -17,22 +17,14 @@
package org.apache.dolphinscheduler.common.queue;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.Bytes;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider;
import org.apache.dolphinscheduler.common.zk.ZookeeperConfig;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.apache.dolphinscheduler.common.zk.ZookeeperOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
@ -40,35 +32,13 @@ import java.util.*;
* A singleton of a task queue implemented with zookeeper
* tasks queue implemention
*/
@Service
public class TaskQueueZkImpl implements ITaskQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
private static volatile TaskQueueZkImpl instance;
private CuratorFramework zkClient;
private ZookeeperConfig zookeeperConfig;
private CuratorFramework getZkClient() {
return zkClient;
}
private TaskQueueZkImpl(){
init();
}
public static TaskQueueZkImpl getInstance(){
if (null == instance) {
synchronized (TaskQueueZkImpl.class) {
if(null == instance) {
instance = new TaskQueueZkImpl();
}
}
}
return instance;
}
@Autowired
private ZookeeperOperator zookeeperOperator;
/**
* get all tasks from tasks queue
@ -78,14 +48,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
@Override
public List<String> getAllTasks(String key) {
try {
List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
return list;
} catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e);
}
return new ArrayList<String>();
return new ArrayList<>();
}
/**
@ -99,22 +67,8 @@ public class TaskQueueZkImpl implements ITaskQueue {
public boolean checkTaskExists(String key, String task) {
String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;
try {
Stat stat = zkClient.checkExists().forPath(taskPath);
if(null == stat){
logger.info("check task:{} not exist in task queue",task);
return false;
}else{
logger.info("check task {} exists in task queue ",task);
return true;
}
return zookeeperOperator.isExisted(taskPath);
} catch (Exception e) {
logger.info(String.format("task {} check exists in task queue exception ", task), e);
}
return false;
}
@ -128,9 +82,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
public boolean add(String key, String value){
try {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
zookeeperOperator.persist(taskIdPath, value);
return true;
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
@ -153,8 +105,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
@Override
public List<String> poll(String key, int tasksNum) {
try{
CuratorFramework zk = getZkClient();
List<String> list = zk.getChildren().forPath(getTasksPath(key));
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
if(list != null && list.size() > 0){
@ -277,15 +228,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
@Override
public void removeNode(String key, String nodeValue){
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
String taskIdPath = tasksQueuePath + nodeValue;
logger.info("consume task {}", taskIdPath);
logger.info("removeNode task {}", taskIdPath);
try{
Stat stat = zk.checkExists().forPath(taskIdPath);
if(stat != null){
zk.delete().forPath(taskIdPath);
}
zookeeperOperator.remove(taskIdPath);
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
}
@ -307,13 +255,10 @@ public class TaskQueueZkImpl implements ITaskQueue {
if(value != null && value.trim().length() > 0){
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
CuratorFramework zk = getZkClient();
Stat stat = zk.checkExists().forPath(path + value);
if(null == stat){
String result = zk.create().withMode(CreateMode.PERSISTENT).forPath(path + value,Bytes.toBytes(value));
logger.info("add task:{} to tasks set result:{} ",value,result);
}else{
if(!zookeeperOperator.isExisted(path + value)){
zookeeperOperator.persist(path + value,value);
logger.info("add task:{} to tasks set ",value);
} else{
logger.info("task {} exists in tasks set ",value);
}
@ -336,15 +281,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
public void srem(String key, String value) {
try{
String path = getTasksPath(key) + Constants.SINGLE_SLASH;
CuratorFramework zk = getZkClient();
Stat stat = zk.checkExists().forPath(path + value);
if(null != stat){
zk.delete().forPath(path + value);
logger.info("delete task:{} from tasks set ",value);
}else{
logger.info("delete task:{} from tasks set fail, there is no this task",value);
}
zookeeperOperator.remove(path + value);
}catch(Exception e){
logger.error(String.format("delete task:" + value + " exception"),e);
@ -363,7 +300,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
Set<String> tasksSet = new HashSet<>();
try {
List<String> list = getZkClient().getChildren().forPath(getTasksPath(key));
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
for (String task : list) {
tasksSet.add(task);
@ -377,56 +314,6 @@ public class TaskQueueZkImpl implements ITaskQueue {
return tasksSet;
}
/**
* Init the task queue of zookeeper node
*/
private void init(){
initZkClient();
try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
if(zkClient.checkExists().forPath(taskQueuePath) == null){
// create a persistent parent node
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(taskQueuePath);
logger.info("create tasks queue parent node success : {} ",taskQueuePath);
}
}
} catch (Exception e) {
logger.error("create zk node failure",e);
}
}
private void initZkClient() {
Configuration conf = null;
try {
conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH);
} catch (ConfigurationException ex) {
logger.error("load zookeeper properties file failed, system exit");
System.exit(-1);
}
zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(conf.getString("zookeeper.quorum")))
.retryPolicy(new ExponentialBackoffRetry(conf.getInt("zookeeper.retry.base.sleep"), conf.getInt("zookeeper.retry.maxtime"), conf.getInt("zookeeper.retry.max.sleep")))
.sessionTimeoutMs(conf.getInt("zookeeper.session.timeout"))
.connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout"))
.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Clear the task queue of zookeeper node
*/
@ -437,16 +324,12 @@ public class TaskQueueZkImpl implements ITaskQueue {
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
for(String taskQueuePath : new String[]{tasksQueuePath,tasksCancelPath}){
if(zkClient.checkExists().forPath(taskQueuePath) != null){
List<String> list = zkClient.getChildren().forPath(taskQueuePath);
if(zookeeperOperator.isExisted(taskQueuePath)){
List<String> list = zookeeperOperator.getChildrenKeys(taskQueuePath);
for (String task : list) {
zkClient.delete().forPath(taskQueuePath + Constants.SINGLE_SLASH + task);
zookeeperOperator.remove(taskQueuePath + Constants.SINGLE_SLASH + task);
logger.info("delete task from tasks queue : {}/{} ",taskQueuePath,task);
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/SpringApplicationContext.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/SpringApplicationContext.java

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
package org.apache.dolphinscheduler.common.utils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;

35
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public abstract class AbstractListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
}
protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
}

48
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

@ -16,8 +16,10 @@
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,30 +36,37 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
//kay is zk path, value is TreeCache
private ConcurrentHashMap<String, TreeCache> allCaches = new ConcurrentHashMap<>();
TreeCache treeCache;
/**
* @param cachePath zk path
* @param listener operator
* register a unified listener of /${dsRoot},
*/
public void registerListener(final String cachePath, final TreeCacheListener listener) {
TreeCache newCache = new TreeCache(zkClient, cachePath);
logger.info("add listener to zk path: {}", cachePath);
@Override
protected void registerListener() {
treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot());
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try {
newCache.start();
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", cachePath);
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
newCache.getListenable().addListener(listener);
treeCache.getListenable().addListener((client, event) -> {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
});
allCaches.put(cachePath, newCache);
}
//for sub class
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
public String getFromCache(final String cachePath, final String key) {
ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key);
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
}
@ -65,18 +74,15 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
}
public TreeCache getTreeCache(final String cachePath) {
return allCaches.get(checkNotNull(cachePath));
return treeCache;
}
public void close() {
allCaches.forEach((path, cache) -> {
cache.close();
try {
Thread.sleep(500);
} catch (InterruptedException ignore) {
}
});
treeCache.close();
try {
Thread.sleep(500);
} catch (InterruptedException ignore) {
}
super.close();
}
}

11
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java

@ -57,11 +57,13 @@ public class ZookeeperOperator implements InitializingBean {
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
initStateLister();
//init();
registerListener();
}
//for subclass
//protected void init(){}
/**
* this method is for sub class,
*/
protected void registerListener(){}
public void initStateLister() {
checkNotNull(zkClient);
@ -127,9 +129,6 @@ public class ZookeeperOperator implements InitializingBean {
List<String> values;
try {
values = zkClient.getChildren().forPath(key);
if (CollectionUtils.isEmpty(values)) {
logger.warn("getChildrenKeys key : {} is empty", key);
}
return values;
} catch (InterruptedException ex) {
logger.error("getChildrenKeys key : {} InterruptedException", key);

4
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java

@ -32,11 +32,9 @@ import static org.junit.Assert.*;
/**
* task queue test
*/
@Ignore
public class TaskQueueZKImplTest extends BaseTaskQueueTest {
@Before
public void before(){

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -105,7 +105,8 @@ public class ProcessDao {
/**
* task queue impl
*/
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
@Autowired
private ITaskQueue taskQueue;
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
* @param logger logger

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/Application.java

@ -21,7 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler.dao")
@ComponentScan("org.apache.dolphinscheduler")
public class Application {
public static void main(String[] args) {

59
dolphinscheduler-dist/pom.xml vendored

@ -102,6 +102,63 @@
</build>
</profile>
<profile>
<id>nginx</id>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>dolphinscheduler-nginx</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/dolphinscheduler-nginx.xml</descriptor>
</descriptors>
<appendAssemblyId>true</appendAssemblyId>
</configuration>
</execution>
<execution>
<id>src</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/dolphinscheduler-src.xml</descriptor>
</descriptors>
<appendAssemblyId>true</appendAssemblyId>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>rpmbuild</id>
<build>
@ -214,6 +271,7 @@
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>config/*.*</include>
</includes>
</source>
@ -222,7 +280,6 @@
${basedir}/../script
</location>
<includes>
<include>config/*.*</include>
<include>env/*.*</include>
</includes>
</source>

2
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-binary.xml vendored

@ -94,6 +94,7 @@
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>config/*.*</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
@ -162,7 +163,6 @@
<fileSet>
<directory>${basedir}/../script</directory>
<includes>
<include>config/*.*</include>
<include>env/*.*</include>
</includes>
<outputDirectory>./conf</outputDirectory>

236
dolphinscheduler-dist/src/main/assembly/dolphinscheduler-nginx.xml vendored

@ -0,0 +1,236 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id>dolphinscheduler-nginx</id>
<formats>
<format>tar.gz</format>
</formats>
<includeBaseDirectory>true</includeBaseDirectory>
<baseDirectory>${project.build.finalName}-dolphinscheduler-bin</baseDirectory>
<fileSets>
<!--alert start-->
<fileSet>
<directory>${basedir}/../dolphinscheduler-alert/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>**/*.ftl</include>
</includes>
<outputDirectory>./conf</outputDirectory>
</fileSet>
<!--alert end-->
<!--api start-->
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources/bin</directory>
<includes>
<include>*.*</include>
</includes>
<directoryMode>755</directoryMode>
<outputDirectory>bin</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-api/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<!--api end-->
<!--server start-->
<fileSet>
<directory>${basedir}/../dolphinscheduler-server/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>config/*.*</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-common/src/main/resources/bin</directory>
<includes>
<include>*.*</include>
</includes>
<directoryMode>755</directoryMode>
<outputDirectory>bin</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-dao/src/main/resources</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
<include>**/*.json</include>
<include>**/*.yml</include>
</includes>
<outputDirectory>conf</outputDirectory>
</fileSet>
<!--server end-->
<fileSet>
<directory>${basedir}/../dolphinscheduler-server/target/dolphinscheduler-server-${project.version}</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-api/target/dolphinscheduler-api-${project.version}</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-alert/target/dolphinscheduler-alert-${project.version}</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-ui/dist</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>./ui/dist</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../dolphinscheduler-ui</directory>
<includes>
<include>install-dolphinscheduler-ui.sh</include>
</includes>
<outputDirectory>./ui</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../sql</directory>
<includes>
<include>**/*</include>
</includes>
<outputDirectory>./sql</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../script</directory>
<includes>
<include>*.*</include>
</includes>
<outputDirectory>./script</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../script</directory>
<includes>
<include>env/*.*</include>
</includes>
<outputDirectory>./conf</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/../script</directory>
<includes>
<include>start-all.sh</include>
<include>stop-all.sh</include>
<include>dolphinscheduler-daemon.sh</include>
</includes>
<outputDirectory>./bin</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/.././</directory>
<includes>
<include>*.sh</include>
<include>*.py</include>
<include>DISCLAIMER</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
<fileSet>
<directory>${basedir}/release-docs</directory>
<useDefaultExcludes>true</useDefaultExcludes>
<includes>
<include>**/*</include>
</includes>
<outputDirectory>.</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<outputDirectory>lib</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<excludes>
<exclude>javax.servlet:servlet-api</exclude>
<exclude>org.eclipse.jetty.aggregate:jetty-all</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -23,18 +23,17 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
import org.apache.dolphinscheduler.server.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.server.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -18,13 +18,13 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.queue.TaskQueueFactory;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.BeanContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerThread.java

@ -22,12 +22,12 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -29,12 +29,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.FetchTaskThread;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -25,12 +25,12 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.slf4j.Logger;
@ -155,6 +155,7 @@ public class FetchTaskThread implements Runnable{
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
if (CollectionUtils.isEmpty(tasksQueueList)){
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
// creating distributed locks, lock path /dolphinscheduler/lock/worker

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractYarnTask.java

@ -16,10 +16,10 @@
*/
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
/**

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentExecute.java

@ -23,10 +23,10 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DateInterval;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java

@ -25,9 +25,9 @@ import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java

@ -30,10 +30,10 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.Bytes;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.http.HttpEntity;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/processdure/ProcedureTask.java

@ -29,10 +29,10 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java

@ -22,9 +22,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java

@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.common.task.sql.SqlType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
@ -43,7 +44,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

96
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
@ -31,9 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
@ -101,12 +97,6 @@ public class ZKMasterClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
// monitor master
this.listenerMaster();
// monitor worker
this.listenerWorker();
// register master
this.registerMaster();
@ -158,31 +148,22 @@ public class ZKMasterClient extends AbstractZKClient {
}
}
/**
* monitor master
* handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/
public void listenerMaster(){
registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
switch (event.getType()) {
case NODE_ADDED:
logger.info("master node added : {}", path);
break;
case NODE_REMOVED:
String serverHost = getHostByEventDataPath(path);
if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
return;
}
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
default:
break;
}
}
});
}
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(path.startsWith(getZNodeParentPath(ZKNodeType.MASTER)+Constants.SINGLE_SLASH)){ //monitor master
handleMasterEvent(event,path);
}else if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){ //monitor worker
handleWorkerEvent(event,path);
}
//other path event, ignore
}
/**
* remove zookeeper node path
@ -273,25 +254,40 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* monitor worker
* monitor master
*/
public void listenerWorker(){
registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
logger.info("worker node deleted : {}", path);
removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
public void handleMasterEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("master node added : {}", path);
break;
case NODE_REMOVED:
String serverHost = getHostByEventDataPath(path);
if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
return;
}
}
});
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
default:
break;
}
}
/**
* monitor worker
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
logger.info("worker node deleted : {}", path);
removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
}
}

61
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

@ -19,20 +19,13 @@ package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadFactory;
/**
* zookeeper worker client
@ -61,9 +54,6 @@ public class ZKWorkerClient extends AbstractZKClient {
// init system znode
this.initSystemZNode();
// monitor worker
this.listenerWorker();
// register worker
this.registWorker();
}
@ -83,31 +73,38 @@ public class ZKWorkerClient extends AbstractZKClient {
System.exit(-1);
}
}
/**
* monitor worker
* handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/
private void listenerWorker(){
registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
}
});
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
handleWorkerEvent(event,path);
}
}
/**
* monitor worker
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
}
/**

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java

@ -20,10 +20,10 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/sql/SqlExecutorTest.java

@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;

165
dolphinscheduler-ui/pom.xml

@ -32,55 +32,120 @@
<node.version>v12.12.0</node.version>
<npm.version>6.11.3</npm.version>
</properties>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>${frontend-maven-plugin.version}</version>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeVersion>${node.version}</nodeVersion>
<npmVersion>${npm.version}</npmVersion>
</configuration>
</execution>
<execution>
<id>npm install node-sass --unsafe-perm</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install node-sass --unsafe-perm</arguments>
</configuration>
</execution>
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install</arguments>
</configuration>
</execution>
<execution>
<id>npm run build:release</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build:release</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>nginx</id>
<build>
<plugins>
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>${frontend-maven-plugin.version}</version>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeVersion>${node.version}</nodeVersion>
<npmVersion>${npm.version}</npmVersion>
</configuration>
</execution>
<execution>
<id>npm install node-sass --unsafe-perm</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install node-sass --unsafe-perm</arguments>
</configuration>
</execution>
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install</arguments>
</configuration>
</execution>
<execution>
<id>npm run build</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>${frontend-maven-plugin.version}</version>
<executions>
<execution>
<id>install node and npm</id>
<goals>
<goal>install-node-and-npm</goal>
</goals>
<configuration>
<nodeVersion>${node.version}</nodeVersion>
<npmVersion>${npm.version}</npmVersion>
</configuration>
</execution>
<execution>
<id>npm install node-sass --unsafe-perm</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install node-sass --unsafe-perm</arguments>
</configuration>
</execution>
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<phase>generate-resources</phase>
<configuration>
<arguments>install</arguments>
</configuration>
</execution>
<execution>
<id>npm run build:release</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>run build:release</arguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Loading…
Cancel
Save