Browse Source

[Fix-9174] [Alert] Fix deduplication of alarm information (#9371)

* feat(issue #9174):

Fix-9174
3.0.0/version-upgrade
czeming 3 years ago committed by GitHub
parent
commit
706cdb6a8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
  2. 8
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  3. 52
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  4. 120
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java
  5. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java
  6. 19
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml
  7. 4
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  8. 4
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  9. 5
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  10. 21
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.6_schema/mysql/dolphinscheduler_ddl.sql
  11. 4
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.6_schema/postgresql/dolphinscheduler_ddl.sql
  12. 6
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java
  13. 27
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AlertMapperTest.java
  14. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

27
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.alert; package org.apache.dolphinscheduler.alert;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants; import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData; import org.apache.dolphinscheduler.alert.api.AlertData;
@ -34,16 +33,17 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service @Service
public final class AlertSenderService extends Thread { public final class AlertSenderService extends Thread {
@ -77,19 +77,19 @@ public final class AlertSenderService extends Thread {
} }
} }
public void send(List<Alert> alerts) { public void send(List<Alert> alerts) {
for (Alert alert : alerts) { for (Alert alert : alerts) {
//get alert group from alert //get alert group from alert
int alertGroupId = alert.getAlertGroupId(); int alertId = Optional.ofNullable(alert.getId()).orElse(0);
int alertGroupId = Optional.ofNullable(alert.getAlertGroupId()).orElse(0);
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
if (CollectionUtils.isEmpty(alertInstanceList)) { if (CollectionUtils.isEmpty(alertInstanceList)) {
logger.error("send alert msg fail,no bind plugin instance."); logger.error("send alert msg fail,no bind plugin instance.");
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "no bind plugin instance", alertId);
continue; continue;
} }
AlertData alertData = new AlertData(); AlertData alertData = new AlertData();
alertData.setId(alert.getId()) alertData.setId(alertId)
.setContent(alert.getContent()) .setContent(alert.getContent())
.setLog(alert.getLog()) .setLog(alert.getLog())
.setTitle(alert.getTitle()) .setTitle(alert.getTitle())
@ -101,7 +101,7 @@ public final class AlertSenderService extends Thread {
AlertResult alertResult = this.alertResultHandler(instance, alertData); AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) { if (alertResult != null) {
AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; AlertStatus sendStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alert.getId(),instance.getId()); alertDao.addAlertSendStatus(sendStatus,alertResult.getMessage(),alertId,instance.getId());
if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) { if (sendStatus.equals(AlertStatus.EXECUTION_SUCCESS)) {
sendSuccessCount++; sendSuccessCount++;
} }
@ -113,7 +113,7 @@ public final class AlertSenderService extends Thread {
} else if (sendSuccessCount < alertInstanceList.size()) { } else if (sendSuccessCount < alertInstanceList.size()) {
alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS; alertStatus = AlertStatus.EXECUTION_PARTIAL_SUCCESS;
} }
alertDao.updateAlert(alertStatus, "", alert.getId()); alertDao.updateAlert(alertStatus, "", alertId);
} }
} }
@ -213,7 +213,8 @@ public final class AlertSenderService extends Thread {
} }
if (!sendWarning) { if (!sendWarning) {
logger.info("Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}", pluginInstanceName, warningType.getCode(), alertData.getWarnType()); logger.info("Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}",
pluginInstanceName, warningType.getCode(), alertData.getWarnType());
return null; return null;
} }

