Browse Source

[feature#3356] alert-spi support DingTalk&WeChat (#3869)

* [feature-3665][ui]Add element-ui (#3666)

* [feature-3665][ui]Add element-ui

* add license

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* [feature][ui]Alert plugin design (#3734)

* [feature-3665][ui]Add element-ui (#3666)

* [feature-3665][ui]Add element-ui

* add license

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* [Feature-3682][ui]Add form-create plug-in and alarm group management add sample demo (#3683)

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* [feature][ui] Add alarm instance page

* [feature-3665][ui]Add element-ui (#3666)

* [feature-3665][ui]Add element-ui

* add license

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* [Feature-3189][alert,spi,dao,plugin-api] base code of dolphinscheduler spi and alert plugin implement (#3601)

* DS SPI

* Add DolphinScheduler SPI , and rebuilt the code of the Alert plug-in based on SPI

* Add DolphinScheduler SPI , and rebuilt the code of the Alert plug-in based on SPI

* add TODO

* delete

* compile

* spi commit

* Plugin Alert

* fix some bug

* add todo

* change web ui from alpacajs to form-create

* remove module

* add plugin schema

* add license header

* update alert and spi module version

* update the alert plugin sub module version

* comment the maven.local.repository param

* move utils from spi to common module

* add license header

* add license header and delete some chinese comment

* update spi packages

* delete no use alert_xx.properties

* update mysql.connector.version back to 5.1.34

* delete no use comment in pom.xml

* update email stmp password

* add license

* add semicolon to sql/upgrade/1.4.0_schema/mysql/dolphinscheduler_ddl.sql file

* format the code style

* format new clase file with checkstyle

* update plugin params to Builder model

* move JSONUtils to SPI because plugin can not dependency common module

* move JSONUtils to SPI because plugin can not dependency common module

* delete collection dependency

* replace PluginParamsTransfer to spi PluginParamsTransfer

* update dolphinscheduler-maven-plugin to 1.0.0

* update license

* update apache-rat-plugin add exclude '.iml' file

* check license

* ArtifactResolver only use in development and configPlugins is not empty

* ArtifactResolver only use in development and configPlugins is not empty

* ArtifactResolver only use in development and configPlugins is not empty

* default datasource should be postgresql

* add license files

* add license files

* postgresql port should be 5432

* postgresql test

* mv show_type to spi

add license header to AlertConstants

* check style fix

* copy check style file from branch dev

* alert show_type set by plugin

* alert show_type set by plugin

* add PluginDefineMapper to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/DependencyConfig.java

* add Bean to TaskCallbackServiceTestConfig

* add Bean to TaskCallbackServiceTestConfig

* fix check style

* check style fix

* [feature-3665][ui]Add element-ui (#3666)

* [feature-3665][ui]Add element-ui

* add license

* fix check style

* [Feature-3682][ui]Add form-create plug-in and alarm group management add sample demo (#3683)

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* check style fix

* rollback test change

* rollback test change

* rollback dao pom change

* [feature-3665][ui]Add element-ui (#3666)

* [feature-3665][ui]Add element-ui

* add license

* Add form-create plug-in and alarm group management add sample demo

* Modify node version

* fix

* fix

* add ut to pom.xml

* add upgrade schema to global schema

* fix ut failed

* fix ut failed

* fix ut failed

* fix ut failed

* add test EmailAlertPluginTest to pom.xml

* fix ut failed

* fix ut failed

* fix check style

* update license header to presto license header

* presto license header not check

* fix ut coverage

* fix ut coverage

* fix ut

* fix ut

* fix ut

* fix ut coverage

* fix ut coverage

* fix ut coverage

* fix ut coverage

* fix ut coverage

* fix ut coverage

Co-authored-by: break60 <790061044@qq.com>

* [feature#3356] alert-spi support DingTalk

this closes # 3356

* add test

* code style

* we chat alert

* support we chat alert

* support we chat alert

* support we chat alert,update ding talk alert

* code style

* add test

* code style

* clean old code

* clean old code

* code smell

* code style

* add test

* simple config

* code style

* code style

* code style

* delete old file

* fix log content error

Co-authored-by: break60 <790061044@qq.com>
Co-authored-by: gaojun2048 <32193458+gaojun2048@users.noreply.github.com>
pull/3/MERGE
Kirs 4 years ago committed by GitHub
parent
commit
449cbbe21e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/pom.xml
  2. 41
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannel.java
  3. 89
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java
  4. 34
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertPlugin.java
  5. 54
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkParamsConstants.java
  6. 204
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSender.java
  7. 48
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactoryTest.java
  8. 57
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSenderTest.java
  9. 42
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/pom.xml
  10. 41
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannel.java
  11. 94
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java
  12. 34
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertConstants.java
  13. 56
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertParamsConstants.java
  14. 34
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertPlugin.java
  15. 328
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatSender.java
  16. 48
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/test/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactoryTest.java
  17. 89
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/test/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatSenderTest.java
  18. 55
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/DingTalkManager.java
  19. 63
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java
  20. 138
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java
  21. 290
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
  22. 110
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java
  23. 307
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java
  24. 2
      dolphinscheduler-spi/pom.xml
  25. 4
      pom.xml

46
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/pom.xml

@ -29,4 +29,50 @@
<artifactId>dolphinscheduler-alert-dingtalk</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

41
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannel.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import java.util.Map;
/**
* DingTalkAlertChannel
*/
public class DingTalkAlertChannel implements AlertChannel {
@Override
public AlertResult process(AlertInfo alertInfo) {
AlertData alertData = alertInfo.getAlertData();
String alertParams = alertInfo.getAlertParams();
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(alertParams);
return new DingTalkSender(paramsMap).sendDingTalkMsg(alertData.getTitle(), alertData.getContent());
}
}

89
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactory.java

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.PasswordParam;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import java.util.Arrays;
import java.util.List;
/**
* DingTalkAlertChannelFactory
*/
public class DingTalkAlertChannelFactory implements AlertChannelFactory {
@Override
public String getName() {
return "ding talk alert";
}
@Override
public List<PluginParams> getParams() {
InputParam webHookParam = InputParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_WEB_HOOK, DingTalkParamsConstants.DING_TALK_WEB_HOOK)
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam keywordParam = InputParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_KEYWORD, DingTalkParamsConstants.DING_TALK_KEYWORD)
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
RadioParam isEnableProxy =
RadioParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE)
.addParamsOptions(new ParamsOptions("YES", true, false))
.addParamsOptions(new ParamsOptions("NO", false, false))
.setValue(true)
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam proxyParam =
InputParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_PROXY, DingTalkParamsConstants.DING_TALK_PROXY)
.addValidate(Validate.newBuilder()
.setRequired(true).build())
.build();
InputParam portParam = InputParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_PORT, DingTalkParamsConstants.DING_TALK_PORT)
.addValidate(Validate.newBuilder()
.setRequired(true).build())
.build();
InputParam userParam =
InputParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_USER, DingTalkParamsConstants.DING_TALK_USER)
.addValidate(Validate.newBuilder()
.setRequired(true).build())
.build();
PasswordParam passwordParam = PasswordParam.newBuilder(DingTalkParamsConstants.NAME_DING_TALK_PASSWORD, DingTalkParamsConstants.DING_TALK_PASSWORD)
.setPlaceholder("if enable use authentication, you need input password")
.build();
return Arrays.asList(webHookParam, keywordParam, isEnableProxy, proxyParam, portParam, userParam, passwordParam);
}
@Override
public AlertChannel create() {
return new DingTalkAlertChannel();
}
}

34
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertPlugin.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import com.google.common.collect.ImmutableList;
/**
* DingTalkAlertPlugin
*/
public class DingTalkAlertPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<AlertChannelFactory> getAlertChannelFactorys() {
return ImmutableList.of(new DingTalkAlertChannelFactory());
}
}

54
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkParamsConstants.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
/**
* DingTalkParamsConstants
*/
public class DingTalkParamsConstants {
static final String DING_TALK_WEB_HOOK = "dingtalk.webhook";
static final String NAME_DING_TALK_WEB_HOOK = "dingTalkWebHook";
static final String DING_TALK_KEYWORD = "dingtalk.keyword";
static final String NAME_DING_TALK_KEYWORD = "dingTalkKeyword";
public static final String DING_TALK_PROXY_ENABLE = "dingtalk.isEnableProxy";
static final String NAME_DING_TALK_PROXY_ENABLE = "dingTalkIsEnableProxy";
static final String DING_TALK_PROXY = "dingtalk.proxy";
static final String NAME_DING_TALK_PROXY = "dingTalkProxy";
static final String DING_TALK_PORT = "dingtalk.port";
static final String NAME_DING_TALK_PORT = "dingTalkPort";
static final String DING_TALK_USER = "dingtalk.user";
static final String NAME_DING_TALK_USER = "dingTalkUser";
static final String DING_TALK_PASSWORD = "dingtalk.password";
static final String NAME_DING_TALK_PASSWORD = "dingTalkPassword";
}

204
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/main/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSender.java

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ding Talk Sender
*/
public class DingTalkSender {
private static final Logger logger = LoggerFactory.getLogger(DingTalkSender.class);
private String url;
private String keyword;
private Boolean enableProxy;
private String proxy;
private Integer port;
private String user;
private String password;
DingTalkSender(Map<String, String> config) {
url = config.get(DingTalkParamsConstants.NAME_DING_TALK_WEB_HOOK);
keyword = config.get(DingTalkParamsConstants.NAME_DING_TALK_KEYWORD);
enableProxy = Boolean.valueOf(config.get(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE));
if (Boolean.TRUE.equals(enableProxy)) {
port = Integer.parseInt(config.get(DingTalkParamsConstants.NAME_DING_TALK_PORT));
proxy = config.get(DingTalkParamsConstants.NAME_DING_TALK_PROXY);
user = config.get(DingTalkParamsConstants.DING_TALK_USER);
password = config.get(DingTalkParamsConstants.NAME_DING_TALK_PASSWORD);
}
}
public AlertResult sendDingTalkMsg(String msg, String charset) {
AlertResult alertResult;
try {
String resp = sendMsg(msg, charset);
return checkSendDingTalkSendMsgResult(resp);
} catch (Exception e) {
logger.info("send ding talk alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setStatus("false");
alertResult.setMessage("send ding talk alert fail.");
}
return alertResult;
}
private String sendMsg(String msg, String charset) throws IOException {
String msgToJson = textToJsonString(msg + "#" + keyword);
HttpPost httpPost = constructHttpPost(url, msgToJson, charset);
CloseableHttpClient httpClient;
if (Boolean.TRUE.equals(enableProxy)) {
httpClient = getProxyClient(proxy, port, user, password);
RequestConfig rcf = getProxyConfig(proxy, port);
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send [%s], resp:{%s}", msg, resp);
return resp;
} finally {
httpClient.close();
}
}
private static HttpPost constructHttpPost(String url, String msg, String charset) {
HttpPost post = new HttpPost(url);
StringEntity entity = new StringEntity(msg, charset);
post.setEntity(entity);
post.addHeader("Content-Type", "application/json; charset=utf-8");
return post;
}
private static CloseableHttpClient getProxyClient(String proxy, int port, String user, String password) {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, password));
return HttpClients.custom().setDefaultCredentialsProvider(provider).build();
}
private static CloseableHttpClient getDefaultClient() {
return HttpClients.createDefault();
}
private static RequestConfig getProxyConfig(String proxy, int port) {
HttpHost httpProxy = new HttpHost(proxy, port);
return RequestConfig.custom().setProxy(httpProxy).build();
}
private static String textToJsonString(String text) {
Map<String, Object> items = new HashMap<>();
items.put("msgtype", "text");
Map<String, String> textContent = new HashMap<>();
byte[] byt = StringUtils.getBytesUtf8(text);
String txt = StringUtils.newStringUtf8(byt);
textContent.put("content", txt);
items.put("text", textContent);
return JSONUtils.toJsonString(items);
}
public static class DingTalkSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
private static AlertResult checkSendDingTalkSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
if (null == result) {
alertResult.setMessage("send ding talk msg error");
logger.info("send ding talk msg error,ding talk server resp is null");
return alertResult;
}
DingTalkSendMsgResponse sendMsgResponse = JSONUtils.parseObject(result, DingTalkSendMsgResponse.class);
if (null == sendMsgResponse) {
alertResult.setMessage("send ding talk msg fail");
logger.info("send ding talk msg error,resp error");
return alertResult;
}
if (sendMsgResponse.errcode == 0) {
alertResult.setStatus("true");
alertResult.setMessage("send ding talk msg success");
return alertResult;
}
alertResult.setMessage(String.format("alert send ding talk msg error : %s", sendMsgResponse.getErrmsg()));
logger.info("alert send ding talk msg error : {}", sendMsgResponse.getErrmsg());
return alertResult;
}
}

48
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkAlertChannelFactoryTest.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
* DingTalkAlertChannelFactoryTest
*/
public class DingTalkAlertChannelFactoryTest {
@Test
public void testGetParams() {
DingTalkAlertChannelFactory dingTalkAlertChannelFactory = new DingTalkAlertChannelFactory();
List<PluginParams> params = dingTalkAlertChannelFactory.getParams();
JSONUtils.toJsonString(params);
Assert.assertEquals(7, params.size());
}
@Test
public void testCreate() {
DingTalkAlertChannelFactory dingTalkAlertChannelFactory = new DingTalkAlertChannelFactory();
AlertChannel alertChannel = dingTalkAlertChannelFactory.create();
Assert.assertNotNull(alertChannel);
}
}

57
dolphinscheduler-alert-plugin/dolphinscheduler-alert-dingtalk/src/test/java/org/apache/dolphinscheduler/plugin/alert/dingtalk/DingTalkSenderTest.java

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.dingtalk;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* DingTalkSenderTest
*/
public class DingTalkSenderTest {
private static Map<String, String> dingTalkConfig = new HashMap<>();
@Before
public void initDingTalkConfig() {
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_KEYWORD, "keyWord");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_WEB_HOOK, "url");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "false");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PASSWORD, "password");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PORT, "9988");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_USER, "user1,user2");
}
@Test
public void testSend() {
DingTalkSender dingTalkSender = new DingTalkSender(dingTalkConfig);
dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8");
dingTalkConfig.put(DingTalkParamsConstants.NAME_DING_TALK_PROXY_ENABLE, "true");
dingTalkSender = new DingTalkSender(dingTalkConfig);
AlertResult alertResult = dingTalkSender.sendDingTalkMsg("keyWord+Welcome", "UTF-8");
Assert.assertEquals("false",alertResult.getStatus());
}
}

