Browse Source

[Improvement][Alert] server down will send repetitive message #5525 (#5529)

* [Improvement][Alert] server down will send repetitive message #5525

* add ut
pull/3/MERGE
ruanwenjun 4 years ago committed by GitHub
parent
commit
f8ecb536b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  2. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java
  3. 9
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml
  4. 17
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -44,6 +44,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.google.common.collect.Lists;
@Component @Component
public class AlertDao extends AbstractBaseDao { public class AlertDao extends AbstractBaseDao {
@ -99,15 +101,23 @@ public class AlertDao extends AbstractBaseDao {
* @param serverType serverType * @param serverType serverType
*/ */
public void sendServerStopedAlert(int alertGroupId, String host, String serverType) { public void sendServerStopedAlert(int alertGroupId, String host, String serverType) {
Alert alert = new Alert();
List<ServerAlertContent> serverAlertContents = new ArrayList<>(1);
ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder(). ServerAlertContent serverStopAlertContent = ServerAlertContent.newBuilder().
type(serverType).host(host).event(AlertEvent.SERVER_DOWN).warningLevel(AlertWarnLevel.SERIOUS). type(serverType)
.host(host)
.event(AlertEvent.SERVER_DOWN)
.warningLevel(AlertWarnLevel.SERIOUS).
build(); build();
serverAlertContents.add(serverStopAlertContent); String content = JSONUtils.toJsonString(Lists.newArrayList(serverStopAlertContent));
String content = JSONUtils.toJsonString(serverAlertContents);
Alert alert = new Alert();
alert.setTitle("Fault tolerance warning"); alert.setTitle("Fault tolerance warning");
saveTaskTimeoutAlert(alert, content, alertGroupId); alert.setAlertStatus(AlertStatus.WAIT_EXECUTION);
alert.setContent(content);
alert.setAlertGroupId(alertGroupId);
alert.setCreateTime(new Date());
alert.setUpdateTime(new Date());
// we use this method to avoid insert duplicate alert(issue #5525)
alertMapper.insertAlertWhenServerCrash(alert);
} }
/** /**

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AlertMapper.java

@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/** /**
* alert mapper interface * alert mapper interface
*/ */
@ -35,4 +38,10 @@ public interface AlertMapper extends BaseMapper<Alert> {
*/ */
List<Alert> listAlertByStatus(@Param("alertStatus") AlertStatus alertStatus); List<Alert> listAlertByStatus(@Param("alertStatus") AlertStatus alertStatus);
/**
* Insert server crash alert
* <p>This method will ensure that there is at most one unsent alert which has the same content in the database.
*/
void insertAlertWhenServerCrash(@Param("alert") Alert alert);
} }

9
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml

@ -29,4 +29,13 @@
from t_ds_alert from t_ds_alert
where alert_status = #{alertStatus} where alert_status = #{alertStatus}
</select> </select>
<insert id="insertAlertWhenServerCrash">
insert into t_ds_alert(title, content, alert_status, log, alertgroup_id, create_time, update_time)
SELECT #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.log}, #{alert.alertGroupId},
#{alert.createTime}, #{alert.updateTime}
from t_ds_alert
where content = #{alert.content} and alert_status = #{alert.alertStatus.code}
having count(*) = 0
</insert>
</mapper> </mapper>

17
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/AlertDaoTest.java

@ -24,7 +24,9 @@ import java.util.List;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.springframework.transaction.annotation.Transactional;
@Transactional
public class AlertDaoTest { public class AlertDaoTest {
@Test @Test
@ -42,4 +44,19 @@ public class AlertDaoTest {
Assert.assertNotNull(alerts); Assert.assertNotNull(alerts);
Assert.assertNotEquals(0, alerts.size()); Assert.assertNotEquals(0, alerts.size());
} }
@Test
public void testSendServerStopedAlert() {
AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
int alertGroupId = 1;
String host = "127.0.0.998165432";
String serverType = "Master";
alertDao.sendServerStopedAlert(alertGroupId, host, serverType);
alertDao.sendServerStopedAlert(alertGroupId, host, serverType);
long count = alertDao.listWaitExecutionAlert()
.stream()
.filter(alert -> alert.getContent().contains(host))
.count();
Assert.assertEquals(1L, count);
}
} }

Loading…
Cancel
Save