Browse Source

Merge branch 'dev' into alert_param_reduce

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
dc04f88c32
  1. 54
      README.md
  2. 17
      ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml
  3. 44
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkParamsConstants.java
  4. 132
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSender.java
  5. 2
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSenderTest.java
  6. 82
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/pom.xml
  7. 37
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannel.java
  8. 82
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java
  9. 30
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertPlugin.java
  10. 49
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuParamsConstants.java
  11. 223
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuSender.java
  12. 50
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/HttpRequestUtil.java
  13. 45
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/test/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactoryTest.java
  14. 75
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/test/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuSenderTest.java
  15. 1
      dolphinscheduler-alert-plugin/pom.xml
  16. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  17. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  18. 21
      dolphinscheduler-dist/release-docs/licenses/ui-licenses/LICENSE-@form-create-element-ui
  19. 5
      dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml
  20. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
  21. 52
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java
  22. 46
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java
  23. 75
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  24. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java
  25. 154
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
  26. 48
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java
  27. 147
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  28. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
  29. 31
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java
  30. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  31. 43
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java
  32. 1
      dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue
  33. 2
      pom.xml

54
README.md

@ -7,46 +7,44 @@ Dolphin Scheduler Official Website
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=apache-dolphinscheduler&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache-dolphinscheduler)
> Dolphin Scheduler for Big Data
[![Stargazers over time](https://starchart.cc/apache/incubator-dolphinscheduler.svg)](https://starchart.cc/apache/incubator-dolphinscheduler)
[![EN doc](https://img.shields.io/badge/document-English-blue.svg)](README.md)
[![CN doc](https://img.shields.io/badge/文档-中文版-blue.svg)](README_zh_CN.md)
### Design features:
### Design Features:
Dolphin Scheduler is a distributed and easy-to-extend visual DAG workflow scheduling system. It dedicates to solving the complex dependencies in data processing to make the scheduling system `out of the box` for the data processing process.
DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing various types of jobs available `out of the box`.
Its main objectives are as follows:
- Associate the tasks according to the dependencies of the tasks in a DAG graph, which can visualize the running state of the task in real-time.
- Support many task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc.
- Support process scheduling, dependency scheduling, manual scheduling, manual pause/stop/recovery, support for failed retry/alarm, recovery from specified nodes, Kill task, etc.
- Support the priority of process & task, task failover, and task timeout alarm or failure.
- Support process global parameters and node custom parameter settings.
- Support online upload/download of resource files, management, etc. Support online file creation and editing.
- Support task log online viewing and scrolling, online download log, etc.
- Implement cluster HA, decentralize Master cluster and Worker cluster through Zookeeper.
- Support various task types: Shell, MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Sub_Process, Procedure, etc.
- Support scheduling of workflows and dependencies, manual scheduling to pause/stop/recover task, support failure task retry/alarm, recover specified nodes from failure, kill task, etc.
- Support the priority of workflows & tasks, task failover, and task timeout alarm or failure.
- Support workflow global parameters and node customized parameter settings.
- Support online upload/download/management of resource files, etc. Support online file creation and editing.
- Support task log online viewing and scrolling and downloading, etc.
- Have implemented cluster HA, decentralize Master cluster and Worker cluster through Zookeeper.
- Support the viewing of Master/Worker CPU load, memory, and CPU usage metrics.
- Support presenting tree or Gantt chart of workflow history as well as the statistics results of task & process status in each workflow.
- Support backfilling data.
- Support displaying workflow history in tree/Gantt chart, as well as statistical analysis on the task status & process status in each workflow.
- Support back-filling data.
- Support multi-tenant.
- Support internationalization.
- There are more waiting for partners to explore...
- More features waiting for partners to explore...
### What's in Dolphin Scheduler
### What's in DolphinScheduler
Stability | Easy to use | Features | Scalability |
-- | -- | -- | --
Decentralized multi-master and multi-worker | Visualization process defines key information such as task status, task type, retry times, task running machine, visual variables, and so on at a glance.  |  Support pause, recover operation | Support custom task types
HA is supported by itself | All process definition operations are visualized, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, the API mode operation is provided. | Users on Dolphin Scheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler uses distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic online and offline.
Overload processing: Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | |
Decentralized multi-master and multi-worker | Visualization of workflow key information, such as task status, task type, retry times, task operation machine information, visual variables, and so on at a glance.  |  Support pause, recover operation | Support customized task types
support HA | Visualization of all workflow operations, dragging tasks to draw DAGs, configuring data sources and resources. At the same time, for third-party systems, provide API mode operations. | Users on DolphinScheduler can achieve many-to-one or one-to-one mapping relationship through tenants and Hadoop users, which is very important for scheduling large data jobs. | The scheduler supports distributed scheduling, and the overall scheduling capability will increase linearly with the scale of the cluster. Master and Worker support dynamic adjustment.
Overload processing: By using the task queue mechanism, the number of schedulable tasks on a single machine can be flexibly configured. Machine jam can be avoided with high tolerance to numbers of tasks cached in task queue. | One-click deployment | Support traditional shell tasks, and big data platform task scheduling: MR, Spark, SQL (MySQL, PostgreSQL, hive, spark SQL), Python, Procedure, Sub_Process | |
### System partial screenshot
### User Interface Screenshots
![home page](https://user-images.githubusercontent.com/15833811/75218288-bf286400-57d4-11ea-8263-d639c6511d5f.jpg)
![dag](https://user-images.githubusercontent.com/15833811/75236750-3374fe80-57f9-11ea-857d-62a66a5a559d.png)
@ -57,13 +55,9 @@ Overload processing: Overload processing: By using the task queue mechanism, the
![security](https://user-images.githubusercontent.com/15833811/75236441-bfd2f180-57f8-11ea-88bd-f24311e01b7e.png)
![treeview](https://user-images.githubusercontent.com/15833811/75217191-3fe56100-57d1-11ea-8856-f19180d9a879.png)
### QuickStart in Docker
Please referer the official website document:[[QuickStart in Docker](https://dolphinscheduler.apache.org/en-us/docs/1.3.4/user_doc/docker-deployment.html)]
### Recent R&D plan
The work plan of Dolphin Scheduler: [R&D plan](https://github.com/apache/incubator-dolphinscheduler/projects/1), which `In Develop` card shows the features that are currently being developed and TODO card lists what needs to be done(including feature ideas).
### How to contribute
Welcome to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/docs/development/contribute.html)]
### How to Build
@ -80,14 +74,16 @@ dolphinscheduler-dist/target/apache-dolphinscheduler-incubating-${latest.release
### Thanks
Dolphin Scheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects which contribute to making the dream of Dolphin Scheduler comes true. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we expect the partners who have the same passion and conviction to open-source will join in and contribute to the open-source community!
DolphinScheduler is based on a lot of excellent open-source projects, such as google guava, guice, grpc, netty, ali bonecp, quartz, and many open-source projects of Apache and so on.
We would like to express our deep gratitude to all the open-source projects used in Dolphin Scheduler. We hope that we are not only the beneficiaries of open-source, but also give back to the community. Besides, we hope everyone who have the same enthusiasm and passion for open source could join in and contribute to the open-source community!
### Get Help
1. Submit an issue
1. Submit an [[issue](https://github.com/apache/incubator-dolphinscheduler/issues/new/choose)]
1. Subscribe to the mail list: https://dolphinscheduler.apache.org/en-us/docs/development/subscribe.html, then email dev@dolphinscheduler.apache.org
### How to Contribute
The community welcomes everyone to participate in contributing, please refer to this website to find out more: [[How to contribute](https://dolphinscheduler.apache.org/en-us/community/development/contribute.html)]
### License
Please refer to the [LICENSE](https://github.com/apache/incubator-dolphinscheduler/blob/dev/LICENSE) file.

17
ambari_plugin/common-services/DOLPHIN/1.3.3/configuration/dolphin-worker.xml

@ -39,16 +39,18 @@
<value-attributes>
<type>int</type>
</value-attributes>
<description>only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2</description>
<description>only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.reserved.memory</name>
<value>0.3</value>
<description>only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G.</description>
<description>only larger than reserved memory, worker server can work. default value : physical memory * 1/10,
unit is G.
</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.listen.port</name>
<value>1234</value>
@ -64,4 +66,13 @@
<description>default worker group</description>
<on-ambari-upgrade add="true"/>
</property>
<property>
<name>worker.weigth</name>
<value>100</value>
<value-attributes>
<type>int</type>
</value-attributes>
<description>worker weight</description>
<on-ambari-upgrade add="true"/>
</property>
</configuration>

44
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkParamsConstants.java

@ -22,36 +22,24 @@ package org.apache.dolphinscheduler.plugin.alert.dingtalk;
*/
public class DingTalkParamsConstants {
static final String DING_TALK_PROXY_ENABLE = "isEnableProxy";
static final String DING_TALK_WEB_HOOK = "webhook";
static final String NAME_DING_TALK_WEB_HOOK = "WebHook";
static final String DING_TALK_KEYWORD = "keyword";
static final String NAME_DING_TALK_KEYWORD = "Keyword";
static final String NAME_DING_TALK_PROXY_ENABLE = "IsEnableProxy";
static final String DING_TALK_PROXY = "proxy";
static final String NAME_DING_TALK_PROXY = "Proxy";
static final String DING_TALK_PORT = "port";
static final String NAME_DING_TALK_PORT = "Port";
static final String DING_TALK_USER = "user";
static final String NAME_DING_TALK_USER = "User";
static final String DING_TALK_PASSWORD = "password";
static final String NAME_DING_TALK_PASSWORD = "Password";
private DingTalkParamsConstants() {
throw new IllegalStateException("Utility class");
}
static final String DING_TALK_WEB_HOOK = "dingtalk.webhook";
static final String NAME_DING_TALK_WEB_HOOK = "dingTalkWebHook";
static final String DING_TALK_KEYWORD = "dingtalk.keyword";
static final String NAME_DING_TALK_KEYWORD = "dingTalkKeyword";
public static final String DING_TALK_PROXY_ENABLE = "dingtalk.isEnableProxy";
static final String NAME_DING_TALK_PROXY_ENABLE = "dingTalkIsEnableProxy";
static final String DING_TALK_PROXY = "dingtalk.proxy";
static final String NAME_DING_TALK_PROXY = "dingTalkProxy";
static final String DING_TALK_PORT = "dingtalk.port";
static final String NAME_DING_TALK_PORT = "dingTalkPort";
static final String DING_TALK_USER = "dingtalk.user";
static final String NAME_DING_TALK_USER = "dingTalkUser";
static final String DING_TALK_PASSWORD = "dingtalk.password";
static final String NAME_DING_TALK_PASSWORD = "dingTalkPassword";
}

132
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSender.java

@ -75,51 +75,6 @@ public class DingTalkSender {
}
public AlertResult sendDingTalkMsg(String msg, String charset) {
AlertResult alertResult;
try {
String resp = sendMsg(msg, charset);
return checkSendDingTalkSendMsgResult(resp);
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
}
private String sendMsg(String msg, String charset) throws IOException {
String msgToJson = textToJsonString(msg + "#" + keyword);
HttpPost httpPost = constructHttpPost(url, msgToJson, charset);
CloseableHttpClient httpClient;
if (Boolean.TRUE.equals(enableProxy)) {
httpClient = getProxyClient(proxy, port, user, password);
RequestConfig rcf = getProxyConfig(proxy, port);
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send {}, resp: {}", msg, resp);
return resp;
} finally {
httpClient.close();
}
}
private static HttpPost constructHttpPost(String url, String msg, String charset) {
HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, charset);
@ -155,27 +110,6 @@ public class DingTalkSender {
return JSONUtils.toJsonString(items);
}
public static class DingTalkSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
private static AlertResult checkSendDingTalkSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
@ -201,4 +135,70 @@ public class DingTalkSender {
return alertResult;
}
public AlertResult sendDingTalkMsg(String title, String content) {
AlertResult alertResult;
try {
String resp = sendMsg(title, content);
return checkSendDingTalkSendMsgResult(resp);
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
}
private String sendMsg(String title, String content) throws IOException {
String msgToJson = textToJsonString(title + content + "#" + keyword);
HttpPost httpPost = constructHttpPost(url, msgToJson, "UTF-8");
CloseableHttpClient httpClient;
if (Boolean.TRUE.equals(enableProxy)) {
httpClient = getProxyClient(proxy, port, user, password);
RequestConfig rcf = getProxyConfig(proxy, port);
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, "UTF-8");
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send title :{},content : {}, resp: {}", title, content, resp);
return resp;
} finally {
httpClient.close();
}
}
public static class DingTalkSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
}

2
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSenderTest.java

@ -50,7 +50,7 @@ public class DingTalkSenderTest {
dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "true");
dingTalkSender = new DingTalkSender(dingTalkConfig);
AlertResult alertResult = dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8");
AlertResult alertResult = dingTalkSender.sendDingTalkMsg("title", "content test");
Assert.assertEquals("false",alertResult.getStatus());
}

82
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/pom.xml

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-alert-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>1.3.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert-feishu</artifactId>
<packaging>dolphinscheduler-plugin</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>dolphinscheduler-alert-feishu-${project.version}</finalName>
</build>
</project>

37
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannel.java

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import java.util.Map;
public class FeiShuAlertChannel implements AlertChannel {
@Override
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
String alertParams = alertInfo.getAlertParams();
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(alertParams);
return new FeiShuSender(paramsMap).sendFeiShuMsg(alertData);
}
}

82
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactory.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import java.util.Arrays;
import java.util.List;
public class FeiShuAlertChannelFactory implements AlertChannelFactory {
@Override
public String getName() {
return "Feishu";
}
@Override
public List<PluginParams> getParams() {
InputParam webHookParam = InputParam.newBuilder(FeiShuParamsConstants.NAME_WEB_HOOK, FeiShuParamsConstants.WEB_HOOK)
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
RadioParam isEnableProxy =
RadioParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE, FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE)
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.setValue(true)
.addValidate(Validate.newBuilder()
.setRequired(false)
.build())
.build();
InputParam proxyParam =
InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PROXY, FeiShuParamsConstants.FEI_SHU_PROXY)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
InputParam portParam = InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PORT, FeiShuParamsConstants.FEI_SHU_PORT)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
InputParam userParam =
InputParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_USER, FeiShuParamsConstants.FEI_SHU_USER)
.addValidate(Validate.newBuilder()
.setRequired(false).build())
.build();
PasswordParam passwordParam = PasswordParam.newBuilder(FeiShuParamsConstants.NAME_FEI_SHU_PASSWORD, FeiShuParamsConstants.FEI_SHU_PASSWORD)
.setPlaceholder("if enable use authentication, you need input password")
.build();
return Arrays.asList(webHookParam, isEnableProxy, proxyParam, portParam, userParam, passwordParam);
}
@Override
public AlertChannel create() {
return new FeiShuAlertChannel();
}
}

30
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertPlugin.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import com.google.common.collect.ImmutableList;
public class FeiShuAlertPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<AlertChannelFactory> getAlertChannelFactorys() {
return ImmutableList.of(new FeiShuAlertChannelFactory());
}
}

49
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuParamsConstants.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
public class FeiShuParamsConstants {
private FeiShuParamsConstants() {
throw new IllegalStateException("Utility class");
}
static final String WEB_HOOK = "webhook";
static final String NAME_WEB_HOOK = "webHook";
public static final String FEI_SHU_PROXY_ENABLE = "isEnableProxy";
static final String NAME_FEI_SHU_PROXY_ENABLE = "isEnableProxy";
static final String FEI_SHU_PROXY = "proxy";
static final String NAME_FEI_SHU_PROXY = "proxy";
static final String FEI_SHU_PORT = "port";
static final String NAME_FEI_SHU_PORT = "port";
static final String FEI_SHU_USER = "user";
static final String NAME_FEI_SHU_USER = "user";
static final String FEI_SHU_PASSWORD = "password";
static final String NAME_FEI_SHU_PASSWORD = "password";
}

223
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuSender.java

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonProperty;
public class FeiShuSender {
private static final Logger logger = LoggerFactory.getLogger(FeiShuSender.class);
private String url;
private Boolean enableProxy;
private String proxy;
private Integer port;
private String user;
private String password;
FeiShuSender(Map<String, String> config) {
url = config.get(FeiShuParamsConstants.NAME_WEB_HOOK);
enableProxy = Boolean.valueOf(config.get(FeiShuParamsConstants.NAME_FEI_SHU_PROXY_ENABLE));
if (Boolean.TRUE.equals(enableProxy)) {
port = Integer.parseInt(config.get(FeiShuParamsConstants.NAME_FEI_SHU_PORT));
proxy = config.get(FeiShuParamsConstants.NAME_FEI_SHU_PROXY);
user = config.get(FeiShuParamsConstants.NAME_FEI_SHU_USER);
password = config.get(FeiShuParamsConstants.NAME_FEI_SHU_PASSWORD);
}
}
private static RequestConfig getProxyConfig(String proxy, int port) {
HttpHost httpProxy = new HttpHost(proxy, port);
return RequestConfig.custom().setProxy(httpProxy).build();
}
private static String textToJsonString(AlertData alertData) {
Map<String, Object> items = new HashMap<>(2);
items.put("msg_type", "text");
Map<String, String> textContent = new HashMap<>();
byte[] byt = StringUtils.getBytesUtf8(formatContent(alertData));
String txt = StringUtils.newStringUtf8(byt);
textContent.put("text", txt);
items.put("content", textContent);
return JSONUtils.toJsonString(items);
}
private static AlertResult checkSendFeiShuSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
if (org.apache.dolphinscheduler.spi.utils.StringUtils.isBlank(result)) {
alertResult.setMessage("send fei shu msg error");
logger.info("send fei shu msg error,fei shu server resp is null");
return alertResult;
}
FeiShuSendMsgResponse sendMsgResponse = JSONUtils.parseObject(result, FeiShuSendMsgResponse.class);
if (null == sendMsgResponse) {
alertResult.setMessage("send fei shu msg fail");
logger.info("send fei shu msg error,resp error");
return alertResult;
}
if (sendMsgResponse.statusCode == 0) {
alertResult.setStatus("true");
alertResult.setMessage("send fei shu msg success");
return alertResult;
}
alertResult.setMessage(String.format("alert send fei shu msg error : %s", sendMsgResponse.getStatusMessage()));
logger.info("alert send fei shu msg error : {} ,Extra : {} ", sendMsgResponse.getStatusMessage(), sendMsgResponse.getExtra());
return alertResult;
}
public static String formatContent(AlertData alertData) {
if (alertData.getContent() != null) {
List<Map> list;
try {
list = JSONUtils.toList(alertData.getContent(), Map.class);
} catch (Exception e) {
logger.error("json format exception", e);
return null;
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`%n", alertData.getTitle()));
for (Map map : list) {
Iterator<Entry<String, Object>> entries = map.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, Object> entry = entries.next();
String key = entry.getKey();
String value = entry.getValue().toString();
contents.append(key + ":" + value);
contents.append("\n");
}
}
return contents.toString();
}
return null;
}
public AlertResult sendFeiShuMsg(AlertData alertData) {
AlertResult alertResult;
try {
String resp = sendMsg(alertData);
return checkSendFeiShuSendMsgResult(resp);
} catch (Exception e) {
logger.info("send fei shu alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send fei shu alert fail.");
}
return alertResult;
}
private String sendMsg(AlertData alertData) throws IOException {
String msgToJson = textToJsonString(alertData);
HttpPost httpPost = HttpRequestUtil.constructHttpPost(url, msgToJson);
CloseableHttpClient httpClient;
httpClient = HttpRequestUtil.getHttpClient(enableProxy, proxy, port, user, password);
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
logger.error("send feishu message error, return http status code: " + statusCode);
}
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, "utf-8");
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send title :{} ,content :{}, resp: {}", alertData.getTitle(), alertData.getContent(), resp);
return resp;
} finally {
httpClient.close();
}
}
public static class FeiShuSendMsgResponse {
@JsonProperty("Extra")
private String extra;
@JsonProperty("StatusCode")
private Integer statusCode;
@JsonProperty("StatusMessage")
private String statusMessage;
public String getExtra() {
return extra;
}
public void setExtra(String extra) {
this.extra = extra;
}
public Integer getStatusCode() {
return statusCode;
}
public void setStatusCode(Integer statusCode) {
this.statusCode = statusCode;
}
public String getStatusMessage() {
return statusMessage;
}
public void setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
}
}
}

50
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/main/java/org/apache/dolphinscheduler/plugin/alert/feishu/HttpRequestUtil.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
public class HttpRequestUtil {
public static CloseableHttpClient getHttpClient(boolean enableProxy, String proxy, Integer port, String user, String password) {
if (enableProxy) {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, password));
return HttpClients.custom().setDefaultCredentialsProvider(provider).build();
} else {
return HttpClients.createDefault();
}
}
public static HttpPost constructHttpPost(String url, String msg) {
HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, ContentType.APPLICATION_JSON);
post.setEntity(entity);
return post;
}
}

45
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/test/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuAlertChannelFactoryTest.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class FeiShuAlertChannelFactoryTest {
@Test
public void testGetParams() {
FeiShuAlertChannelFactory feiShuAlertChannelFactory = new FeiShuAlertChannelFactory();
List<PluginParams> params = feiShuAlertChannelFactory.getParams();
JSONUtils.toJsonString(params);
Assert.assertEquals(6, params.size());
}
@Test
public void testCreate() {
FeiShuAlertChannelFactory feiShuAlertChannelFactory = new FeiShuAlertChannelFactory();
AlertChannel alertChannel = feiShuAlertChannelFactory.create();
Assert.assertNotNull(alertChannel);
}
}

75
dolphinscheduler-alert-plugin/dolphinscheduler-alert-feishu/src/test/java/org/apache/dolphinscheduler/plugin/alert/feishu/FeiShuSenderTest.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.feishu;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FeiShuSenderTest {
private static Map<String, String> feiShuConfig = new HashMap<>();
@Before
public void initFeiShuConfig() {
feiShuConfig.put(FeiShuParamsConstants.WEB_HOOK, "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx");
}
@Test
public void testSend() {
AlertData alertData = new AlertData();
alertData.setTitle("feishu test title");
alertData.setContent("feishu test content");
FeiShuSender feiShuSender = new FeiShuSender(feiShuConfig);
AlertResult alertResult = feiShuSender.sendFeiShuMsg(alertData);
Assert.assertEquals("false", alertResult.getStatus());
}
@Test
public void testFormatContent() {
String alertMsg = "[\n"
+ " {\n"
+ " \"owner\": \"dolphinscheduler\",\n"
+ " \"processEndTime\": \"2021-01-29 19:01:11\",\n"
+ " \"processHost\": \"10.81.129.4:5678\",\n"
+ " \"processId\": 2926,\n"
+ " \"processName\": \"3-20210129190038108\",\n"
+ " \"processStartTime\": \"2021-01-29 19:00:38\",\n"
+ " \"processState\": \"SUCCESS\",\n"
+ " \"processType\": \"START_PROCESS\",\n"
+ " \"projectId\": 2,\n"
+ " \"projectName\": \"testdelproject\",\n"
+ " \"recovery\": \"NO\",\n"
+ " \"retryTimes\": 0,\n"
+ " \"runTimes\": 1,\n"
+ " \"taskId\": 0\n"
+ " }\n"
+ "]";
AlertData alertData = new AlertData();
alertData.setTitle("");
alertData.setContent(alertMsg);
Assert.assertNotNull(FeiShuSender.formatContent(alertData));
}
}

1
dolphinscheduler-alert-plugin/pom.xml

@ -35,6 +35,7 @@
<module>dolphinscheduler-alert-dingtalk</module>
<module>dolphinscheduler-alert-script</module>
<module>dolphinscheduler-alert-http</module>
<module>dolphinscheduler-alert-feishu</module>
</modules>

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -153,19 +153,21 @@ public class WorkerGroupService extends BaseService {
}
}
// available workerGroup list
List<String> availableWorkerGroupList = new ArrayList<>();
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + "/" + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
String timeStamp = "";
for (int i = 0; i < childrenNodes.size(); i++) {
String ip = childrenNodes.get(i);
childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
timeStamp = ip.substring(ip.lastIndexOf(":"));
}
if (CollectionUtils.isNotEmpty(childrenNodes)) {
availableWorkerGroupList.add(workerGroup);
WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup);
if (isPaging) {
wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0));
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp);
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
}

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -61,4 +61,8 @@ public class StringUtils {
public static String trim(String str) {
return str == null ? null : str.trim();
}
public static boolean equalsIgnoreCase(String str1, String str2) {
return str1 == null ? str2 == null : str1.equalsIgnoreCase(str2);
}
}

21
dolphinscheduler-dist/release-docs/licenses/ui-licenses/LICENSE-@form-create-element-ui vendored

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2018 xaboy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

5
dolphinscheduler-dist/src/main/provisio/dolphinscheduler.xml vendored

@ -64,4 +64,9 @@
<unpack/>
</artifact>
</artifactSet>
<artifactSet to="lib/plugin/alert/feishu">
<artifact id="${project.groupId}:dolphinscheduler-alert-feishu:zip:${project.version}">
<unpack/>
</artifact>
</artifactSet>
</runtime>

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.bean;
import org.springframework.beans.BeansException;
@ -31,9 +32,7 @@ public class SpringApplicationContext implements ApplicationContextAware {
SpringApplicationContext.applicationContext = applicationContext;
}
public static <T> T getBean(Class<T> requiredType){
public static <T> T getBean(Class<T> requiredType) {
return applicationContext.getBean(requiredType);
}
}

52
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/ServiceException.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* Custom ZKServerException exception
*/
public class ServiceException extends RuntimeException {
/**
* Construct a new runtime exception with the error message
*
* @param errMsg Error message
*/
public ServiceException(String errMsg) {
super(errMsg);
}
/**
* Construct a new runtime exception with the cause
*
* @param cause cause
*/
public ServiceException(Throwable cause) {
super(cause);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param errMsg message
* @param cause cause
*/
public ServiceException(String errMsg, Throwable cause) {
super(errMsg, cause);
}
}

46
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/permission/PermissionCheck.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
@ -21,11 +22,13 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.List;
import org.slf4j.Logger;
public class PermissionCheck<T> {
/**
* logger
@ -58,6 +61,7 @@ public class PermissionCheck<T> {
/**
* permission check
*
* @param authorizationType authorization type
* @param processService process dao
*/
@ -68,10 +72,6 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
@ -82,11 +82,6 @@ public class PermissionCheck<T> {
/**
* permission check
* @param authorizationType
* @param processService
* @param needChecks
* @param userId
* @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, T[] needChecks, int userId, Logger logger) {
this.authorizationType = authorizationType;
@ -98,13 +93,8 @@ public class PermissionCheck<T> {
/**
* permission check
* @param logger
* @param authorizationType
* @param processService
* @param resourceList
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId,Logger logger) {
public PermissionCheck(AuthorizationType authorizationType, ProcessService processService, List<ResourceInfo> resourceList, int userId, Logger logger) {
this.authorizationType = authorizationType;
this.processService = processService;
this.resourceList = resourceList;
@ -154,9 +144,10 @@ public class PermissionCheck<T> {
/**
* has permission
*
* @return true if has permission
*/
public boolean hasPermission(){
public boolean hasPermission() {
try {
checkPermission();
return true;
@ -167,23 +158,24 @@ public class PermissionCheck<T> {
/**
* check permission
* @throws Exception exception
*
* @throws ServiceException exception
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
public void checkPermission() throws ServiceException {
if (this.needChecks.length > 0) {
// get user type in order to judge whether the user is admin
User user = processService.getUserById(userId);
if (user == null) {
logger.error("user id {} didn't exist",userId);
throw new RuntimeException(String.format("user %s didn't exist",userId));
logger.error("user id {} doesn't exist", userId);
throw new ServiceException(String.format("user %s doesn't exist", userId));
}
if (user.getUserType() != UserType.ADMIN_USER){
List<T> unauthorizedList = processService.listUnauthorized(userId,needChecks,authorizationType);
if (user.getUserType() != UserType.ADMIN_USER) {
List<T> unauthorizedList = processService.listUnauthorized(userId, needChecks, authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList);
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
if (CollectionUtils.isNotEmpty(unauthorizedList)) {
logger.error("user {} doesn't have permission of {}: {}", user.getUserName(), authorizationType.getDescp(), unauthorizedList);
throw new ServiceException(String.format("user %s doesn't have permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}

75
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -84,6 +84,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -110,7 +111,7 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
@ -167,7 +168,7 @@ public class ProcessService {
@Transactional(rollbackFor = Exception.class)
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
//cannot construct process instance, return null;
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command);
moveToErrorCommand(command, "process instance is null");
@ -259,7 +260,7 @@ public class ProcessService {
*/
public Boolean verifyIsNeedCreateCommand(Command command) {
Boolean isNeedCreate = true;
Map<CommandType, Integer> cmdTypeMap = new HashMap<CommandType, Integer>();
EnumMap<CommandType, Integer> cmdTypeMap = new EnumMap<>(CommandType.class);
cmdTypeMap.put(CommandType.REPEAT_RUNNING, 1);
cmdTypeMap.put(CommandType.RECOVER_SUSPENDED_PROCESS, 1);
cmdTypeMap.put(CommandType.START_FAILURE_TASK_PROCESS, 1);
@ -296,9 +297,6 @@ public class ProcessService {
/**
* get task node list by definitionId
*
* @param defineId
* @return
*/
public List<TaskNode> getTaskNodeListByDefinitionId(Integer defineId) {
ProcessDefinition processDefinition = processDefineMapper.selectById(defineId);
@ -435,7 +433,7 @@ public class ProcessService {
List<TaskNode> taskNodeList = processData.getTasks();
if (taskNodeList != null && taskNodeList.size() > 0) {
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskNode taskNode : taskNodeList) {
String parameter = taskNode.getParams();
@ -514,11 +512,9 @@ public class ProcessService {
*/
private Date getScheduleTime(Command command, Map<String, String> cmdParam) {
Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null) {
if (cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMDPARAM_COMPLEMENT_DATA_START_DATE)) {
scheduleTime = DateUtils.stringToDate(cmdParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE));
}
}
return scheduleTime;
}
@ -714,7 +710,7 @@ public class ProcessService {
// generate one new process instance
processInstance = generateNewProcessInstance(processDefinition, command, cmdParam);
}
if (!checkCmdParam(command, cmdParam)) {
if (Boolean.FALSE.equals(checkCmdParam(command, cmdParam))) {
logger.error("command parameter check failed!");
return null;
}
@ -922,13 +918,12 @@ public class ProcessService {
*/
private void initTaskInstance(TaskInstance taskInstance) {
if (!taskInstance.isSubProcess()) {
if (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()) {
if (!taskInstance.isSubProcess()
&& (taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure())) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
return;
}
}
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
}
@ -1049,10 +1044,6 @@ public class ProcessService {
/**
* complement data needs transform parent parameter to child.
*
* @param instanceMap
* @param parentProcessInstance
* @return
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
// set sub work process command
@ -1071,11 +1062,6 @@ public class ProcessService {
/**
* create sub work process command
*
* @param parentProcessInstance
* @param childInstance
* @param instanceMap
* @param task
*/
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
ProcessInstance childInstance,
@ -1105,8 +1091,6 @@ public class ProcessService {
/**
* initialize sub work flow state
* child instance state would be initialized when 'recovery from pause/stop/failure'
*
* @param childInstance
*/
private void initSubInstanceState(ProcessInstance childInstance) {
if (childInstance != null) {
@ -1119,9 +1103,6 @@ public class ProcessService {
* get sub work flow command type
* child instance exist: child command = fatherCommand
* child instance not exists: child command = fatherCommand[0]
*
* @param parentProcessInstance
* @return
*/
private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
CommandType commandType = parentProcessInstance.getCommandType();
@ -1577,7 +1558,7 @@ public class ProcessService {
if (intList == null) {
return new ArrayList<>();
}
List<String> result = new ArrayList<String>(intList.size());
List<String> result = new ArrayList<>(intList.size());
for (Integer intVar : intList) {
result.add(String.valueOf(intVar));
}
@ -1727,8 +1708,8 @@ public class ProcessService {
* @throws Exception if error throws Exception
*/
public CycleDependency getCycleDependency(int masterId, int processDefinitionId, Date scheduledFireTime) throws Exception {
List<CycleDependency> list = getCycleDependencies(masterId, new int[] {processDefinitionId}, scheduledFireTime);
return list.size() > 0 ? list.get(0) : null;
List<CycleDependency> list = getCycleDependencies(masterId, new int[]{processDefinitionId}, scheduledFireTime);
return !list.isEmpty() ? list.get(0) : null;
}
@ -1742,7 +1723,7 @@ public class ProcessService {
* @throws Exception if error throws Exception
*/
public List<CycleDependency> getCycleDependencies(int masterId, int[] ids, Date scheduledFireTime) throws Exception {
List<CycleDependency> cycleDependencyList = new ArrayList<CycleDependency>();
List<CycleDependency> cycleDependencyList = new ArrayList<>();
if (null == ids || ids.length == 0) {
logger.warn("ids[] is empty!is invalid!");
return cycleDependencyList;
@ -1769,14 +1750,10 @@ public class ProcessService {
}
Calendar calendar = Calendar.getInstance();
switch (cycleEnum) {
/*case MINUTE:
calendar.add(Calendar.MINUTE,-61);*/
case HOUR:
calendar.add(Calendar.HOUR, -25);
break;
case DAY:
calendar.add(Calendar.DATE, -32);
break;
case WEEK:
calendar.add(Calendar.DATE, -32);
break;
@ -1784,7 +1761,8 @@ public class ProcessService {
calendar.add(Calendar.MONTH, -13);
break;
default:
logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleEnum.name());
String cycleName = cycleEnum.name();
logger.warn("Dependent process definition's cycleEnum is {},not support!!", cycleName);
continue;
}
Date start = calendar.getTime();
@ -1794,7 +1772,7 @@ public class ProcessService {
} else {
list = CronUtils.getFireDateList(start, scheduledFireTime, depCronExpression);
}
if (list.size() >= 1) {
if (!list.isEmpty()) {
start = list.get(list.size() - 1);
CycleDependency dependency = new CycleDependency(depSchedule.getProcessDefinitionId(), start, CronUtils.getExpirationTime(start, cycleEnum), cycleEnum);
cycleDependencyList.add(dependency);
@ -1867,6 +1845,7 @@ public class ProcessService {
/**
* query project name and user name by processInstanceId.
*
* @param processInstanceId processInstanceId
* @return projectName and userName
*/
@ -1939,30 +1918,27 @@ public class ProcessService {
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType) {
List<T> resultList = new ArrayList<T>();
List<T> resultList = new ArrayList<>();
if (Objects.nonNull(needChecks) && needChecks.length > 0) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
Set<T> originResSet = new HashSet<>(Arrays.asList(needChecks));
switch (authorizationType) {
case RESOURCE_FILE_ID:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
case UDF_FILE:
Set<Integer> authorizedResourceFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(Resource::getId).collect(toSet());
originResSet.removeAll(authorizedResourceFiles);
break;
case RESOURCE_FILE_NAME:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getFullName()).collect(toSet());
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(Resource::getFullName).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case UDF_FILE:
Set<Integer> authorizedUdfFiles = resourceMapper.listAuthorizedResourceById(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfFiles);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId, needChecks).stream().map(DataSource::getId).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(UdfFunc::getId).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
default:
@ -2007,9 +1983,6 @@ public class ProcessService {
/**
* format task app id in task instance
*
* @param taskInstance
* @return
*/
public String formatTaskAppId(TaskInstance taskInstance) {
ProcessDefinition definition = this.findProcessDefineById(taskInstance.getProcessDefinitionId());

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/ProcessScheduleJob.java

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz;
package org.apache.dolphinscheduler.service.quartz;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -25,6 +25,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
@ -34,8 +37,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Date;
/**
* process schedule job
*/
@ -46,7 +47,7 @@ public class ProcessScheduleJob implements Job {
*/
private static final Logger logger = LoggerFactory.getLogger(ProcessScheduleJob.class);
public ProcessService getProcessService(){
public ProcessService getProcessService() {
return SpringApplicationContext.getBean(ProcessService.class);
}
@ -66,10 +67,8 @@ public class ProcessScheduleJob implements Job {
int projectId = dataMap.getInt(Constants.PROJECT_ID);
int scheduleId = dataMap.getInt(Constants.SCHEDULE_ID);
Date scheduledFireTime = context.getScheduledFireTime();
Date fireTime = context.getFireTime();
logger.info("scheduled fire time :{}, fire time :{}, process id :{}", scheduledFireTime, fireTime, scheduleId);
@ -82,11 +81,10 @@ public class ProcessScheduleJob implements Job {
return;
}
ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
// release state : online/offline
ReleaseState releaseState = processDefinition.getReleaseState();
if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
if (releaseState == ReleaseState.OFFLINE) {
logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
return;
}
@ -107,7 +105,6 @@ public class ProcessScheduleJob implements Job {
getProcessService().createCommand(command);
}
/**
* delete job
*/

154
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java

@ -14,15 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz;
import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_CLASS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT;
import static org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCENAME;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_GROUP_PRIFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_JOB_PRIFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_MISFIRETHRESHOLD;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_TABLE_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADCOUNT;
import static org.apache.dolphinscheduler.common.Constants.QUARTZ_THREADPRIORITY;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE;
import static org.apache.dolphinscheduler.common.Constants.SCHEDULE_ID;
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
import static org.apache.dolphinscheduler.common.Constants.STRING_FALSE;
import static org.apache.dolphinscheduler.common.Constants.STRING_TRUE;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.quartz.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.jdbcjobstore.JobStoreTX;
import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
@ -32,15 +93,6 @@ import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
/**
* single Quartz executors instance
*/
@ -70,28 +122,27 @@ public class QuartzExecutors {
private static final QuartzExecutors instance = new QuartzExecutors();
}
private QuartzExecutors() {
try {
conf = new PropertiesConfiguration(QUARTZ_PROPERTIES_PATH);
init();
}catch (ConfigurationException e){
logger.warn("not loaded quartz configuration file, will used default value",e);
} catch (ConfigurationException e) {
logger.warn("not loaded quartz configuration file, will used default value", e);
}
}
/**
* thread safe and performance promote
*
* @return instance of Quartz Executors
*/
public static QuartzExecutors getInstance() {
return Holder.instance;
}
/**
* init
*
* <p>
* Returns a client-usable handle to a Scheduler.
*/
private void init() {
@ -100,33 +151,33 @@ public class QuartzExecutors {
Properties properties = new Properties();
String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(SPRING_DATASOURCE_DRIVER_CLASS_NAME);
if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)){
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
if (dataSourceDriverClass.equals(ORG_POSTGRESQL_DRIVER)) {
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
} else {
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, conf.getString(ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
}
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCENAME, QUARTZ_INSTANCENAME));
properties.setProperty(ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(ORG_QUARTZ_SCHEDULER_INSTANCEID, QUARTZ_INSTANCEID));
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES,STRING_FALSE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED,STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK,conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
properties.setProperty(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, conf.getString(ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, conf.getString(ORG_QUARTZ_JOBSTORE_USEPROPERTIES, STRING_FALSE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_CLASS, conf.getString(ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
properties.setProperty(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, conf.getString(ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADCOUNT, conf.getString(ORG_QUARTZ_THREADPOOL_THREADCOUNT, QUARTZ_THREADCOUNT));
properties.setProperty(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, conf.getString(ORG_QUARTZ_THREADPOOL_THREADPRIORITY, QUARTZ_THREADPRIORITY));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLASS, conf.getString(ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
properties.setProperty(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, conf.getString(ORG_QUARTZ_JOBSTORE_TABLEPREFIX, QUARTZ_TABLE_PREFIX));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, conf.getString(ORG_QUARTZ_JOBSTORE_ISCLUSTERED, STRING_TRUE));
properties.setProperty(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, conf.getString(ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, QUARTZ_MISFIRETHRESHOLD));
properties.setProperty(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, conf.getString(ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, QUARTZ_CLUSTERCHECKININTERVAL));
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, DruidConnectionProvider.class.getName()));
schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
} catch (SchedulerException e) {
logger.error(e.getMessage(),e);
logger.error(e.getMessage(), e);
System.exit(1);
}
@ -138,19 +189,20 @@ public class QuartzExecutors {
* @throws SchedulerException scheduler exception
*/
public void start() throws SchedulerException {
if (!scheduler.isStarted()){
if (!scheduler.isStarted()) {
scheduler.start();
logger.info("Quartz service started" );
logger.info("Quartz service started");
}
}
/**
* stop all scheduled tasks
*
* <p>
* Halts the Scheduler's firing of Triggers,
* and cleans up all resources associated with the Scheduler.
*
* <p>
* The scheduler cannot be re-started.
*
* @throws SchedulerException scheduler exception
*/
public void shutdown() throws SchedulerException {
@ -161,7 +213,6 @@ public class QuartzExecutors {
}
}
/**
* add task trigger , if this task already exists, return this task with updated trigger
*
@ -173,7 +224,7 @@ public class QuartzExecutors {
* @param cronExpression cron expression
* @param jobDataMap job parameters data map
*/
public void addJob(Class<? extends Job> clazz,String jobName,String jobGroupName,Date startDate, Date endDate,
public void addJob(Class<? extends Job> clazz, String jobName, String jobGroupName, Date startDate, Date endDate,
String cronExpression,
Map<String, Object> jobDataMap) {
lock.writeLock().lock();
@ -218,7 +269,7 @@ public class QuartzExecutors {
CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
String oldCronExpression = oldCronTrigger.getCronExpression();
if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) {
if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
// reschedule job trigger
scheduler.rescheduleJob(triggerKey, cronTrigger);
logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
@ -231,14 +282,12 @@ public class QuartzExecutors {
}
} catch (Exception e) {
logger.error("add job failed", e);
throw new RuntimeException("add job failed", e);
throw new ServiceException("add job failed", e);
} finally {
lock.writeLock().unlock();
}
}
/**
* delete job
*
@ -249,16 +298,16 @@ public class QuartzExecutors {
public boolean deleteJob(String jobName, String jobGroupName) {
lock.writeLock().lock();
try {
JobKey jobKey = new JobKey(jobName,jobGroupName);
if(scheduler.checkExists(jobKey)){
JobKey jobKey = new JobKey(jobName, jobGroupName);
if (scheduler.checkExists(jobKey)) {
logger.info("try to delete job, job name: {}, job group name: {},", jobName, jobGroupName);
return scheduler.deleteJob(jobKey);
}else {
} else {
return true;
}
} catch (SchedulerException e) {
logger.error("delete job : {} failed",jobName, e);
logger.error("delete job : {} failed", jobName, e);
} finally {
lock.writeLock().unlock();
}
@ -269,7 +318,6 @@ public class QuartzExecutors {
* delete all jobs in job group
*
* @param jobGroupName job group name
*
* @return true if all of the Jobs were found and deleted, false if
* one or more were not deleted.
*/
@ -282,7 +330,7 @@ public class QuartzExecutors {
return scheduler.deleteJobs(jobKeys);
} catch (SchedulerException e) {
logger.error("delete all jobs in job group: {} failed",jobGroupName, e);
logger.error("delete all jobs in job group: {} failed", jobGroupName, e);
} finally {
lock.writeLock().unlock();
}
@ -291,6 +339,7 @@ public class QuartzExecutors {
/**
* build job name
*
* @param processId process id
* @return job name
*/
@ -302,6 +351,7 @@ public class QuartzExecutors {
/**
* build job group name
*
* @param projectId project id
* @return job group name
*/

48
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/cron/AbstractCycle.java

@ -14,13 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.quartz.cron;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.Cron;
import com.cronutils.model.field.CronField;
import com.cronutils.model.field.CronFieldName;
import com.cronutils.model.field.expression.*;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import com.cronutils.model.field.expression.Always;
import com.cronutils.model.field.expression.And;
import com.cronutils.model.field.expression.Between;
import com.cronutils.model.field.expression.Every;
import com.cronutils.model.field.expression.FieldExpression;
import com.cronutils.model.field.expression.On;
/**
* Cycle
@ -42,9 +49,10 @@ public abstract class AbstractCycle {
/**
* cycle constructor
*
* @param cron cron
*/
public AbstractCycle(Cron cron) {
protected AbstractCycle(Cron cron) {
if (cron == null) {
throw new IllegalArgumentException("cron must not be null!");
}
@ -60,30 +68,32 @@ public abstract class AbstractCycle {
/**
* whether the minute field has a value
*
* @return if minute field has a value return trueelse return false
*/
protected boolean minFiledIsSetAll(){
protected boolean minFiledIsSetAll() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always
|| minFieldExpression instanceof Between || minFieldExpression instanceof And
|| minFieldExpression instanceof On);
}
/**
* whether the minute field has a value of every or always
*
* @return if minute field has a value of every or always return trueelse return false
*/
protected boolean minFiledIsEvery(){
protected boolean minFiledIsEvery() {
FieldExpression minFieldExpression = minField.getExpression();
return (minFieldExpression instanceof Every || minFieldExpression instanceof Always);
}
/**
* whether the hour field has a value
*
* @return if hour field has a value return trueelse return false
*/
protected boolean hourFiledIsSetAll(){
protected boolean hourFiledIsSetAll() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always
|| hourFieldExpression instanceof Between || hourFieldExpression instanceof And
@ -92,37 +102,40 @@ public abstract class AbstractCycle {
/**
* whether the hour field has a value of every or always
*
* @return if hour field has a value of every or always return trueelse return false
*/
protected boolean hourFiledIsEvery(){
protected boolean hourFiledIsEvery() {
FieldExpression hourFieldExpression = hourField.getExpression();
return (hourFieldExpression instanceof Every || hourFieldExpression instanceof Always);
}
/**
* whether the day Of month field has a value
*
* @return if day Of month field has a value return trueelse return false
*/
protected boolean dayOfMonthFieldIsSetAll(){
protected boolean dayOfMonthFieldIsSetAll() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always
|| dayOfMonthField.getExpression() instanceof Between || dayOfMonthField.getExpression() instanceof And
|| dayOfMonthField.getExpression() instanceof On);
}
/**
* whether the day Of Month field has a value of every or always
*
* @return if day Of Month field has a value of every or always return trueelse return false
*/
protected boolean dayOfMonthFieldIsEvery(){
protected boolean dayOfMonthFieldIsEvery() {
return (dayOfMonthField.getExpression() instanceof Every || dayOfMonthField.getExpression() instanceof Always);
}
/**
* whether month field has a value
*
* @return if month field has a value return trueelse return false
*/
protected boolean monthFieldIsSetAll(){
protected boolean monthFieldIsSetAll() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always
|| monthFieldExpression instanceof Between || monthFieldExpression instanceof And
@ -131,18 +144,20 @@ public abstract class AbstractCycle {
/**
* whether the month field has a value of every or always
*
* @return if month field has a value of every or always return trueelse return false
*/
protected boolean monthFieldIsEvery(){
protected boolean monthFieldIsEvery() {
FieldExpression monthFieldExpression = monthField.getExpression();
return (monthFieldExpression instanceof Every || monthFieldExpression instanceof Always);
}
/**
* whether the day Of week field has a value
*
* @return if day Of week field has a value return trueelse return false
*/
protected boolean dayofWeekFieldIsSetAll(){
protected boolean dayofWeekFieldIsSetAll() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always
|| dayOfWeekFieldExpression instanceof Between || dayOfWeekFieldExpression instanceof And
@ -151,21 +166,24 @@ public abstract class AbstractCycle {
/**
* whether the day Of week field has a value of every or always
*
* @return if day Of week field has a value of every or always return trueelse return false
*/
protected boolean dayofWeekFieldIsEvery(){
protected boolean dayofWeekFieldIsEvery() {
FieldExpression dayOfWeekFieldExpression = dayOfWeekField.getExpression();
return (dayOfWeekFieldExpression instanceof Every || dayOfWeekFieldExpression instanceof Always);
}
/**
* get cycle enum
*
* @return CycleEnum
*/
protected abstract CycleEnum getCycle();
/**
* get mini level cycle enum
*
* @return CycleEnum
*/
protected abstract CycleEnum getMiniCycle();

147
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -14,22 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* abstract zookeeper client
*/
@ -38,25 +51,23 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
* @throws Exception
*/
public void removeDeadServerByHost(String host, String serverType) throws Exception {
public void removeDeadServerByHost(String host, String serverType) {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
@ -64,25 +75,24 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
}else if(opType.equals(ADD_ZK_OP)){
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath,(type + UNDERLINE + host));
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
@ -90,51 +100,52 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* get active master num
*
* @return active master number
*/
public int getActiveMasterNum(){
public int getActiveMasterNum() {
List<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error",e);
logger.error("getActiveMasterNum error", e);
}
return childrenList.size();
}
/**
*
* @return zookeeper quorum
*/
public String getZookeeperQuorum(){
public String getZookeeperQuorum() {
return getZookeeperConfig().getServerList();
}
/**
* get server list.
*
* @param zkNodeType zookeeper node type
* @return server list
*/
public List<Server> getServersList(ZKNodeType zkNodeType){
public List<Server> getServersList(ZKNodeType zkNodeType) {
Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType);
List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if(masterServer == null){
if (masterServer == null) {
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/"+ key);
masterServer.setZkDirectory(parentPath + "/" + key);
//set host and port
String[] hostAndPort=key.split(COLON);
String[] hosts=hostAndPort[0].split(DIVISION_STRING);
String[] hostAndPort = key.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
masterServer.setHost(hosts[hosts.length-1]);
masterServer.setHost(hosts[hosts.length - 1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}
@ -143,26 +154,27 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* get master server list map.
*
* @param zkNodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
Map<String, String> masterMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
List<String> serverList = super.getChildrenKeys(path);
if(zkNodeType == ZKNodeType.WORKER){
if (zkNodeType == ZKNodeType.WORKER) {
List<String> workerList = new ArrayList<>();
for(String group : serverList){
for (String group : serverList) {
List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
for(String groupServer : groupServers){
for (String groupServer : groupServers) {
workerList.add(group + Constants.SLASH + groupServer);
}
}
serverList = workerList;
}
for(String server : serverList){
for (String server : serverList) {
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
}
} catch (Exception e) {
@ -174,20 +186,21 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* check the zookeeper node already exists
*
* @param host host
* @param zkNodeType zookeeper node type
* @return true if exists
*/
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
String path = getZNodeParentPath(zkNodeType);
if(StringUtils.isEmpty(path)){
if (StringUtils.isEmpty(path)) {
logger.error("check zk node exists error, host:{}, zk node type:{}",
host, zkNodeType.toString());
host, zkNodeType);
return false;
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){
if(hostKey.contains(host)){
for (String hostKey : serverMaps.keySet()) {
if (hostKey.contains(host)) {
return true;
}
}
@ -195,37 +208,33 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
/**
*
* @return get worker node parent path
*/
protected String getWorkerZNodeParentPath(){
protected String getWorkerZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
}
/**
*
* @return get master node parent path
*/
protected String getMasterZNodeParentPath(){
protected String getMasterZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
}
/**
*
* @return get master lock path
*/
public String getMasterLockPath(){
public String getMasterLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
*
* @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
*/
public String getZNodeParentPath(ZKNodeType zkNodeType) {
String path = "";
switch (zkNodeType){
switch (zkNodeType) {
case MASTER:
return getMasterZNodeParentPath();
case WORKER:
@ -239,50 +248,47 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
/**
*
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath(){
protected String getDeadZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
*
* @return get master start up lock path
*/
public String getMasterStartUpLockPath(){
public String getMasterStartUpLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
}
/**
*
* @return get master failover lock path
*/
public String getMasterFailoverLockPath(){
public String getMasterFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
*
* @return get worker failover lock path
*/
public String getWorkerFailoverLockPath(){
public String getWorkerFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
}
/**
* release mutex
*
* @param mutex mutex
*/
public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null){
if (mutex != null) {
try {
mutex.release();
} catch (Exception e) {
if("instance must be started before calling this method".equals(e.getMessage())){
if ("instance must be started before calling this method".equals(e.getMessage())) {
logger.warn("lock release");
}else{
logger.error("lock release failed",e);
} else {
logger.error("lock release failed", e);
}
}
@ -292,7 +298,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* init system znode
*/
protected void initSystemZNode(){
protected void initSystemZNode() {
try {
persist(getMasterZNodeParentPath(), "");
persist(getWorkerZNodeParentPath(), "");
@ -300,22 +306,23 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
logger.info("initialize server nodes success.");
} catch (Exception e) {
logger.error("init system znode failed",e);
logger.error("init system znode failed", e);
}
}
/**
* get host ip, string format: masterParentPath/ip
*
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if(StringUtils.isEmpty(path)){
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
@ -325,11 +332,11 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
@Override
public String toString() {
return "AbstractZKClient{" +
"zkClient=" + getZkClient() +
", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
'}';
return "AbstractZKClient{"
+ "zkClient=" + getZkClient()
+ ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ '}';
}
}

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java

@ -14,9 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
@ -25,18 +30,16 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
* Shared Curator zookeeper client
*/
@ -49,7 +52,6 @@ public class CuratorZookeeperClient implements InitializingBean {
private CuratorFramework zkClient;
@Override
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
@ -91,7 +93,7 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
return zkClient;
}

31
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZKServer.java

@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* just speed experience version
@ -51,10 +54,10 @@ public class ZKServer {
ZKServer zkServer;
if (args.length == 0) {
zkServer = new ZKServer();
} else if (args.length == 1){
zkServer = new ZKServer(Integer.valueOf(args[0]), "");
} else if (args.length == 1) {
zkServer = new ZKServer(Integer.parseInt(args[0]), "");
} else {
zkServer = new ZKServer(Integer.valueOf(args[0]), args[1]);
zkServer = new ZKServer(Integer.parseInt(args[0]), args[1]);
}
zkServer.registerHook();
zkServer.start();
@ -73,7 +76,7 @@ public class ZKServer {
}
private void registerHook() {
/**
/*
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
@ -90,7 +93,7 @@ public class ZKServer {
}
}
public boolean isStarted(){
public boolean isStarted() {
return isStarted.get();
}
@ -119,8 +122,8 @@ public class ZKServer {
if (file.exists()) {
logger.warn("The path of zk server exists");
}
logger.info("zk server starting, data dir path:{}" , zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
logger.info("zk server starting, data dir path:{}", zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME, "60");
}
/**
@ -131,7 +134,7 @@ public class ZKServer {
* @param tickTime zk tick time
* @param maxClientCnxns zk max client connections
*/
private void startLocalZkServer(final int port, final String dataDirPath,final int tickTime,String maxClientCnxns) {
private void startLocalZkServer(final int port, final String dataDirPath, final int tickTime, String maxClientCnxns) {
if (isStarted.compareAndSet(false, true)) {
zooKeeperServerMain = new PublicZooKeeperServerMain();
logger.info("Zookeeper data path : {} ", dataDirPath);
@ -144,8 +147,7 @@ public class ZKServer {
zooKeeperServerMain.initializeAndRun(args);
} catch (QuorumPeerConfig.ConfigException | IOException e) {
logger.warn("Caught exception while starting ZK", e);
throw new RuntimeException(e);
throw new ServiceException("Caught exception while starting ZK", e);
}
}
}
@ -159,7 +161,7 @@ public class ZKServer {
logger.info("zk server stopped");
} catch (Exception e) {
logger.error("Failed to stop ZK ",e);
logger.error("Failed to stop ZK ", e);
}
}
@ -180,8 +182,7 @@ public class ZKServer {
org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
}
} catch (Exception e) {
logger.warn("Caught exception while stopping ZK server", e);
throw new RuntimeException(e);
throw new ServiceException("Caught exception while starting ZK", e);
}
}
}

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -14,21 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
@ -36,6 +39,7 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
private TreeCache treeCache;
/**
* register a unified listener of /${dsRoot},
*/
@ -59,14 +63,16 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
throw new ServiceException(e);
}
}
//for sub class
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path){}
protected void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path) {
// Used by sub class
}
public String getFromCache(final String cachePath, final String key) {
public String getFromCache(final String key) {
ChildData resultInCache = treeCache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
@ -74,11 +80,11 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
return null;
}
public TreeCache getTreeCache(final String cachePath) {
public TreeCache getTreeCache() {
return treeCache;
}
public void addListener(TreeCacheListener listener){
public void addListener(TreeCacheListener listener) {
this.treeCache.getListenable().addListener(listener);
}

43
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -14,13 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
@ -29,18 +33,16 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
* zk base operator
*/
@ -64,19 +66,23 @@ public class ZookeeperOperator implements InitializingBean {
/**
* this method is for sub class,
*/
protected void registerListener(){}
protected void registerListener() {
// Used by sub class
}
protected void treeCacheStart(){}
protected void treeCacheStart() {
// Used by sub class
}
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
if(newState == ConnectionState.LOST){
if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
} else if(newState == ConnectionState.RECONNECTED){
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
} else if(newState == ConnectionState.SUSPENDED){
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
}
});
@ -85,7 +91,8 @@ public class ZookeeperOperator implements InitializingBean {
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),
"zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
@ -114,7 +121,7 @@ public class ZookeeperOperator implements InitializingBean {
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
return zkClient;
}
@ -138,12 +145,12 @@ public class ZookeeperOperator implements InitializingBean {
throw new IllegalStateException(ex);
} catch (Exception ex) {
logger.error("getChildrenKeys key : {}", key, ex);
throw new RuntimeException(ex);
throw new ServiceException(ex);
}
}
public boolean hasChildren(final String key){
Stat stat ;
public boolean hasChildren(final String key) {
Stat stat;
try {
stat = zkClient.checkExists().forPath(key);
return stat.getNumChildren() >= 1;

1
dolphinscheduler-ui/src/js/conf/home/pages/datasource/pages/list/index.vue

@ -22,6 +22,7 @@
<el-button size="mini" @click="_create('')">{{$t('Create Datasource')}}</el-button>
<el-dialog
:title="item ?($t('Edit')+$t('Datasource')) : ($t('Create')+$t('Datasource'))"
v-if="dialogVisible"
:visible.sync="dialogVisible"
width="auto"
:append-to-body="true">

2
pom.xml

@ -973,6 +973,8 @@
<include>**/plugin/alert/script/ScriptSenderTest.java</include>
<include>**/plugin/alert/http/HttpAlertChannelFactoryTest.java</include>
<include>**/plugin/alert/http/HttpAlertChannelTest.java</include>
<include>**/plugin/alert/feishu/FeiShuAlertChannelFactoryTest.java</include>
<include>**/plugin/alert/feishu/FeiShuSenderTest.java</include>
<include>**/plugin/alert/http/HttpAlertPluginTest.java</include>
<include>**/plugin/alert/http/HttpSenderTest.java</include>
<include>**/spi/params/PluginParamsTransferTest.java</include>

Loading…
Cancel
Save