42
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/pom.xml

@ -27,6 +27,48 @@
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert-wechat</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

41
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannel.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import java.util.Map;
/**
* WeChatAlertChannel
*/
public class WeChatAlertChannel implements AlertChannel {
@Override
public AlertResult process(AlertInfo info) {
AlertData alertData = info.getAlertData();
String alertParams = info.getAlertParams();
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(alertParams);
return new WeChatSender(paramsMap).sendEnterpriseWeChat(alertData.getTitle(), alertData.getContent());
}
}

94
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactory.java

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.ShowType;
import org.apache.dolphinscheduler.spi.params.InputParam;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import java.util.Arrays;
import java.util.List;
/**
* WeChatAlertChannelFactory
*/
public class WeChatAlertChannelFactory implements AlertChannelFactory {
@Override
public String getName() {
return "we chat alert";
}
@Override
public List<PluginParams> getParams() {
InputParam corpIdParam = InputParam.newBuilder(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_CORP_ID, WeChatAlertParamsConstants.ENTERPRISE_WE_CHAT_CORP_ID)
.setPlaceholder("please input corp id ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam secretParam = InputParam.newBuilder(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_SECRET, WeChatAlertParamsConstants.ENTERPRISE_WE_CHAT_SECRET)
.setPlaceholder("please input secret ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam usersParam = InputParam.newBuilder(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USERS, WeChatAlertParamsConstants.ENTERPRISE_WE_CHAT_USERS)
.setPlaceholder("please input users ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam userSendMsgParam = InputParam.newBuilder(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USER_SEND_MSG, WeChatAlertParamsConstants.ENTERPRISE_WE_CHAT_USER_SEND_MSG)
.setPlaceholder("please input corp id ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
InputParam agentIdParam = InputParam.newBuilder(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_AGENT_ID, WeChatAlertParamsConstants.ENTERPRISE_WE_CHAT_AGENT_ID)
.setPlaceholder("please input agent id ")
.addValidate(Validate.newBuilder()
.setRequired(true)
.build())
.build();
RadioParam showType = RadioParam.newBuilder(AlertConstants.SHOW_TYPE, AlertConstants.SHOW_TYPE)
.addParamsOptions(new ParamsOptions(ShowType.TABLE.getDescp(), ShowType.TABLE.getDescp(), false))
.addParamsOptions(new ParamsOptions(ShowType.TEXT.getDescp(), ShowType.TEXT.getDescp(), false))
.setValue(ShowType.TABLE.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
return Arrays.asList(corpIdParam, secretParam, usersParam, userSendMsgParam, agentIdParam, showType);
}
@Override
public AlertChannel create() {
return new WeChatAlertChannel();
}
}

34
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertConstants.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
/**
* WeChatAlertConstants
*/
public class WeChatAlertConstants {
static final String MARKDOWN_QUOTE = ">";
static final String MARKDOWN_ENTER = "\n";
static final String CHARSET = "UTF-8";
static final String WE_CHAT_PUSH_URL = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}";
static final String WE_CHAT_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpId}&corpsecret={secret}";
}

56
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertParamsConstants.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
/**
* WeChatAlertParamsConstants
*/
public class WeChatAlertParamsConstants {
static final String ENTERPRISE_WE_CHAT_CORP_ID = "enterprise.wechat.corp.id";
static final String NAME_ENTERPRISE_WE_CHAT_CORP_ID = "enterpriseWeChatCorpId";
static final String ENTERPRISE_WE_CHAT_SECRET = "enterprise.wechat.secret";
static final String NAME_ENTERPRISE_WE_CHAT_SECRET = "enterpriseWeChatSecret";
static final String ENTERPRISE_WE_CHAT_TEAM_SEND_MSG = "enterprise.wechat.team.send.msg";
static final String NAME_ENTERPRISE_WE_CHAT_TEAM_SEND_MSG = "enterpriseWeChatTeamSendMsg";
static final String ENTERPRISE_WE_CHAT_USER_SEND_MSG = "enterprise.wechat.user.send.msg";
static final String NAME_ENTERPRISE_WE_CHAT_USER_SEND_MSG = "enterpriseWeChatUserSendMsg";
static final String ENTERPRISE_WE_CHAT_AGENT_ID = "enterprise.wechat.agent.id";
static final String NAME_ENTERPRISE_WE_CHAT_AGENT_ID = "enterpriseWeChatAgentId";
static final String ENTERPRISE_WE_CHAT_USERS = "enterprise.wechat.users";
static final String NAME_ENTERPRISE_WE_CHAT_USERS = "enterpriseWeChatUsers";
}

34
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertPlugin.java

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import org.apache.dolphinscheduler.spi.DolphinSchedulerPlugin;
import org.apache.dolphinscheduler.spi.alert.AlertChannelFactory;
import com.google.common.collect.ImmutableList;
/**
* WeChatAlertPlugin
*/
public class WeChatAlertPlugin implements DolphinSchedulerPlugin {
@Override
public Iterable<AlertChannelFactory> getAlertChannelFactorys() {
return ImmutableList.of(new WeChatAlertChannelFactory());
}
}

328
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/main/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatSender.java

@ -0,0 +1,328 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import static java.util.Objects.requireNonNull;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.alert.ShowType;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* WeChatSender
*/
public class WeChatSender {
private static Logger logger = LoggerFactory.getLogger(WeChatSender.class);
private String weChatAgentId;
private String weChatUsers;
private String weChatTeamSendMsg;
private String weChatUserSendMsg;
private String weChatTokenUrlReplace;
private String weChatToken;
private String showType;
private static final String agentIdRegExp = "{agentId}";
private static final String msgRegExp = "{msg}";
private static final String userRegExp = "{toUser}";
private static final String corpIdRegex = "{corpId}";
private static final String secretRegex = "{secret}";
private static final String toPartyRegex = "{toParty}";
private static final String toUserRegex = "{toUser}";
private static final String tokenRegex = "{token}";
WeChatSender(Map<String, String> config) {
weChatAgentId = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_AGENT_ID);
weChatUsers = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USERS);
String weChatCorpId = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_CORP_ID);
String weChatSecret = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_SECRET);
String weChatTokenUrl = WeChatAlertConstants.WE_CHAT_TOKEN_URL;
weChatTeamSendMsg = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_TEAM_SEND_MSG);
weChatUserSendMsg = config.get(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USER_SEND_MSG);
showType = config.get(AlertConstants.SHOW_TYPE);
requireNonNull(showType, AlertConstants.SHOW_TYPE + " must not null");
weChatTokenUrlReplace = weChatTokenUrl
.replace(corpIdRegex, weChatCorpId)
.replace(secretRegex, weChatSecret);
weChatToken = getToken();
}
/**
* make user multi user message
*
* @param toUser the toUser
* @param agentId the agentId
* @param msg the msg
* @return Enterprise WeChat send message
*/
private String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
String listUser = mkString(toUser);
return weChatUserSendMsg.replace(userRegExp, listUser)
.replace(agentIdRegExp, agentId)
.replace(msgRegExp, msg);
}
/**
* send Enterprise WeChat
*
* @return Enterprise WeChat resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""}
* @throws Exception the Exception
*/
public AlertResult sendEnterpriseWeChat(String title, String content) {
List<String> userList = Arrays.asList(weChatUsers.split(","));
String data = markdownByAlert(title, content);
String msg = makeUserSendMsg(userList, weChatAgentId, data);
String enterpriseWeChatPushUrlReplace = WeChatAlertConstants.WE_CHAT_PUSH_URL.replace(tokenRegex, weChatToken);
AlertResult alertResult;
try {
return checkWeChatSendMsgResult(post(enterpriseWeChatPushUrlReplace, msg));
} catch (Exception e) {
logger.info("send we chat alert msg exception : {}", e.getMessage());
alertResult = new AlertResult();
alertResult.setMessage("send we chat alert fail");
alertResult.setStatus("false");
}
return alertResult;
}
private static String post(String url, String data) throws IOException {
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpPost httpPost = new HttpPost(url);
httpPost.setEntity(new StringEntity(data, WeChatAlertConstants.CHARSET));
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, WeChatAlertConstants.CHARSET);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Enterprise WeChat send [{}], param:{}, resp:{}",
url, data, resp);
return resp;
}
}
/**
* convert table to markdown style
*
* @param title the title
* @param content the content
* @return markdown table content
*/
private static String markdownTable(String title, String content) {
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
if (null == mapItemsList || mapItemsList.isEmpty()) {
logger.error("itemsList is null");
throw new RuntimeException("itemsList is null");
}
StringBuilder contents = new StringBuilder(200);
for (LinkedHashMap mapItems : mapItemsList) {
Set<Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Entry<String, Object>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s", title, WeChatAlertConstants.MARKDOWN_ENTER));
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
t.append(WeChatAlertConstants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(WeChatAlertConstants.MARKDOWN_ENTER);
}
contents.append(t);
}
return contents.toString();
}
/**
* convert text to markdown style
*
* @param title the title
* @param content the content
* @return markdown text
*/
private static String markdownText(String title, String content) {
if (StringUtils.isNotEmpty(content)) {
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
if (null == mapItemsList || mapItemsList.isEmpty()) {
logger.error("itemsList is null");
throw new RuntimeException("itemsList is null");
}
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`%n", title));
for (LinkedHashMap mapItems : mapItemsList) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
for (Entry<String, Object> entry : entries) {
contents.append(WeChatAlertConstants.MARKDOWN_QUOTE);
contents.append(entry.getKey()).append(":").append(entry.getValue());
contents.append(WeChatAlertConstants.MARKDOWN_ENTER);
}
}
return contents.toString();
}
return null;
}
/**
* Determine the mardown style based on the show type of the alert
*
* @return the markdown alert table/text
*/
private String markdownByAlert(String title, String content) {
String result = "";
if (showType.equals(ShowType.TABLE.getDescp())) {
result = markdownTable(title, content);
} else if (showType.equals(ShowType.TEXT.getDescp())) {
result = markdownText(title, content);
}
return result;
}
private String getToken() {
try {
return get(weChatTokenUrlReplace);
} catch (IOException e) {
logger.info("we chat alert get token error{}", e.getMessage());
}
return null;
}
private static String get(String url) throws IOException {
String resp;
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(url);
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, WeChatAlertConstants.CHARSET);
EntityUtils.consume(entity);
}
HashMap map = JSONUtils.parseObject(resp, HashMap.class);
if (map != null) {
return map.get("access_token").toString();
} else {
return null;
}
}
}
private static String mkString(Iterable<String> list) {
if (null == list || StringUtils.isEmpty("|")) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String item : list) {
if (first) {
first = false;
} else {
sb.append("|");
}
sb.append(item);
}
return sb.toString();
}
public static class WeChatSendMsgResponse {
private Integer errcode;
private String errmsg;
public Integer getErrcode() {
return errcode;
}
public void setErrcode(Integer errcode) {
this.errcode = errcode;
}
public String getErrmsg() {
return errmsg;
}
public void setErrmsg(String errmsg) {
this.errmsg = errmsg;
}
}
private static AlertResult checkWeChatSendMsgResult(String result) {
AlertResult alertResult = new AlertResult();
alertResult.setStatus("false");
if (null == result) {
alertResult.setMessage("we chat send fail");
logger.info("send we chat msg error,resp is null");
return alertResult;
}
WeChatSendMsgResponse sendMsgResponse = JSONUtils.parseObject(result, WeChatSendMsgResponse.class);
if (null == sendMsgResponse) {
alertResult.setMessage("we chat send fail");
logger.info("send we chat msg error,resp error");
return alertResult;
}
if (sendMsgResponse.errcode == 0) {
alertResult.setStatus("true");
alertResult.setMessage("we chat alert send success");
return alertResult;
}
alertResult.setStatus("false");
alertResult.setMessage(sendMsgResponse.getErrmsg());
return alertResult;
}
}

48
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/test/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatAlertChannelFactoryTest.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
/**
* WeChatAlertChannelFactoryTest
*/
public class WeChatAlertChannelFactoryTest {
@Test
public void testGetParams() {
WeChatAlertChannelFactory weChatAlertChannelFactory = new WeChatAlertChannelFactory();
List<PluginParams> params = weChatAlertChannelFactory.getParams();
JSONUtils.toJsonString(params);
Assert.assertEquals(6, params.size());
}
@Test
public void testCreate() {
WeChatAlertChannelFactory dingTalkAlertChannelFactory = new WeChatAlertChannelFactory();
AlertChannel alertChannel = dingTalkAlertChannelFactory.create();
Assert.assertNotNull(alertChannel);
}
}

89
dolphinscheduler-alert-plugin/dolphinscheduler-alert-wechat/src/test/java/org/apache/dolphinscheduler/plugin/alert/wechat/WeChatSenderTest.java

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.wechat;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import org.apache.dolphinscheduler.spi.alert.ShowType;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* WeChatSenderTest
*/
public class WeChatSenderTest {
private static Map<String, String> weChatConfig = new HashMap<>();
private String content = "[{\"id\":\"69\","
+
"\"name\":\"UserBehavior-0--1193959466\","
+
"\"Job name\":\"Start workflow\","
+
"\"State\":\"SUCCESS\","
+
"\"Recovery\":\"NO\","
+
"\"Run time\":\"1\","
+
"\"Start time\": \"2018-08-06 10:31:34.0\","
+
"\"End time\": \"2018-08-06 10:31:49.0\","
+
"\"Host\": \"192.168.xx.xx\","
+
"\"Notify group\" :\"4\"}]";
@Before
public void initDingTalkConfig() {
// Just for this test, I will delete these configurations before this PR is merged
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_AGENT_ID, "1000002");
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_CORP_ID, "ww8cc690c06761eaa3");
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_SECRET, "MYL0_O91ICNrdjkAhgeXIOAj4gEKIirf0-xoYnA25vg");
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USER_SEND_MSG, "{\"touser\":\"{toUser}\",\"agentid\":{agentId}"
+
",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"{msg}\"}}"
);
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_USERS, "Kris");
weChatConfig.put(WeChatAlertParamsConstants.NAME_ENTERPRISE_WE_CHAT_TEAM_SEND_MSG, "msg");
weChatConfig.put(AlertConstants.SHOW_TYPE, ShowType.TABLE.getDescp());
}
@Test
public void testSendWeChatTableMsg() {
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.sendEnterpriseWeChat("test", content);
Assert.assertEquals("true", alertResult.getStatus());
}
@Test
public void testSendWeChatTextMsg() {
weChatConfig.put(AlertConstants.SHOW_TYPE, ShowType.TEXT.getDescp());
WeChatSender weChatSender = new WeChatSender(weChatConfig);
AlertResult alertResult = weChatSender.sendEnterpriseWeChat("test", content);
Assert.assertEquals("true", alertResult.getStatus());
}
}

55
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/DingTalkManager.java

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.manager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.DingTalkUtils;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ding Talk Manager
*/
public class DingTalkManager {
private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
public Map<String, Object> send(AlertInfo alert) {
Map<String, Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
logger.info("send message {}", alert.getAlertData().getTitle());
try {
String msg = buildMessage(alert);
DingTalkUtils.sendDingTalkMsg(msg, Constants.UTF_8);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
retMap.put(Constants.STATUS, true);
return retMap;
}
private String buildMessage(AlertInfo alert) {
String msg = alert.getAlertData().getContent();
return msg;
}
}

63
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EnterpriseWeChatManager.java

@ -1,63 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.manager;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.EnterpriseWeChatUtils;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Enterprise WeChat Manager
*/
public class EnterpriseWeChatManager {
private static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatManager.class);
/**
* Enterprise We Chat send
*
* @param alertInfo the alert info
* @param token the token
* @return the send result
*/
public Map<String, Object> send(AlertInfo alertInfo, String token) {
Map<String, Object> retMap = new HashMap<>();
retMap.put(Constants.STATUS, false);
String agentId = EnterpriseWeChatUtils.ENTERPRISE_WE_CHAT_AGENT_ID;
String users = EnterpriseWeChatUtils.ENTERPRISE_WE_CHAT_USERS;
List<String> userList = Arrays.asList(users.split(","));
logger.info("send message {}", alertInfo.getAlertData().getTitle());
String msg = EnterpriseWeChatUtils.makeUserSendMsg(userList, agentId, EnterpriseWeChatUtils.markdownByAlert(alertInfo));
try {
EnterpriseWeChatUtils.sendEnterpriseWeChat(Constants.UTF_8, msg, token);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
retMap.put(Constants.STATUS, true);
return retMap;
}
}

138
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtils.java

@ -1,138 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DingTalkUtils utils
* support send msg to ding talk by robot message push function.
* support proxy setting
*/
public class DingTalkUtils {
public static final Logger logger = LoggerFactory.getLogger(DingTalkUtils.class);
public static final boolean isEnableDingTalk = PropertyUtils.getBoolean(Constants.DINGTALK_ENABLE);
private static final String dingTaskUrl = PropertyUtils.getString(Constants.DINGTALK_WEBHOOK);
private static final String keyword = PropertyUtils.getString(Constants.DINGTALK_KEYWORD);
private static final Boolean isEnableProxy = PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE);
private static final String proxy = PropertyUtils.getString(Constants.DINGTALK_PROXY);
private static final String user = PropertyUtils.getString(Constants.DINGTALK_USER);
private static final String passwd = PropertyUtils.getString(Constants.DINGTALK_PASSWORD);
private static final Integer port = PropertyUtils.getInt(Constants.DINGTALK_PORT);
/**
* send message interface
* only support text message format now.
*
* @param msg message context to send
* @param charset charset type
* @return result of sending msg
* @throws IOException the IOException
*/
public static String sendDingTalkMsg(String msg, String charset) throws IOException {
String msgToJson = textToJsonString(msg + "#" + keyword);
HttpPost httpPost = constructHttpPost(msgToJson, charset);
CloseableHttpClient httpClient;
if (isEnableProxy) {
httpClient = getProxyClient();
RequestConfig rcf = getProxyConfig();
httpPost.setConfig(rcf);
} else {
httpClient = getDefaultClient();
}
try {
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Ding Talk send [{}], resp:{%s}", msg, resp);
return resp;
} finally {
httpClient.close();
}
}
public static HttpPost constructHttpPost(String msg, String charset) {
HttpPost post = new HttpPost(dingTaskUrl);
StringEntity entity = new StringEntity(msg, charset);
post.setEntity(entity);
post.addHeader("Content-Type", "application/json; charset=utf-8");
return post;
}
public static CloseableHttpClient getProxyClient() {
HttpHost httpProxy = new HttpHost(proxy, port);
CredentialsProvider provider = new BasicCredentialsProvider();
provider.setCredentials(new AuthScope(httpProxy), new UsernamePasswordCredentials(user, passwd));
CloseableHttpClient httpClient = HttpClients.custom().setDefaultCredentialsProvider(provider).build();
return httpClient;
}
public static CloseableHttpClient getDefaultClient() {
return HttpClients.createDefault();
}
public static RequestConfig getProxyConfig() {
HttpHost httpProxy = new HttpHost(proxy, port);
return RequestConfig.custom().setProxy(httpProxy).build();
}
public static String textToJsonString(String text) {
Map<String, Object> items = new HashMap<String, Object>();
items.put("msgtype", "text");
Map<String, String> textContent = new HashMap<String, String>();
byte[] byt = StringUtils.getBytesUtf8(text);
String txt = StringUtils.newStringUtf8(byt);
textContent.put("content", txt);
items.put("text", textContent);
return JSONUtils.toJsonString(items);
}
}

290
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java

@ -1,290 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.ShowType;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Enterprise WeChat utils
*/
public class EnterpriseWeChatUtils {
public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);
public static final String ENTERPRISE_WE_CHAT_AGENT_ID = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID);
public static final String ENTERPRISE_WE_CHAT_USERS = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS);
private static final String ENTERPRISE_WE_CHAT_CORP_ID = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_CORP_ID);
private static final String ENTERPRISE_WE_CHAT_SECRET = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_SECRET);
private static final String ENTERPRISE_WE_CHAT_TOKEN_URL = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL);
private static final String ENTERPRISE_WE_CHAT_TOKEN_URL_REPLACE = ENTERPRISE_WE_CHAT_TOKEN_URL == null ? null : ENTERPRISE_WE_CHAT_TOKEN_URL
.replaceAll("\\{corpId}", ENTERPRISE_WE_CHAT_CORP_ID)
.replaceAll("\\{secret}", ENTERPRISE_WE_CHAT_SECRET);
private static final String ENTERPRISE_WE_CHAT_PUSH_URL = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_PUSH_URL);
private static final String ENTERPRISE_WE_CHAT_TEAM_SEND_MSG = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG);
private static final String ENTERPRISE_WE_CHAT_USER_SEND_MSG = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG);
private static final String agentIdRegExp = "\\{agentId}";
private static final String msgRegExp = "\\{msg}";
private static final String userRegExp = "\\{toUser}";
/**
* get Enterprise WeChat is enable
*
* @return isEnable
*/
public static boolean isEnable() {
Boolean isEnable = null;
try {
isEnable = PropertyUtils.getBoolean(Constants.ENTERPRISE_WECHAT_ENABLE);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (isEnable == null) {
return false;
}
return isEnable;
}
/**
* get Enterprise WeChat token info
*
* @return token string info
* @throws IOException the IOException
*/
public static String getToken() throws IOException {
String resp;
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpGet = new HttpGet(ENTERPRISE_WE_CHAT_TOKEN_URL_REPLACE);
CloseableHttpResponse response = httpClient.execute(httpGet);
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, Constants.UTF_8);
EntityUtils.consume(entity);
} finally {
response.close();
}
Map<String, String> map = JSONUtils.toMap(resp);
if (map != null) {
return map.get("access_token");
} else {
return null;
}
} finally {
httpClient.close();
}
}
/**
* make team single Enterprise WeChat message
*
* @param toParty the toParty
* @param agentId the agentId
* @param msg the msg
* @return Enterprise WeChat send message
*/
public static String makeTeamSendMsg(String toParty, String agentId, String msg) {
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty}", toParty)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
* make team multi Enterprise WeChat message
*
* @param toParty the toParty
* @param agentId the agentId
* @param msg the msg
* @return Enterprise WeChat send message
*/
public static String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) {
String listParty = FuncUtils.mkString(toParty, "|");
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty}", listParty)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
* make team single user message
*
* @param toUser the toUser
* @param agentId the agentId
* @param msg the msg
* @return Enterprise WeChat send message
*/
public static String makeUserSendMsg(String toUser, String agentId, String msg) {
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll("\\{toUser}", toUser)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
* make team multi user message
*
* @param toUser the toUser
* @param agentId the agentId
* @param msg the msg
* @return Enterprise WeChat send message
*/
public static String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
String listUser = FuncUtils.mkString(toUser, "|");
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll(userRegExp, listUser)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
* send Enterprise WeChat
*
* @param charset the charset
* @param data the data
* @param token the token
* @return Enterprise WeChat resp, demo: {"errcode":0,"errmsg":"ok","invaliduser":""}
* @throws IOException the IOException
*/
public static String sendEnterpriseWeChat(String charset, String data, String token) throws IOException {
String enterpriseWeChatPushUrlReplace = ENTERPRISE_WE_CHAT_PUSH_URL.replaceAll("\\{token}", token);
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpPost httpPost = new HttpPost(enterpriseWeChatPushUrlReplace);
httpPost.setEntity(new StringEntity(data, charset));
CloseableHttpResponse response = httpClient.execute(httpPost);
String resp;
try {
HttpEntity entity = response.getEntity();
resp = EntityUtils.toString(entity, charset);
EntityUtils.consume(entity);
} finally {
response.close();
}
logger.info("Enterprise WeChat send [{}], param:{}, resp:{}",
ENTERPRISE_WE_CHAT_PUSH_URL, data, resp);
return resp;
} finally {
httpClient.close();
}
}
/**
* convert table to markdown style
*
* @param title the title
* @param content the content
* @return markdown table content
*/
public static String markdownTable(String title, String content) {
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
StringBuilder contents = new StringBuilder(200);
if (null != mapItemsList) {
for (LinkedHashMap mapItems : mapItemsList) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s", title, Constants.MARKDOWN_ENTER));
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
}
contents.append(t);
}
}
return contents.toString();
}
/**
* convert text to markdown style
*
* @param title the title
* @param content the content
* @return markdown text
*/
public static String markdownText(String title, String content) {
if (StringUtils.isNotEmpty(content)) {
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
if (null != mapItemsList) {
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`%n", title));
for (LinkedHashMap mapItems : mapItemsList) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
contents.append(Constants.MARKDOWN_QUOTE);
contents.append(entry.getKey()).append(":").append(entry.getValue());
contents.append(Constants.MARKDOWN_ENTER);
}
}
return contents.toString();
}
}
return null;
}
/**
* Determine the mardown style based on the show type of the alert
*
* @return the markdown alert table/text
*/
public static String markdownByAlert(AlertInfo alertInfo) {
String result = "";
Map<String, String> paramsMap = PluginParamsTransfer.getPluginParamsMap(alertInfo.getAlertParams());
String showType = paramsMap.get(AlertConstants.SHOW_TYPE);
if (showType.equals(ShowType.TABLE.getDescp())) {
result = markdownTable(alertInfo.getAlertData().getTitle(), alertInfo.getAlertData().getContent());
} else if (showType.equals(ShowType.TEXT.getDescp())) {
result = markdownText(alertInfo.getAlertData().getTitle(), alertInfo.getAlertData().getContent());
}
return result;
}
}

