Browse Source

[Improvement][alert] Optimize alarm information (#3559)

* #3299  Json string parsing problem caused by non-standard json format.

* #3299  Json string parsing problem caused by non-standard json format.

* #3299  Json string parsing problem caused by non-standard json format. fix  code style

* #3299  Json string parsing problem caused by non-standard json format. fix  code style

* Optimize alarm information

* fix code style

* add license

* fix code smell

* Update dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java

Co-authored-by: Yichao Yang <1048262223@qq.com>

* fix code smell

* fix code smell

* 修改ut 和replace问题。

* 修改ut

Co-authored-by: wangjianda <Felix@thinkingdata.com>
Co-authored-by: Yichao Yang <1048262223@qq.com>
pull/3/MERGE
felix.wang 4 years ago committed by GitHub
parent
commit
2ba529a96c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 89
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java
  2. 233
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java
  3. 23
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertEvent.java
  4. 23
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertWarnLevel.java
  5. 58
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  6. 236
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java
  7. 85
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ServerAlertContent.java
  8. 71
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java

89
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtils.java

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.plugin.model.AlertData;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@ -29,11 +30,17 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Enterprise WeChat utils
@ -41,25 +48,21 @@ import java.util.*;
public class EnterpriseWeChatUtils {
public static final Logger logger = LoggerFactory.getLogger(EnterpriseWeChatUtils.class);
public static final String ENTERPRISE_WE_CHAT_AGENT_ID = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID);
public static final String ENTERPRISE_WE_CHAT_USERS = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS);
private static final String ENTERPRISE_WE_CHAT_CORP_ID = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_CORP_ID);
private static final String ENTERPRISE_WE_CHAT_SECRET = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_SECRET);
private static final String ENTERPRISE_WE_CHAT_TOKEN_URL = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TOKEN_URL);
private static final String ENTERPRISE_WE_CHAT_TOKEN_URL_REPLACE = ENTERPRISE_WE_CHAT_TOKEN_URL == null ? null : ENTERPRISE_WE_CHAT_TOKEN_URL
.replaceAll("\\{corpId\\}", ENTERPRISE_WE_CHAT_CORP_ID)
.replaceAll("\\{secret\\}", ENTERPRISE_WE_CHAT_SECRET);
.replaceAll("\\{corpId}", ENTERPRISE_WE_CHAT_CORP_ID)
.replaceAll("\\{secret}", ENTERPRISE_WE_CHAT_SECRET);
private static final String ENTERPRISE_WE_CHAT_PUSH_URL = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_PUSH_URL);
private static final String ENTERPRISE_WE_CHAT_TEAM_SEND_MSG = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_TEAM_SEND_MSG);
private static final String ENTERPRISE_WE_CHAT_USER_SEND_MSG = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG);
public static final String ENTERPRISE_WE_CHAT_AGENT_ID = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_AGENT_ID);
public static final String ENTERPRISE_WE_CHAT_USERS = PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USERS);
private static final String agentIdRegExp = "\\{agentId}";
private static final String msgRegExp = "\\{msg}";
private static final String userRegExp = "\\{toUser}";
/**
* get Enterprise WeChat is enable
@ -120,9 +123,9 @@ public class EnterpriseWeChatUtils {
* @return Enterprise WeChat send message
*/
public static String makeTeamSendMsg(String toParty, String agentId, String msg) {
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty\\}", toParty)
.replaceAll("\\{agentId\\}", agentId)
.replaceAll("\\{msg\\}", msg);
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty}", toParty)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
@ -135,9 +138,9 @@ public class EnterpriseWeChatUtils {
*/
public static String makeTeamSendMsg(Collection<String> toParty, String agentId, String msg) {
String listParty = FuncUtils.mkString(toParty, "|");
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty\\}", listParty)
.replaceAll("\\{agentId\\}", agentId)
.replaceAll("\\{msg\\}", msg);
return ENTERPRISE_WE_CHAT_TEAM_SEND_MSG.replaceAll("\\{toParty}", listParty)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
@ -149,9 +152,9 @@ public class EnterpriseWeChatUtils {
* @return Enterprise WeChat send message
*/
public static String makeUserSendMsg(String toUser, String agentId, String msg) {
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll("\\{toUser\\}", toUser)
.replaceAll("\\{agentId\\}", agentId)
.replaceAll("\\{msg\\}", msg);
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll("\\{toUser}", toUser)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
@ -164,9 +167,9 @@ public class EnterpriseWeChatUtils {
*/
public static String makeUserSendMsg(Collection<String> toUser, String agentId, String msg) {
String listUser = FuncUtils.mkString(toUser, "|");
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll("\\{toUser\\}", listUser)
.replaceAll("\\{agentId\\}", agentId)
.replaceAll("\\{msg\\}", msg);
return ENTERPRISE_WE_CHAT_USER_SEND_MSG.replaceAll(userRegExp, listUser)
.replaceAll(agentIdRegExp, agentId)
.replaceAll(msgRegExp, msg);
}
/**
@ -179,7 +182,7 @@ public class EnterpriseWeChatUtils {
* @throws IOException the IOException
*/
public static String sendEnterpriseWeChat(String charset, String data, String token) throws IOException {
String enterpriseWeChatPushUrlReplace = ENTERPRISE_WE_CHAT_PUSH_URL.replaceAll("\\{token\\}", token);
String enterpriseWeChatPushUrlReplace = ENTERPRISE_WE_CHAT_PUSH_URL.replaceAll("\\{token}", token);
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
@ -215,13 +218,13 @@ public class EnterpriseWeChatUtils {
if (null != mapItemsList) {
for (LinkedHashMap mapItems : mapItemsList) {
Set<Map.Entry<String, String>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, String>> iterator = entries.iterator();
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
StringBuilder t = new StringBuilder(String.format("`%s`%s", title, Constants.MARKDOWN_ENTER));
while (iterator.hasNext()) {
Map.Entry<String, String> entry = iterator.next();
Map.Entry<String, Object> entry = iterator.next();
t.append(Constants.MARKDOWN_QUOTE);
t.append(entry.getKey()).append(":").append(entry.getValue());
t.append(Constants.MARKDOWN_ENTER);
@ -241,23 +244,24 @@ public class EnterpriseWeChatUtils {
*/
public static String markdownText(String title, String content) {
if (StringUtils.isNotEmpty(content)) {
List<String> list;
try {
list = JSONUtils.toList(content, String.class);
} catch (Exception e) {
logger.error("json format exception", e);
return null;
}
List<LinkedHashMap> mapItemsList = JSONUtils.toList(content, LinkedHashMap.class);
if (null != mapItemsList) {
StringBuilder contents = new StringBuilder(100);
contents.append(String.format("`%s`%n", title));
for (String str : list) {
for (LinkedHashMap mapItems : mapItemsList) {
Set<Map.Entry<String, Object>> entries = mapItems.entrySet();
Iterator<Map.Entry<String, Object>> iterator = entries.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
contents.append(Constants.MARKDOWN_QUOTE);
contents.append(str);
contents.append(entry.getKey()).append(":").append(entry.getValue());
contents.append(Constants.MARKDOWN_ENTER);
}
}
return contents.toString();
}
}
return null;
@ -278,4 +282,5 @@ public class EnterpriseWeChatUtils {
return result;
}
}

233
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/EnterpriseWeChatUtilsTest.java

@ -14,26 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.plugin.model.AlertData;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.util.*;
import org.apache.dolphinscheduler.common.utils.*;
/**
* Please manually modify the configuration file before testing.
* file: alert.properties
@ -52,14 +54,18 @@ public class EnterpriseWeChatUtilsTest {
private static final String toParty = "wwc99134b6fc1edb6";
private static final String enterpriseWechatSecret = "Uuv2KFrkdf7SeKOsTDCpsTkpawXBMNRhFy6VKX5FV";
private static final String enterpriseWechatAgentId = "1000004";
private static final String enterpriseWechatUsers="LiGang,journey";
private static final String enterpriseWechatUsers = "LiGang,journey";
private static final String msg = "hello world";
private static final String enterpriseWechatTeamSendMsg = "{\\\"toparty\\\":\\\"{toParty}\\\",\\\"agentid\\\":\\\"{agentId}\\\",\\\"msgtype\\\":\\\"text\\\",\\\"text\\\":{\\\"content\\\":\\\"{msg}\\\"},\\\"safe\\\":\\\"0\\\"}";
private static final String enterpriseWechatUserSendMsg = "{\\\"touser\\\":\\\"{toUser}\\\",\\\"agentid\\\":\\\"{agentId}\\\",\\\"msgtype\\\":\\\"markdown\\\",\\\"markdown\\\":{\\\"content\\\":\\\"{msg}\\\"}}";
private static final String enterpriseWechatTeamSendMsg = "{\\\"toparty\\\":\\\"{toParty}\\\",\\\"agentid\\\":\\\"{agentId}\\\""
+
",\\\"msgtype\\\":\\\"text\\\",\\\"text\\\":{\\\"content\\\":\\\"{msg}\\\"},\\\"safe\\\":\\\"0\\\"}";
private static final String enterpriseWechatUserSendMsg = "{\\\"touser\\\":\\\"{toUser}\\\",\\\"agentid\\\":\\\"{agentId}\\\""
+
",\\\"msgtype\\\":\\\"markdown\\\",\\\"markdown\\\":{\\\"content\\\":\\\"{msg}\\\"}}";
@Before
public void init(){
public void init() {
PowerMockito.mockStatic(PropertyUtils.class);
Mockito.when(PropertyUtils.getBoolean(Constants.ENTERPRISE_WECHAT_ENABLE)).thenReturn(true);
Mockito.when(PropertyUtils.getString(Constants.ENTERPRISE_WECHAT_USER_SEND_MSG)).thenReturn(enterpriseWechatUserSendMsg);
@ -67,14 +73,13 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void testIsEnable(){
public void testIsEnable() {
Boolean weChartEnable = EnterpriseWeChatUtils.isEnable();
Assert.assertTrue(weChartEnable);
}
@Test
public void testMakeTeamSendMsg1(){
public void testMakeTeamSendMsg1() {
String sendMsg = EnterpriseWeChatUtils.makeTeamSendMsg(toParty, enterpriseWechatSecret, msg);
Assert.assertTrue(sendMsg.contains(toParty));
Assert.assertTrue(sendMsg.contains(enterpriseWechatSecret));
@ -82,9 +87,8 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void testMakeTeamSendMsg2(){
public void testMakeTeamSendMsg2() {
List<String> parties = new ArrayList<>();
parties.add(toParty);
parties.add("test1");
@ -96,7 +100,7 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void tesMakeUserSendMsg1(){
public void tesMakeUserSendMsg1() {
String sendMsg = EnterpriseWeChatUtils.makeUserSendMsg(enterpriseWechatUsers, enterpriseWechatAgentId, msg);
Assert.assertTrue(sendMsg.contains(enterpriseWechatUsers));
@ -105,7 +109,7 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void tesMakeUserSendMsg2(){
public void tesMakeUserSendMsg2() {
List<String> users = new ArrayList<>();
users.add("user1");
users.add("user2");
@ -118,7 +122,7 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void testMarkdownByAlertForText(){
public void testMarkdownByAlertForText() {
Alert alertForText = createAlertForText();
AlertData alertData = new AlertData();
alertData.setTitle(alertForText.getTitle())
@ -129,7 +133,7 @@ public class EnterpriseWeChatUtilsTest {
}
@Test
public void testMarkdownByAlertForTable(){
public void testMarkdownByAlertForTable() {
Alert alertForText = createAlertForTable();
AlertData alertData = new AlertData();
alertData.setTitle(alertForText.getTitle())
@ -139,17 +143,26 @@ public class EnterpriseWeChatUtilsTest {
Assert.assertNotNull(result);
}
private Alert createAlertForText(){
String content ="[\"id:69\"," +
"\"name:UserBehavior-0--1193959466\"," +
"\"Job name: Start workflow\"," +
"\"State: SUCCESS\"," +
"\"Recovery:NO\"," +
"\"Run time: 1\"," +
"\"Start time: 2018-08-06 10:31:34.0\"," +
"\"End time: 2018-08-06 10:31:49.0\"," +
"\"Host: 192.168.xx.xx\"," +
"\"Notify group :4\"]";
private Alert createAlertForText() {
String content = "[{\"id\":\"69\","
+
"\"name\":\"UserBehavior-0--1193959466\","
+
"\"Job name\":\"Start workflow\","
+
"\"State\":\"SUCCESS\","
+
"\"Recovery\":\"NO\","
+
"\"Run time\":\"1\","
+
"\"Start time\": \"2018-08-06 10:31:34.0\","
+
"\"End time\": \"2018-08-06 10:31:49.0\","
+
"\"Host\": \"192.168.xx.xx\","
+
"\"Notify group\" :\"4\"}]";
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
@ -161,18 +174,18 @@ public class EnterpriseWeChatUtilsTest {
return alert;
}
private String list2String(){
private String list2String() {
LinkedHashMap<String, Object> map1 = new LinkedHashMap<>();
map1.put("mysql service name","mysql200");
map1.put("mysql address","192.168.xx.xx");
map1.put("port","3306");
map1.put("no index of number","80");
map1.put("database client connections","190");
map1.put("mysql service name", "mysql200");
map1.put("mysql address", "192.168.xx.xx");
map1.put("port", "3306");
map1.put("no index of number", "80");
map1.put("database client connections", "190");
LinkedHashMap<String, Object> map2 = new LinkedHashMap<>();
map2.put("mysql service name","mysql210");
map2.put("mysql address","192.168.xx.xx");
map2.put("mysql service name", "mysql210");
map2.put("mysql address", "192.168.xx.xx");
map2.put("port", "3306");
map2.put("no index of number", "10");
map2.put("database client connections", "90");
@ -184,11 +197,11 @@ public class EnterpriseWeChatUtilsTest {
return mapjson;
}
private Alert createAlertForTable(){
private Alert createAlertForTable() {
Alert alert = new Alert();
alert.setTitle("Mysql Exception");
alert.setShowType(ShowType.TABLE);
String content= list2String();
String content = list2String();
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(1);
@ -196,77 +209,75 @@ public class EnterpriseWeChatUtilsTest {
}
// @Test
// public void testSendSingleTeamWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiTeamWeChat() {
//
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendSingleUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "your meeting room has been booked and will be synced to the 'mailbox' later \n" +
// ">**matter details** \n" +
// ">matter:<font color='info'>meeting</font> <br>" +
// ">organizer:@miglioguan \n" +
// ">participant:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" +
// "> \n" +
// ">meeting room:<font color='info'>Guangzhou TIT 1st Floor 301</font> \n" +
// ">date:<font color='warning'>May 18, 2018</font> \n" +
// ">time:<font color='comment'>9:00-11:00 am</font> \n" +
// "> \n" +
// ">please attend the meeting on time\n" +
// "> \n" +
// ">to modify the meeting information, please click: [Modify Meeting Information](https://work.weixin.qq.com)\"");
//
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
//
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
// @Test
// public void testSendSingleTeamWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(partyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiTeamWeChat() {
//
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeTeamSendMsg(listPartyId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendSingleUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId.stream().findFirst().get(), agentId, "your meeting room has been booked and will be synced to the 'mailbox' later \n" +
// ">**matter details** \n" +
// ">matter:<font color='info'>meeting</font> <br>" +
// ">organizer:@miglioguan \n" +
// ">participant:@miglioguan、@kunliu、@jamdeezhou、@kanexiong、@kisonwang \n" +
// "> \n" +
// ">meeting room:<font color='info'>Guangzhou TIT 1st Floor 301</font> \n" +
// ">date:<font color='warning'>May 18, 2018</font> \n" +
// ">time:<font color='comment'>9:00-11:00 am</font> \n" +
// "> \n" +
// ">please attend the meeting on time\n" +
// "> \n" +
// ">to modify the meeting information, please click: [Modify Meeting Information](https://work.weixin.qq.com)\"");
//
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
//
// @Test
// public void testSendMultiUserWeChat() {
// try {
// String token = EnterpriseWeChatUtils.getToken();
//
// String msg = EnterpriseWeChatUtils.makeUserSendMsg(listUserId, agentId, "hello world");
// String resp = EnterpriseWeChatUtils.sendEnterpriseWeChat("utf-8", msg, token);
//
// String errmsg = JSONUtils.parseObject(resp).getString("errmsg");
// Assert.assertEquals("ok",errmsg);
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}

23
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertEvent.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum AlertEvent {
SERVER_DOWN,TIME_OUT
}

23
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AlertWarnLevel.java

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum AlertWarnLevel {
MIDDLE,SERIOUS
}

58
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -17,22 +17,25 @@
package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.AlertWarnLevel;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ServerAlertContent;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.UserAlertGroupMapper;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
@ -103,14 +106,12 @@ public class AlertDao extends AbstractBaseDao {
*/
public void sendServerStopedAlert(int alertgroupId, String host, String serverType) {
Alert alert = new Alert();
List<LinkedHashMap> serverStopList = new ArrayList<>(1);
LinkedHashMap<String, String> serverStopedMap = new LinkedHashMap();
serverStopedMap.put("type", serverType);
serverStopedMap.put("host", host);
serverStopedMap.put("event", "server down");
serverStopedMap.put("warning level", "serious");
serverStopList.add(serverStopedMap);
String content = JSONUtils.toJsonString(serverStopList);
List<ServerAlertContent> serverAlertContents = new ArrayList<>(1);
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().
type(serverType).host(host).event(AlertEvent.SERVER_DOWN).warningLevel(AlertWarnLevel.SERIOUS).
build();
serverAlertContents.add(serverStopAlertContent);
String content = JSONUtils.toJsonString(serverAlertContents);
alert.setTitle("Fault tolerance warning");
saveTaskTimeoutAlert(alert, content, alertgroupId, null, null);
}
@ -126,14 +127,15 @@ public class AlertDao extends AbstractBaseDao {
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
Alert alert = new Alert();
List<LinkedHashMap> processTimeoutList = new ArrayList<>(1);
LinkedHashMap<String, String> processTimeoutMap = new LinkedHashMap();
processTimeoutMap.put("id", String.valueOf(processInstance.getId()));
processTimeoutMap.put("name", processInstance.getName());
processTimeoutMap.put("event", "timeout");
processTimeoutMap.put("warnLevel", "middle");
processTimeoutList.add(processTimeoutMap);
String content = JSONUtils.toJsonString(processTimeoutList);
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.processId(processInstance.getId())
.processName(processInstance.getName())
.event(AlertEvent.TIME_OUT)
.warningLevel(AlertWarnLevel.MIDDLE)
.build();
processAlertContentList.add(processAlertContent);
String content = JSONUtils.toJsonString(processAlertContentList);
alert.setTitle("Process Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
@ -169,16 +171,17 @@ public class AlertDao extends AbstractBaseDao {
public void sendTaskTimeoutAlert(int alertgroupId, String receivers, String receiversCc, int processInstanceId,
String processInstanceName, int taskId, String taskName) {
Alert alert = new Alert();
List<LinkedHashMap> taskTimeoutList = new ArrayList<>(1);
LinkedHashMap<String, String> taskTimeoutMap = new LinkedHashMap();
taskTimeoutMap.put("process instance id", String.valueOf(processInstanceId));
taskTimeoutMap.put("process name", processInstanceName);
taskTimeoutMap.put("task id", String.valueOf(taskId));
taskTimeoutMap.put("task name", taskName);
taskTimeoutMap.put("event", "timeout");
taskTimeoutMap.put("warnLevel", "middle");
taskTimeoutList.add(taskTimeoutMap);
String content = JSONUtils.toJsonString(taskTimeoutList);
List<ProcessAlertContent> processAlertContentList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.processId(processInstanceId)
.processName(processInstanceName)
.taskId(taskId)
.taskName(taskName)
.event(AlertEvent.TIME_OUT)
.warningLevel(AlertWarnLevel.MIDDLE)
.build();
processAlertContentList.add(processAlertContent);
String content = JSONUtils.toJsonString(processAlertContentList);
alert.setTitle("Task Timeout Warn");
saveTaskTimeoutAlert(alert, content, alertgroupId, receivers, receiversCc);
}
@ -210,4 +213,5 @@ public class AlertDao extends AbstractBaseDao {
public AlertMapper getAlertMapper() {
return alertMapper;
}
}

236
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessAlertContent.java

@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertWarnLevel;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import java.io.Serializable;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
@JsonInclude(Include.NON_NULL)
public class ProcessAlertContent implements Serializable {
@JsonProperty("processId")
private int processId;
@JsonProperty("processName")
private String processName;
@JsonProperty("processType")
private CommandType processType;
@JsonProperty("processState")
private ExecutionStatus processState;
@JsonProperty("recovery")
private Flag recovery;
@JsonProperty("runTimes")
private int runTimes;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@JsonProperty("processStartTime")
private Date processStartTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@JsonProperty("processEndTime")
private Date processEndTime;
@JsonProperty("processHost")
private String processHost;
@JsonProperty("taskId")
private int taskId;
@JsonProperty("taskName")
private String taskName;
@JsonProperty("event")
private AlertEvent event;
@JsonProperty("warnLevel")
private AlertWarnLevel warnLevel;
@JsonProperty("taskType")
private String taskType;
@JsonProperty("retryTimes")
private int retryTimes;
@JsonProperty("taskState")
private ExecutionStatus taskState;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@JsonProperty("taskStartTime")
private Date taskStartTime;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@JsonProperty("taskEndTime")
private Date taskEndTime;
@JsonProperty("taskHost")
private String taskHost;
@JsonProperty("logPath")
private String logPath;
private ProcessAlertContent(Builder builder) {
this.processId = builder.processId;
this.processName = builder.processName;
this.processType = builder.processType;
this.recovery = builder.recovery;
this.processState = builder.processState;
this.runTimes = builder.runTimes;
this.processStartTime = builder.processStartTime;
this.processEndTime = builder.processEndTime;
this.processHost = builder.processHost;
this.taskId = builder.taskId;
this.taskName = builder.taskName;
this.event = builder.event;
this.warnLevel = builder.warnLevel;
this.taskType = builder.taskType;
this.taskState = builder.taskState;
this.taskStartTime = builder.taskStartTime;
this.taskEndTime = builder.taskEndTime;
this.taskHost = builder.taskHost;
this.logPath = builder.logPath;
this.retryTimes = builder.retryTimes;
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private int processId;
private String processName;
private CommandType processType;
private Flag recovery;
private ExecutionStatus processState;
private int runTimes;
private Date processStartTime;
private Date processEndTime;
private String processHost;
private int taskId;
private String taskName;
private AlertEvent event;
private AlertWarnLevel warnLevel;
private String taskType;
private int retryTimes;
private ExecutionStatus taskState;
private Date taskStartTime;
private Date taskEndTime;
private String taskHost;
private String logPath;
public Builder processId(int processId) {
this.processId = processId;
return this;
}
public Builder processName(String processName) {
this.processName = processName;
return this;
}
public Builder processType(CommandType processType) {
this.processType = processType;
return this;
}
public Builder recovery(Flag recovery) {
this.recovery = recovery;
return this;
}
public Builder processState(ExecutionStatus processState) {
this.processState = processState;
return this;
}
public Builder runTimes(int runTimes) {
this.runTimes = runTimes;
return this;
}
public Builder processStartTime(Date processStartTime) {
this.processStartTime = processStartTime;
return this;
}
public Builder processEndTime(Date processEndTime) {
this.processEndTime = processEndTime;
return this;
}
public Builder processHost(String processHost) {
this.processHost = processHost;
return this;
}
public Builder taskId(int taskId) {
this.taskId = taskId;
return this;
}
public Builder taskName(String taskName) {
this.taskName = taskName;
return this;
}
public Builder event(AlertEvent event) {
this.event = event;
return this;
}
public Builder warningLevel(AlertWarnLevel warnLevel) {
this.warnLevel = warnLevel;
return this;
}
public Builder taskType(String taskType) {
this.taskType = taskType;
return this;
}
public Builder retryTimes(int retryTimes) {
this.retryTimes = retryTimes;
return this;
}
public Builder taskState(ExecutionStatus taskState) {
this.taskState = taskState;
return this;
}
public Builder taskStartTime(Date taskStartTime) {
this.taskStartTime = taskStartTime;
return this;
}
public Builder taskEndTime(Date taskEndTime) {
this.taskEndTime = taskEndTime;
return this;
}
public Builder taskHost(String taskHost) {
this.taskHost = taskHost;
return this;
}
public Builder logPath(String logPath) {
this.logPath = logPath;
return this;
}
public ProcessAlertContent build() {
return new ProcessAlertContent(this);
}
}
}

85
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ServerAlertContent.java

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertWarnLevel;
import com.fasterxml.jackson.annotation.JsonProperty;
public class ServerAlertContent {
/**
* server type :master or worker
*/
@JsonProperty("type")
final String type;
@JsonProperty("host")
final String host;
@JsonProperty("event")
final AlertEvent event;
@JsonProperty("warningLevel")
final AlertWarnLevel warningLevel;
private ServerAlertContent(Builder builder) {
this.type = builder.type;
this.host = builder.host;
this.event = builder.event;
this.warningLevel = builder.warningLevel;
}
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private String type;
private String host;
private AlertEvent event;
private AlertWarnLevel warningLevel;
public Builder type(String type) {
this.type = type;
return this;
}
public Builder host(String host) {
this.host = host;
return this;
}
public Builder event(AlertEvent event) {
this.event = event;
return this;
}
public Builder warningLevel(AlertWarnLevel warningLevel) {
this.warningLevel = warningLevel;
return this;
}
public ServerAlertContent build() {
return new ServerAlertContent(this);
}
}
}

71
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/AlertManager.java

@ -21,18 +21,17 @@ import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.ProcessAlertContent;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
@ -98,39 +97,40 @@ public class AlertManager {
String res = "";
if (processInstance.getState().typeIsSuccess()) {
List<LinkedHashMap> successTaskList = new ArrayList<>(1);
LinkedHashMap<String, String> successTaskMap = new LinkedHashMap();
successTaskMap.put("id", String.valueOf(processInstance.getId()));
successTaskMap.put("name", processInstance.getName());
successTaskMap.put("job type", getCommandCnName(processInstance.getCommandType()));
successTaskMap.put("state", processInstance.getState().toString());
successTaskMap.put("recovery", processInstance.getRecovery().toString());
successTaskMap.put("run time", String.valueOf(processInstance.getRunTimes()));
successTaskMap.put("start time", DateUtils.dateToString(processInstance.getStartTime()));
successTaskMap.put("end time", DateUtils.dateToString(processInstance.getEndTime()));
successTaskMap.put("host", processInstance.getHost());
successTaskList.add(successTaskMap);
List<ProcessAlertContent> successTaskList = new ArrayList<>(1);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.processId(processInstance.getId())
.processName(processInstance.getName())
.processType(processInstance.getCommandType())
.processState(processInstance.getState())
.recovery(processInstance.getRecovery())
.runTimes(processInstance.getRunTimes())
.processStartTime(processInstance.getStartTime())
.processEndTime(processInstance.getEndTime())
.processHost(processInstance.getHost())
.build();
successTaskList.add(processAlertContent);
res = JSONUtils.toJsonString(successTaskList);
} else if (processInstance.getState().typeIsFailure()) {
List<LinkedHashMap> failedTaskList = new ArrayList<>();
List<ProcessAlertContent> failedTaskList = new ArrayList<>();
for (TaskInstance task : taskInstances) {
if (task.getState().typeIsSuccess()) {
continue;
}
LinkedHashMap<String, String> failedTaskMap = new LinkedHashMap();
failedTaskMap.put("process instance id", String.valueOf(processInstance.getId()));
failedTaskMap.put("process instance name", processInstance.getName());
failedTaskMap.put("task id", String.valueOf(task.getId()));
failedTaskMap.put("task name", task.getName());
failedTaskMap.put("task type", task.getTaskType());
failedTaskMap.put("task state", task.getState().toString());
failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime()));
failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime()));
failedTaskMap.put("host", task.getHost());
failedTaskMap.put("log path", task.getLogPath());
failedTaskList.add(failedTaskMap);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.processId(processInstance.getId())
.processName(processInstance.getName())
.taskId(task.getId())
.taskName(task.getName())
.taskType(task.getTaskType())
.taskState(task.getState())
.taskStartTime(task.getStartTime())
.taskEndTime(task.getEndTime())
.taskHost(task.getHost())
.logPath(task.getLogPath())
.build();
failedTaskList.add(processAlertContent);
}
res = JSONUtils.toJsonString(failedTaskList);
}
@ -147,15 +147,16 @@ public class AlertManager {
*/
private String getWorkerToleranceContent(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList) {
List<LinkedHashMap<String, String>> toleranceTaskInstanceList = new ArrayList<>();
List<ProcessAlertContent> toleranceTaskInstanceList = new ArrayList<>();
for (TaskInstance taskInstance : toleranceTaskList) {
LinkedHashMap<String, String> toleranceWorkerContentMap = new LinkedHashMap();
toleranceWorkerContentMap.put("process name", processInstance.getName());
toleranceWorkerContentMap.put("task name", taskInstance.getName());
toleranceWorkerContentMap.put("host", taskInstance.getHost());
toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes()));
toleranceTaskInstanceList.add(toleranceWorkerContentMap);
ProcessAlertContent processAlertContent = ProcessAlertContent.newBuilder()
.processName(processInstance.getName())
.taskName(taskInstance.getName())
.taskHost(taskInstance.getHost())
.retryTimes(taskInstance.getRetryTimes())
.build();
toleranceTaskInstanceList.add(processAlertContent);
}
return JSONUtils.toJsonString(toleranceTaskInstanceList);
}

Loading…
Cancel
Save