8
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -24,6 +24,11 @@ import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import java.io.Closeable;
import javax.annotation.PreDestroy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.boot.WebApplicationType; import org.springframework.boot.WebApplicationType;
@ -33,9 +38,6 @@ import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import javax.annotation.PreDestroy;
import java.io.Closeable;
@SpringBootApplication @SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable { public class AlertServer implements Closeable {

52
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -19,9 +19,9 @@ package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.enums.AlertEvent; import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.enums.AlertWarnLevel; import org.apache.dolphinscheduler.common.enums.AlertWarnLevel;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.AlertType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
@ -36,21 +36,31 @@ import org.apache.dolphinscheduler.dao.mapper.AlertMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper; import org.apache.dolphinscheduler.dao.mapper.AlertSendStatusMapper;
import org.apache.commons.lang.StringUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@Component @Component
public class AlertDao { public class AlertDao {
@Value("${alert.alarm-suppression.crash:60}")
private Integer crashAlarmSuppression;
@Autowired @Autowired
private AlertMapper alertMapper; private AlertMapper alertMapper;
@ -70,6 +80,8 @@ public class AlertDao {
* @return add alert result * @return add alert result
*/ */
public int addAlert(Alert alert) { public int addAlert(Alert alert) {
String sign = generateSign(alert);
alert.setSign(sign);
return alertMapper.insert(alert); return alertMapper.insert(alert);
} }
@ -82,13 +94,28 @@ public class AlertDao {
* @return update alert result * @return update alert result
*/ */
public int updateAlert(AlertStatus alertStatus, String log, int id) { public int updateAlert(AlertStatus alertStatus, String log, int id) {
Alert alert = alertMapper.selectById(id); Alert alert = new Alert();
alert.setId(id);
alert.setAlertStatus(alertStatus); alert.setAlertStatus(alertStatus);
alert.setUpdateTime(new Date()); alert.setUpdateTime(new Date());
alert.setLog(log); alert.setLog(log);
return alertMapper.updateById(alert); return alertMapper.updateById(alert);
} }
/**
* generate sign for alert
*
* @param alert alert
* @return sign's str
*/
private String generateSign (Alert alert) {
return Optional.of(alert)
.map(Alert::getContent)
.map(DigestUtils::sha1Hex)
.map(String::toLowerCase)
.orElse(StringUtils.EMPTY);
}
/** /**
* add AlertSendStatus * add AlertSendStatus
* *
@ -109,13 +136,13 @@ public class AlertDao {
} }
/** /**
* MasterServer or WorkerServer stoped * MasterServer or WorkerServer stopped
* *
* @param alertGroupId alertGroupId * @param alertGroupId alertGroupId
* @param host host * @param host host
* @param serverType serverType * @param serverType serverType
*/ */
public void sendServerStopedAlert(int alertGroupId, String host, String serverType) { public void sendServerStoppedAlert(int alertGroupId, String host, String serverType) {
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder(). ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().
type(serverType) type(serverType)
.host(host) .host(host)
@ -133,8 +160,11 @@ public class AlertDao {
alert.setCreateTime(new Date()); alert.setCreateTime(new Date());
alert.setUpdateTime(new Date()); alert.setUpdateTime(new Date());
alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING); alert.setAlertType(AlertType.FAULT_TOLERANCE_WARNING);
alert.setSign(generateSign(alert));
// we use this method to avoid insert duplicate alert(issue #5525) // we use this method to avoid insert duplicate alert(issue #5525)
alertMapper.insertAlertWhenServerCrash(alert); // we modified this method to optimize performance(issue #9174)
Date crashAlarmSuppressionStartTime = DateTime.now().plusMinutes(-crashAlarmSuppression).toDate();
alertMapper.insertAlertWhenServerCrash(alert, crashAlarmSuppressionStartTime);
} }
/** /**
@ -178,6 +208,8 @@ public class AlertDao {
alert.setContent(content); alert.setContent(content);
alert.setCreateTime(new Date()); alert.setCreateTime(new Date());
alert.setUpdateTime(new Date()); alert.setUpdateTime(new Date());
String sign = generateSign(alert);
alert.setSign(sign);
alertMapper.insert(alert); alertMapper.insert(alert);
} }
@ -220,7 +252,9 @@ public class AlertDao {
* List alerts that are pending for execution * List alerts that are pending for execution
*/ */
public List<Alert> listPendingAlerts() { public List<Alert> listPendingAlerts() {
return alertMapper.listAlertByStatus(AlertStatus.WAIT_EXECUTION); LambdaQueryWrapper<Alert> wrapper = new QueryWrapper<>(new Alert()).lambda()
.eq(Alert::getAlertStatus, AlertStatus.WAIT_EXECUTION);
return alertMapper.selectList(wrapper);
} }
/** /**
@ -265,4 +299,8 @@ public class AlertDao {
public void setAlertGroupMapper(AlertGroupMapper alertGroupMapper) { public void setAlertGroupMapper(AlertGroupMapper alertGroupMapper) {
this.alertGroupMapper = alertGroupMapper; this.alertGroupMapper = alertGroupMapper;
} }
public void setCrashAlarmSuppression(Integer crashAlarmSuppression) {
this.crashAlarmSuppression = crashAlarmSuppression;
}
} }

120
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
@ -36,7 +37,12 @@ public class Alert {
* primary key * primary key
*/ */
@TableId(value = "id", type = IdType.AUTO) @TableId(value = "id", type = IdType.AUTO)
private int id; private Integer id;
/**
* sign
*/
@TableField(value = "sign")
private String sign;
/** /**
* title * title
*/ */
@ -67,11 +73,11 @@ public class Alert {
@TableField(value = "log") @TableField(value = "log")
private String log; private String log;
/**g /**
* alertgroup_id * alertgroup_id
*/ */
@TableField("alertgroup_id") @TableField("alertgroup_id")
private int alertGroupId; private Integer alertGroupId;
/** /**
* create_time * create_time
@ -100,7 +106,7 @@ public class Alert {
* process_instance_id * process_instance_id
*/ */
@TableField("process_instance_id") @TableField("process_instance_id")
private int processInstanceId; private Integer processInstanceId;
/** /**
* alert_type * alert_type
@ -114,14 +120,22 @@ public class Alert {
public Alert() { public Alert() {
} }
public int getId() { public Integer getId() {
return id; return id;
} }
public void setId(int id) { public void setId(Integer id) {
this.id = id; this.id = id;
} }
public String getSign() {
return sign;
}
public void setSign(String sign) {
this.sign = sign;
}
public String getTitle() { public String getTitle() {
return title; return title;
} }
@ -154,11 +168,11 @@ public class Alert {
this.log = log; this.log = log;
} }
public int getAlertGroupId() { public Integer getAlertGroupId() {
return alertGroupId; return alertGroupId;
} }
public void setAlertGroupId(int alertGroupId) { public void setAlertGroupId(Integer alertGroupId) {
this.alertGroupId = alertGroupId; this.alertGroupId = alertGroupId;
} }
@ -210,11 +224,11 @@ public class Alert {
this.processDefinitionCode = processDefinitionCode; this.processDefinitionCode = processDefinitionCode;
} }
public int getProcessInstanceId() { public Integer getProcessInstanceId() {
return processInstanceId; return processInstanceId;
} }
public void setProcessInstanceId(int processInstanceId) { public void setProcessInstanceId(Integer processInstanceId) {
this.processInstanceId = processInstanceId; this.processInstanceId = processInstanceId;
} }
@ -234,78 +248,40 @@ public class Alert {
if (o == null || getClass() != o.getClass()) { if (o == null || getClass() != o.getClass()) {
return false; return false;
} }
Alert alert = (Alert) o; Alert alert = (Alert) o;
return Objects.equals(id, alert.id)
if (id != alert.id) { && Objects.equals(alertGroupId, alert.alertGroupId)
return false; && Objects.equals(sign, alert.sign)
} && Objects.equals(title, alert.title)
if (alertGroupId != alert.alertGroupId) { && Objects.equals(content, alert.content)
return false; && alertStatus == alert.alertStatus
} && warningType == alert.warningType
if (!title.equals(alert.title)) { && Objects.equals(log, alert.log)
return false; && Objects.equals(createTime, alert.createTime)
} && Objects.equals(updateTime, alert.updateTime)
if (!content.equals(alert.content)) { && Objects.equals(info, alert.info)
return false; ;
}
if (alertStatus != alert.alertStatus) {
return false;
}
if (!log.equals(alert.log)) {
return false;
}
if (!createTime.equals(alert.createTime)) {
return false;
}
if (warningType != alert.warningType) {
return false;
}
return updateTime.equals(alert.updateTime) && info.equals(alert.info);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = id; return Objects.hash(id, sign, title, content, alertStatus, warningType, log, alertGroupId, createTime, updateTime, info);
result = 31 * result + title.hashCode();
result = 31 * result + content.hashCode();
result = 31 * result + alertStatus.hashCode();
result = 31 * result + warningType.hashCode();
result = 31 * result + log.hashCode();
result = 31 * result + alertGroupId;
result = 31 * result + createTime.hashCode();
result = 31 * result + updateTime.hashCode();
result = 31 * result + info.hashCode();
return result;
} }
@Override @Override
public String toString() { public String toString() {
return "Alert{" return "Alert{"
+ "id=" + "id=" + id
+ id + ", sign='" + sign + '\''
+ ", title='" + ", title='" + title + '\''
+ title + '\'' + ", content='" + content + '\''
+ ", content='" + ", alertStatus=" + alertStatus
+ content + ", warningType=" + warningType
+ '\'' + ", log='" + log + '\''
+ ", alertStatus=" + ", alertGroupId=" + alertGroupId
+ alertStatus + ", createTime=" + createTime
+ ", warningType=" + ", updateTime=" + updateTime
+ warningType + ", info=" + info
+ ", log='"
+ log
+ '\''
+ ", alertGroupId="
+ alertGroupId
+ '\''
+ ", createTime="
+ createTime
+ ", updateTime="
+ updateTime
+ ", info="
+ info
+ '}'; + '}';
} }
} }

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java

@ -17,31 +17,25 @@
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.Date;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/** /**
* alert mapper interface * alert mapper interface
*/ */
@Mapper
public interface AlertMapper extends BaseMapper<Alert> { public interface AlertMapper extends BaseMapper<Alert> {
/**
* list alert by status
* @param alertStatus alertStatus
* @return alert list
*/
List<Alert> listAlertByStatus(@Param("alertStatus") AlertStatus alertStatus);
/** /**
* Insert server crash alert * Insert server crash alert
* <p>This method will ensure that there is at most one unsent alert which has the same content in the database. * <p>This method will ensure that there is at most one unsent alert which has the same content in the database.
*/ */
void insertAlertWhenServerCrash(@Param("alert") Alert alert); void insertAlertWhenServerCrash(@Param("alert") Alert alert, @Param("crashAlarmSuppressionStartTime") Date crashAlarmSuppressionStartTime);
} }

19
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml

@ -18,24 +18,13 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.AlertMapper"> <mapper namespace="org.apache.dolphinscheduler.dao.mapper.AlertMapper">
<sql id="baseSql">
id
, title, content, alert_status, log,
alertgroup_id, create_time, update_time, warning_type
</sql>
<select id="listAlertByStatus" resultType="org.apache.dolphinscheduler.dao.entity.Alert">
select
<include refid="baseSql"/>
from t_ds_alert
where alert_status = #{alertStatus}
</select>
<insert id="insertAlertWhenServerCrash"> <insert id="insertAlertWhenServerCrash">
insert into t_ds_alert(title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time) insert into t_ds_alert(sign, title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time)
SELECT #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.warningType.code}, #{alert.log}, #{alert.alertGroupId}, SELECT #{alert.sign}, #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.warningType.code},
#{alert.createTime}, #{alert.updateTime} #{alert.log}, #{alert.alertGroupId}, #{alert.createTime}, #{alert.updateTime}
from t_ds_alert from t_ds_alert
where content = #{alert.content} and alert_status = #{alert.alertStatus.code} where create_time >= #{crashAlarmSuppressionStartTime} and sign = #{alert.sign} and alert_status = #{alert.alertStatus.code}
having count(*) = 0 having count(*) = 0
</insert> </insert>
</mapper> </mapper>

4
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -272,6 +272,7 @@ CREATE TABLE t_ds_alert
( (
id int(11) NOT NULL AUTO_INCREMENT, id int(11) NOT NULL AUTO_INCREMENT,
title varchar(64) DEFAULT NULL, title varchar(64) DEFAULT NULL,
sign char(40) NOT NULL DEFAULT '',
content text, content text,
alert_status tinyint(4) DEFAULT '0', alert_status tinyint(4) DEFAULT '0',
warning_type tinyint(4) DEFAULT '2', warning_type tinyint(4) DEFAULT '2',
@ -283,7 +284,8 @@ CREATE TABLE t_ds_alert
process_definition_code bigint(20) DEFAULT NULL, process_definition_code bigint(20) DEFAULT NULL,
process_instance_id int(11) DEFAULT NULL, process_instance_id int(11) DEFAULT NULL,
alert_type int(11) DEFAULT NULL, alert_type int(11) DEFAULT NULL,
PRIMARY KEY (id) PRIMARY KEY (id),
KEY idx_sign (sign)
); );
-- ---------------------------- -- ----------------------------

4
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -279,6 +279,7 @@ DROP TABLE IF EXISTS `t_ds_alert`;
CREATE TABLE `t_ds_alert` ( CREATE TABLE `t_ds_alert` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key',
`title` varchar(64) DEFAULT NULL COMMENT 'title', `title` varchar(64) DEFAULT NULL COMMENT 'title',
`sign` char(40) NOT NULL DEFAULT '' COMMENT 'sign=sha1(content)',
`content` text COMMENT 'Message content (can be email, can be SMS. Mail is stored in JSON map, and SMS is string)', `content` text COMMENT 'Message content (can be email, can be SMS. Mail is stored in JSON map, and SMS is string)',
`alert_status` tinyint(4) DEFAULT '0' COMMENT '0:wait running,1:success,2:failed', `alert_status` tinyint(4) DEFAULT '0' COMMENT '0:wait running,1:success,2:failed',
`warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed', `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed',
@ -291,7 +292,8 @@ CREATE TABLE `t_ds_alert` (
`process_instance_id` int(11) DEFAULT NULL COMMENT 'process_instance_id', `process_instance_id` int(11) DEFAULT NULL COMMENT 'process_instance_id',
`alert_type` int(11) DEFAULT NULL COMMENT 'alert_type', `alert_type` int(11) DEFAULT NULL COMMENT 'alert_type',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
KEY `idx_status` (`alert_status`) USING BTREE KEY `idx_status` (`alert_status`) USING BTREE,
KEY `idx_sign` (`sign`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
-- ---------------------------- -- ----------------------------

5
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -208,6 +208,7 @@ DROP TABLE IF EXISTS t_ds_alert;
CREATE TABLE t_ds_alert ( CREATE TABLE t_ds_alert (
id int NOT NULL , id int NOT NULL ,
title varchar(64) DEFAULT NULL , title varchar(64) DEFAULT NULL ,
sign varchar(40) NOT NULL DEFAULT '',
content text , content text ,
alert_status int DEFAULT '0' , alert_status int DEFAULT '0' ,
warning_type int DEFAULT '2' , warning_type int DEFAULT '2' ,
@ -220,9 +221,11 @@ CREATE TABLE t_ds_alert (
process_instance_id int DEFAULT NULL , process_instance_id int DEFAULT NULL ,
alert_type int DEFAULT NULL , alert_type int DEFAULT NULL ,
PRIMARY KEY (id) PRIMARY KEY (id)
) ; );
comment on column t_ds_alert.sign is 'sign=sha1(content)';
create index idx_status on t_ds_alert (alert_status); create index idx_status on t_ds_alert (alert_status);
create index idx_sign on t_ds_alert (sign);
-- --
-- Table structure for table t_ds_alertgroup -- Table structure for table t_ds_alertgroup

21
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.6_schema/mysql/dolphinscheduler_ddl.sql

@ -36,3 +36,24 @@ d//
delimiter ; delimiter ;
CALL uc_dolphin_T_t_ds_resources_R_full_name; CALL uc_dolphin_T_t_ds_resources_R_full_name;
DROP PROCEDURE uc_dolphin_T_t_ds_resources_R_full_name; DROP PROCEDURE uc_dolphin_T_t_ds_resources_R_full_name;
-- uc_dolphin_T_t_ds_alert_R_sign
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_alert_R_sign;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_alert_R_sign()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_alert'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='sign')
THEN
ALTER TABLE `t_ds_alert` ADD COLUMN `sign` char(40) NOT NULL DEFAULT '' COMMENT 'sign=sha1(content)' after `id`;
ALTER TABLE `t_ds_alert` ADD INDEX `idx_sign` (`sign`) USING BTREE;
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_alert_R_sign;
DROP PROCEDURE uc_dolphin_T_t_ds_alert_R_sign;

4
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.6_schema/postgresql/dolphinscheduler_ddl.sql

@ -32,6 +32,10 @@ BEGIN
--- alter column --- alter column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_resources ALTER COLUMN full_name Type varchar(128)'; EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_resources ALTER COLUMN full_name Type varchar(128)';
--- add column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_alert ADD COLUMN IF NOT EXISTS sign varchar(40) NOT NULL DEFAULT '''' ';
EXECUTE 'comment on column ' || quote_ident(v_schema) ||'.t_ds_alert.sign is ''sign=sha1(content)''';
return 'Success!'; return 'Success!';
exception when others then exception when others then
---Raise EXCEPTION '(%)',SQLERRM; ---Raise EXCEPTION '(%)',SQLERRM;

6
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java

@ -68,12 +68,12 @@ public class AlertDaoTest {
} }
@Test @Test
public void testSendServerStopedAlert() { public void testSendServerStoppedAlert() {
int alertGroupId = 1; int alertGroupId = 1;
String host = "127.0.0.998165432"; String host = "127.0.0.998165432";
String serverType = "Master"; String serverType = "Master";
alertDao.sendServerStopedAlert(alertGroupId, host, serverType); alertDao.sendServerStoppedAlert(alertGroupId, host, serverType);
alertDao.sendServerStopedAlert(alertGroupId, host, serverType); alertDao.sendServerStoppedAlert(alertGroupId, host, serverType);
long count = alertDao.listPendingAlerts() long count = alertDao.listPendingAlerts()
.stream() .stream()
.filter(alert -> alert.getContent().contains(host)) .filter(alert -> alert.getContent().contains(host))

27
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AlertMapperTest.java

@ -28,8 +28,9 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.commons.codec.digest.DigestUtils;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Test; import org.junit.Test;
@ -99,26 +100,6 @@ public class AlertMapperTest extends BaseDaoTest {
assertNull(actualAlert); assertNull(actualAlert);
} }
/**
* test list alert by status
*/
@Test
public void testListAlertByStatus() {
Integer count = 10;
AlertStatus waitExecution = AlertStatus.WAIT_EXECUTION;
Map<Integer, Alert> expectedAlertMap = createAlertMap(count, waitExecution);
List<Alert> actualAlerts = alertMapper.listAlertByStatus(waitExecution);
for (Alert actualAlert : actualAlerts) {
Alert expectedAlert = expectedAlertMap.get(actualAlert.getId());
if (expectedAlert != null) {
assertEquals(expectedAlert, actualAlert);
}
}
}
/** /**
* create alert map * create alert map
* *
@ -153,9 +134,11 @@ public class AlertMapperTest extends BaseDaoTest {
* @return alert * @return alert
*/ */
private Alert createAlert(AlertStatus alertStatus) { private Alert createAlert(AlertStatus alertStatus) {
String content = "[{'type':'WORKER','host':'192.168.xx.xx','event':'server down','warning level':'serious'}]";
Alert alert = new Alert(); Alert alert = new Alert();
alert.setTitle("test alert"); alert.setTitle("test alert");
alert.setContent("[{'type':'WORKER','host':'192.168.xx.xx','event':'server down','warning level':'serious'}]"); alert.setContent(content);
alert.setSign(DigestUtils.sha1Hex(content));
alert.setAlertStatus(alertStatus); alert.setAlertStatus(alertStatus);
alert.setWarningType(WarningType.FAILURE); alert.setWarningType(WarningType.FAILURE);
alert.setLog("success"); alert.setLog("success");

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -246,7 +246,7 @@ public class ServerNodeManager implements InitializingBean {
String group = parseGroup(path); String group = parseGroup(path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group); Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes); syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER"); alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) { } else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data); logger.debug("worker group node : {} update, data: {}", path, data);
String group = parseGroup(path); String group = parseGroup(path);
@ -296,7 +296,7 @@ public class ServerNodeManager implements InitializingBean {
if (type.equals(Type.REMOVE)) { if (type.equals(Type.REMOVE)) {
logger.info("master node : {} down.", path); logger.info("master node : {} down.", path);
updateMasterNodes(); updateMasterNodes();
alertDao.sendServerStopedAlert(1, path, "MASTER"); alertDao.sendServerStoppedAlert(1, path, "MASTER");
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("MasterNodeListener capture data change and get data failed.", ex); logger.error("MasterNodeListener capture data change and get data failed.", ex);

Loading…
Cancel
Save