110
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/DingTalkUtilsTest.java

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@PrepareForTest(PropertyUtils.class)
@RunWith(PowerMockRunner.class)
@PowerMockIgnore("javax.net.ssl.*")
public class DingTalkUtilsTest {
Logger logger = LoggerFactory.getLogger(DingTalkUtilsTest.class);
private static final String mockUrl = "https://oapi.dingtalk.com/robot/send?access_token=test";
private static final String mockKeyWords = "onway";
private static final String msg = "ding talk test";
@Before
public void init() {
PowerMockito.mockStatic(PropertyUtils.class);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_WEBHOOK)).thenReturn(mockUrl);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_KEYWORD)).thenReturn(mockKeyWords);
Mockito.when(PropertyUtils.getBoolean(Constants.DINGTALK_PROXY_ENABLE)).thenReturn(true);
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PROXY)).thenReturn("proxy.com.cn");
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_USER)).thenReturn("user");
Mockito.when(PropertyUtils.getString(Constants.DINGTALK_PASSWORD)).thenReturn("pswd");
Mockito.when(PropertyUtils.getInt(Constants.DINGTALK_PORT)).thenReturn(80);
}
@Test
public void testCreateDefaultClient() {
CloseableHttpClient client = DingTalkUtils.getDefaultClient();
try {
Assert.assertNotNull(client);
client.close();
} catch (IOException ex) {
logger.info("close exception", ex.getMessage());
new Throwable();
}
}
@Test
public void testCreateProxyClient() {
CloseableHttpClient client = DingTalkUtils.getProxyClient();
try {
Assert.assertNotNull(client);
client.close();
} catch (IOException ex) {
logger.info("close exception", ex.getMessage());
new Throwable();
}
}
@Test
public void testProxyConfig() {
RequestConfig rc = DingTalkUtils.getProxyConfig();
Assert.assertEquals(rc.getProxy().getPort(), 80);
Assert.assertEquals(rc.getProxy().getHostName(), "proxy.com.cn");
}
@Test
public void testDingTalkMsgToJson() {
String jsonString = DingTalkUtils.textToJsonString("this is test");
logger.info(jsonString);
String expect = "{\"text\":{\"content\":\"this is test\"},\"msgtype\":\"text\"}";
Assert.assertEquals(expect, jsonString);
}
@Test
public void testDingTalkMsgUtf8() {
String msg = DingTalkUtils.textToJsonString("this is test:中文");
logger.info("test support utf8, actual:" + msg);
logger.info("test support utf8, actual:" + DingTalkUtils.isEnableDingTalk);
String expect = "{\"text\":{\"content\":\"this is test:中文\"},\"msgtype\":\"text\"}";
Assert.assertEquals(expect, msg);
}
}

307
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java

@ -1,307 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.spi.alert.AlertConstants;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.ShowType;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.RadioParam;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* Please manually modify the configuration file before testing.
* file: alert.properties
* enterprise.wechat.corp.id
* enterprise.wechat.secret
* enterprise.wechat.token.url
* enterprise.wechat.push.url
* enterprise.wechat.send.msg
* enterprise.wechat.agent.id
* enterprise.wechat.users
*/
@PrepareForTest(PropertyUtils.class)
@RunWith(PowerMockRunner.class)
public class EnterpriseWeChatUtilsTest {
private static final String toParty = "wwc99134b6fc1edb6";
private static final String enterpriseWechatSecret = "Uuv2KFrkdf7SeKOsTDCpsTkpawXBMNRhFy6VKX5FV";
private static final String enterpriseWechatAgentId = "1000004";
private static final String enterpriseWechatUsers = "LiGang,journey";
private static final String msg = "hello world";
private static final String enterpriseWechatTeamSendMsg = "{\\\"toparty\\\":\\\"{toParty}\\\",\\\"agentid\\\":\\\"{agentId}\\\""
+
",\\\"msgtype\\\":\\\"text\\\",\\\"text\\\":{\\\"content\\\":\\\"{msg}\\\"},\\\"safe\\\":\\\"0\\\"}";
private static final String enterpriseWechatUserSendMsg = "{\\\"touser\\\":\\\"{toUser}\\\",\\\"agentid\\\":\\\"{agentId}\\\""
+
",\\\"msgtype\\\":\\\"markdown\\\",\\\"markdown\\\":{\\\"content\\\":\\\"{msg}\\\"}}";
@Before
public void init() {
PowerMockito.mockStatic(PropertyUtils.class);
Mockito.when(PropertyUtils.getBoolean(Constants.ENTERPRISE_WECHAT_ENABLE)).thenReturn(true);
Mockito.when(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG)).thenReturn(enterpriseWechatUserSendMsg);
Mockito.when(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG)).thenReturn(enterpriseWechatTeamSendMsg);
}
@Test
public void testIsEnable() {
Boolean weChartEnable = EnterpriseWeChatUtils.isEnable();
Assert.assertTrue(weChartEnable);
}
@Test
public void testMakeTeamSendMsg1() {
String sendMsg = EnterpriseWeChatUtils.makeTeamSendMsg(toParty, enterpriseWechatSecret, msg);
Assert.assertTrue(sendMsg.contains(toParty));
Assert.assertTrue(sendMsg.contains(enterpriseWechatSecret));
Assert.assertTrue(sendMsg.contains(msg));
}
@Test
public void testMakeTeamSendMsg2() {
List<String> parties = new ArrayList<>();
parties.add(toParty);
parties.add("test1");
String sendMsg = EnterpriseWeChatUtils.makeTeamSendMsg(parties, enterpriseWechatSecret, msg);
Assert.assertTrue(sendMsg.contains(toParty));
Assert.assertTrue(sendMsg.contains(enterpriseWechatSecret));
Assert.assertTrue(sendMsg.contains(msg));
}
@Test
public void tesMakeUserSendMsg1() {
String sendMsg = EnterpriseWeChatUtils.makeUserSendMsg(enterpriseWechatUsers, enterpriseWechatAgentId, msg);
Assert.assertTrue(sendMsg.contains(enterpriseWechatUsers));
Assert.assertTrue(sendMsg.contains(enterpriseWechatAgentId));
Assert.assertTrue(sendMsg.contains(msg));
}
@Test
public void tesMakeUserSendMsg2() {
List<String> users = new ArrayList<>();
users.add("user1");
users.add("user2");
String sendMsg = EnterpriseWeChatUtils.makeUserSendMsg(users, enterpriseWechatAgentId, msg);
Assert.assertTrue(sendMsg.contains(users.get(0)));
Assert.assertTrue(sendMsg.contains(users.get(1)));
Assert.assertTrue(sendMsg.contains(enterpriseWechatAgentId));
Assert.assertTrue(sendMsg.contains(msg));
}
@Test
public void testMarkdownByAlertForText() {
Alert alertForText = createAlertForText();
AlertData alertData = new AlertData();
AlertInfo alertInfo = new AlertInfo();
//TODO:
List<PluginParams> paramsList = new ArrayList<>();
RadioParam showType = new RadioParam.Builder(AlertConstants.SHOW_TYPE, AlertConstants.SHOW_TYPE)
.setValue(ShowType.TEXT)
.build();
paramsList.add(showType);
alertInfo.setAlertParams(PluginParamsTransfer.transferParamsToJson(paramsList));
alertInfo.setAlertData(alertData);
alertData.setTitle(alertForText.getTitle())
//.setShowType(alertForText.getShowType().getDescp())
.setContent(alertForText.getContent());
String result = EnterpriseWeChatUtils.markdownByAlert(alertInfo);
Assert.assertNotNull(result);
}
@Test
public void testMarkdownByAlertForTable() {
Alert alertForText = createAlertForTable();
AlertData alertData = new AlertData();
AlertInfo alertInfo = new AlertInfo();
//TODO:
List<PluginParams> paramsList = new ArrayList<>();
RadioParam showType = new RadioParam.Builder(AlertConstants.SHOW_TYPE, AlertConstants.SHOW_TYPE)
.setValue(ShowType.TABLE)
.build();
paramsList.add(showType);
alertInfo.setAlertParams(PluginParamsTransfer.transferParamsToJson(paramsList));
alertInfo.setAlertData(alertData);
alertData.setTitle(alertForText.getTitle())
//.setShowType(alertForText.getShowType().getDescp())
.setContent(alertForText.getContent());
String result = EnterpriseWeChatUtils.markdownByAlert(alertInfo);
Assert.assertNotNull(result);
}
private Alert createAlertForText() {
String content = "[{\"id\":\"69\","
+
"\"name\":\"UserBehavior-0--1193959466\","
+
"\"Job name\":\"Start workflow\","
+
"\"State\":\"SUCCESS\","
+
"\"Recovery\":\"NO\","
+
"\"Run time\":\"1\","
+
"\"Start time\": \"2018-08-06 10:31:34.0\","
+
"\"End time\": \"2018-08-06 10:31:49.0\","
+
"\"Host\": \"192.168.xx.xx\","
+
"\"Notify group\" :\"4\"}]";
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
//alert.setShowType(ShowType.TEXT);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(4);
return alert;
}
private String list2String() {
LinkedHashMap<String, Object> map1 = new LinkedHashMap<>();
map1.put("mysql service name", "mysql200");
map1.put("mysql address", "192.168.xx.xx");
map1.put("port", "3306");
map1.put("no index of number", "80");
map1.put("database client connections", "190");
LinkedHashMap<String, Object> map2 = new LinkedHashMap<>();
map2.put("mysql service name", "mysql210");
map2.put("mysql address", "192.168.xx.xx");
map2.put("port", "3306");
map2.put("no index of number", "10");
map2.put("database client connections", "90");
List<LinkedHashMap<String, Object>> maps = new ArrayList<>();
maps.add(0, map1);
maps.add(1, map2);
String mapjson = JSONUtils.toJsonString(maps);
return mapjson;
}
private Alert createAlertForTable() {
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
//alert.setShowType(ShowType.TABLE.getDescp());
String content = list2String();
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(1);
return alert;
}
// @Test
// public void testSendSingleTeamWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiTeamWeChat() {
//
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendSingleUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "your meeting room has been booked and will be synced to the 'mailbox' later \n" +
// ">**matter details** \n" +
// ">matter:<font color='info'>meeting</font> <br>" +
// ">organizer:@miglioguan \n" +
// ">participant:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" +
// "> \n" +
// ">meeting room:<font color='info'>Guangzhou TIT 1st Floor 301</font> \n" +
// ">date:<font color='warning'>May 18, 2018</font> \n" +
// ">time:<font color='comment'>9:00-11:00 am</font> \n" +
// "> \n" +
// ">please attend the meeting on time\n" +
// "> \n" +
// ">to modify the meeting information, please click: [Modify Meeting Information](https://work.weixin.qq.com)\"");
//
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
//
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}

2
dolphinscheduler-spi/pom.xml

@ -35,10 +35,12 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>

4
pom.xml

@ -950,6 +950,10 @@
<include>**/plugin/alert/email/ExcelUtilsTest.java</include>
<include>**/plugin/alert/email/MailUtilsTest.java</include>
<include>**/plugin/alert/email/template/DefaultHTMLTemplateTest.java</include>
<include>**/plugin/alert/dingtalk/DingTalkSenderTest.java</include>
<include>**/plugin/alert/dingtalk/DingTalkAlertChannelFactoryTest.java</include>
<include>**/plugin/alert/wechat/WeChatSenderTest.java</include>
<include>**/plugin/alert/wechat/WeChatAlertChannelFactoryTest.java</include>
<include>**/spi/params/PluginParamsTransferTest.java</include>
<include>**/alert/plugin/EmailAlertPluginTest.java</include>
<include>**/alert/plugin/AlertPluginManagerTest.java</include>

Loading…
Cancel
Save