Browse Source

[DS-7992][feat] Alert module judging strategy (#8636)

3.0.0/version-upgrade
wangyang 3 years ago committed by GitHub
parent
commit
0d8079a816
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertConstants.java
  2. 29
      dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java
  3. 23
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
  4. 3
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
  5. 68
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java
  6. 3
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
  7. 12
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
  8. 16
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WarningType.java
  9. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java
  10. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java
  11. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml
  12. 3
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  13. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  14. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  15. 3
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql
  16. 2
      dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/postgresql/dolphinscheduler_ddl.sql
  17. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AlertMapperTest.java
  18. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
  19. 3
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java
  20. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
  21. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java
  22. 15
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
  23. 11
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

4
dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertConstants.java

@ -27,6 +27,10 @@ public final class AlertConstants {
public static final String NAME_SHOW_TYPE = "showType"; public static final String NAME_SHOW_TYPE = "showType";
public static final String WARNING_TYPE = "warningType";
public static final String NAME_WARNING_TYPE = "WarningType";
private AlertConstants() { private AlertConstants() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
} }

29
dolphinscheduler-alert/dolphinscheduler-alert-api/src/main/java/org/apache/dolphinscheduler/alert/api/AlertData.java

@ -24,12 +24,14 @@ public class AlertData {
private String title; private String title;
private String content; private String content;
private String log; private String log;
private int warnType;
public AlertData(int id, String title, String content, String log) { public AlertData(int id, String title, String content, String log, int warnType) {
this.id = id; this.id = id;
this.title = title; this.title = title;
this.content = content; this.content = content;
this.log = log; this.log = log;
this.warnType = warnType;
} }
public AlertData() { public AlertData() {
@ -75,6 +77,14 @@ public class AlertData {
return this; return this;
} }
public int getWarnType() {
return warnType;
}
public void setWarnType(int warnType) {
this.warnType = warnType;
}
public boolean equals(final Object o) { public boolean equals(final Object o) {
if (o == this) { if (o == this) {
return true; return true;
@ -89,6 +99,9 @@ public class AlertData {
if (this.getId() != other.getId()) { if (this.getId() != other.getId()) {
return false; return false;
} }
if (this.getWarnType() != other.getWarnType()) {
return false;
}
final Object this$title = this.getTitle(); final Object this$title = this.getTitle();
final Object other$title = other.getTitle(); final Object other$title = other.getTitle();
if (this$title == null ? other$title != null : !this$title.equals(other$title)) { if (this$title == null ? other$title != null : !this$title.equals(other$title)) {
@ -115,6 +128,7 @@ public class AlertData {
final int PRIME = 59; final int PRIME = 59;
int result = 1; int result = 1;
result = result * PRIME + this.getId(); result = result * PRIME + this.getId();
result = result * PRIME + this.getWarnType();
final Object $title = this.getTitle(); final Object $title = this.getTitle();
result = result * PRIME + ($title == null ? 43 : $title.hashCode()); result = result * PRIME + ($title == null ? 43 : $title.hashCode());
final Object $content = this.getContent(); final Object $content = this.getContent();
@ -125,7 +139,7 @@ public class AlertData {
} }
public String toString() { public String toString() {
return "AlertData(id=" + this.getId() + ", title=" + this.getTitle() + ", content=" + this.getContent() + ", log=" + this.getLog() + ")"; return "AlertData(id=" + this.getId() + ", title=" + this.getTitle() + ", content=" + this.getContent() + ", log=" + this.getLog() + ", warnType=" + this.getWarnType() + ")";
} }
public static class AlertDataBuilder { public static class AlertDataBuilder {
@ -133,6 +147,7 @@ public class AlertData {
private String title; private String title;
private String content; private String content;
private String log; private String log;
private int warnType;
AlertDataBuilder() { AlertDataBuilder() {
} }
@ -157,12 +172,18 @@ public class AlertData {
return this; return this;
} }
public AlertDataBuilder warnType(int warnType) {
this.warnType = warnType;
return this;
}
public AlertData build() { public AlertData build() {
return new AlertData(id, title, content, log); return new AlertData(id, title, content, log, warnType);
} }
public String toString() { public String toString() {
return "AlertData.AlertDataBuilder(id=" + this.id + ", title=" + this.title + ", content=" + this.content + ", log=" + this.log + ")"; return "AlertData.AlertDataBuilder(id=" + this.id + ", title=" + this.title + ", content=" + this.content + ", log=" + this.log + ", warnType=" + this.warnType + ")";
} }
} }
} }

23
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java

@ -21,12 +21,18 @@ import static java.lang.String.format;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertChannelFactory; import org.apache.dolphinscheduler.alert.api.AlertChannelFactory;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.common.enums.PluginType; import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine; import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer; import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
import org.apache.dolphinscheduler.spi.params.base.ParamsOptions;
import org.apache.dolphinscheduler.spi.params.base.PluginParams; import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.params.base.Validate;
import org.apache.dolphinscheduler.spi.params.radio.RadioParam;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -51,10 +57,23 @@ public final class AlertPluginManager {
private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>(); private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
private final PluginParams warningTypeParams = getWarningTypeParams();
public AlertPluginManager(PluginDao pluginDao) { public AlertPluginManager(PluginDao pluginDao) {
this.pluginDao = pluginDao; this.pluginDao = pluginDao;
} }
public PluginParams getWarningTypeParams() {
return
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
}
@EventListener @EventListener
public void installPlugin(ApplicationReadyEvent readyEvent) { public void installPlugin(ApplicationReadyEvent readyEvent) {
final Set<String> names = new HashSet<>(); final Set<String> names = new HashSet<>();
@ -72,7 +91,9 @@ public final class AlertPluginManager {
logger.info("Registered alert plugin: {}", name); logger.info("Registered alert plugin: {}", name);
final List<PluginParams> params = factory.params(); final List<PluginParams> params = new ArrayList<>(factory.params());
params.add(0, warningTypeParams);
final String paramsJson = PluginParamsTransfer.transferParamsToJson(params); final String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson); final PluginDefine pluginDefine = new PluginDefine(name, PluginType.ALERT.getDesc(), paramsJson);

3
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java

@ -54,7 +54,8 @@ public final class AlertRequestProcessor implements NettyRequestProcessor {
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler( AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getGroupId(),
alertSendRequestCommand.getTitle(), alertSendRequestCommand.getTitle(),
alertSendRequestCommand.getContent()); alertSendRequestCommand.getContent(),
alertSendRequestCommand.getWarnType());
channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque())); channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque()));
} }
} }

68
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java

@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.alert; package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData; import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo; import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult; import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
@ -66,13 +68,17 @@ public final class AlertSender {
alertData.setId(alert.getId()) alertData.setId(alert.getId())
.setContent(alert.getContent()) .setContent(alert.getContent())
.setLog(alert.getLog()) .setLog(alert.getLog())
.setTitle(alert.getTitle()); .setTitle(alert.getTitle())
.setTitle(alert.getTitle())
.setWarnType(alert.getWarningType().getCode());
for (AlertPluginInstance instance : alertInstanceList) { for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData); AlertResult alertResult = this.alertResultHandler(instance, alertData);
AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE; if (alertResult != null) {
alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId()); AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId());
}
} }
} }
@ -86,11 +92,12 @@ public final class AlertSender {
* @param content content * @param content content
* @return AlertSendResponseCommand * @return AlertSendResponseCommand
*/ */
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content) { public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = new AlertData(); AlertData alertData = new AlertData();
alertData.setContent(content) alertData.setContent(content)
.setTitle(title); .setTitle(title)
.setWarnType(warnType);
boolean sendResponseStatus = true; boolean sendResponseStatus = true;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>(); List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
@ -107,10 +114,12 @@ public final class AlertSender {
for (AlertPluginInstance instance : alertInstanceList) { for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData); AlertResult alertResult = this.alertResultHandler(instance, alertData);
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult( if (alertResult != null) {
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage()); AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus(); Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
sendResponseResults.add(alertSendResponseResult); sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
sendResponseResults.add(alertSendResponseResult);
}
} }
return new AlertSendResponseCommand(sendResponseStatus, sendResponseResults); return new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
@ -135,9 +144,48 @@ public final class AlertSender {
return alertResultExtend; return alertResultExtend;
} }
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();
if(paramsMap != null){
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}
WarningType warningType = WarningType.of(instanceWarnType);
if (warningType == null) {
String message = String.format("Alert Plugin %s send error : plugin warnType is null", pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(false));
alertResultExtend.setMessage(message);
logger.error("Alert Plugin {} send error : plugin warnType is null", pluginInstanceName);
return alertResultExtend;
}
boolean sendWarning = false;
switch (warningType) {
case ALL:
sendWarning = true;
break;
case SUCCESS:
if (alertData.getWarnType() == WarningType.SUCCESS.getCode()) {
sendWarning = true;
}
break;
case FAILURE:
if (alertData.getWarnType() == WarningType.FAILURE.getCode()) {
sendWarning = true;
}
break;
default:
}
if (!sendWarning) {
logger.info("Alert Plugin {} send ignore warning type not match: plugin warning type is {}, alert data warning type is {}", pluginInstanceName, warningType.getCode(), alertData.getWarnType());
return null;
}
AlertInfo alertInfo = new AlertInfo(); AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData); alertInfo.setAlertData(alertData);
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
alertInfo.setAlertParams(paramsMap); alertInfo.setAlertParams(paramsMap);
AlertResult alertResult; AlertResult alertResult;
try { try {

3
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java

@ -21,6 +21,7 @@ import static org.mockito.Mockito.mock;
import org.apache.dolphinscheduler.alert.AlertRequestProcessor; import org.apache.dolphinscheduler.alert.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.AlertSender; import org.apache.dolphinscheduler.alert.AlertSender;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -44,7 +45,7 @@ public class AlertRequestProcessorTest {
@Test @Test
public void testProcess() { public void testProcess() {
Channel channel = mock(Channel.class); Channel channel = mock(Channel.class);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content"); AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content", WarningType.FAILURE.getCode());
Command reqCommand = alertSendRequestCommand.convert2Command(); Command reqCommand = alertSendRequestCommand.convert2Command();
Assert.assertEquals(CommandType.ALERT_SEND_REQUEST, reqCommand.getType()); Assert.assertEquals(CommandType.ALERT_SEND_REQUEST, reqCommand.getType());
alertRequestProcessor.process(channel, reqCommand); alertRequestProcessor.process(channel, reqCommand);

12
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertSender; import org.apache.dolphinscheduler.alert.AlertSender;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult; import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
@ -69,7 +70,7 @@ public class AlertSenderTest {
//1.alert instance does not exist //1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null); when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content); AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -88,7 +89,7 @@ public class AlertSenderTest {
PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null); PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine); when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content); alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -98,7 +99,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(null); when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content); alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -110,7 +111,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content); alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -122,7 +123,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content); alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus()); Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -139,6 +140,7 @@ public class AlertSenderTest {
alert.setAlertGroupId(alertGroupId); alert.setAlertGroupId(alertGroupId);
alert.setTitle(title); alert.setTitle(title);
alert.setContent(content); alert.setContent(content);
alert.setWarningType(WarningType.FAILURE);
alertList.add(alert); alertList.add(alert);
alertSender = new AlertSender(alertDao, alertPluginManager); alertSender = new AlertSender(alertDao, alertPluginManager);

16
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WarningType.java

@ -17,7 +17,13 @@
package org.apache.dolphinscheduler.common.enums; package org.apache.dolphinscheduler.common.enums;
import static java.util.stream.Collectors.toMap;
import java.util.Arrays;
import java.util.Map;
import com.baomidou.mybatisplus.annotation.EnumValue; import com.baomidou.mybatisplus.annotation.EnumValue;
import com.google.common.base.Functions;
/** /**
* types for whether to send warning when process ending; * types for whether to send warning when process ending;
@ -50,4 +56,14 @@ public enum WarningType {
public String getDescp() { public String getDescp() {
return descp; return descp;
} }
private static final Map<String, WarningType> WARNING_TYPE_MAP =
Arrays.stream(WarningType.values()).collect(toMap(WarningType::getDescp, Functions.identity()));
public static WarningType of(String descp) {
if (WARNING_TYPE_MAP.containsKey(descp)) {
return WARNING_TYPE_MAP.get(descp);
}
return null;
}
} }

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/AlertDao.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.enums.AlertEvent; import org.apache.dolphinscheduler.common.enums.AlertEvent;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.AlertWarnLevel; import org.apache.dolphinscheduler.common.enums.AlertWarnLevel;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
@ -101,6 +102,7 @@ public class AlertDao {
Alert alert = new Alert(); Alert alert = new Alert();
alert.setTitle("Fault tolerance warning"); alert.setTitle("Fault tolerance warning");
alert.setWarningType(WarningType.FAILURE);
alert.setAlertStatus(AlertStatus.WAIT_EXECUTION); alert.setAlertStatus(AlertStatus.WAIT_EXECUTION);
alert.setContent(content); alert.setContent(content);
alert.setAlertGroupId(alertGroupId); alert.setAlertGroupId(alertGroupId);
@ -140,6 +142,7 @@ public class AlertDao {
private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) { private void saveTaskTimeoutAlert(Alert alert, String content, int alertGroupId) {
alert.setAlertGroupId(alertGroupId); alert.setAlertGroupId(alertGroupId);
alert.setWarningType(WarningType.FAILURE);
alert.setContent(content); alert.setContent(content);
alert.setCreateTime(new Date()); alert.setCreateTime(new Date());
alert.setUpdateTime(new Date()); alert.setUpdateTime(new Date());

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Alert.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.dao.entity; package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -52,6 +53,13 @@ public class Alert {
*/ */
@TableField(value = "alert_status") @TableField(value = "alert_status")
private AlertStatus alertStatus; private AlertStatus alertStatus;
/**
* warning_type
*/
@TableField(value = "warning_type")
private WarningType warningType;
/** /**
* log * log
*/ */
@ -151,6 +159,14 @@ public class Alert {
this.info = info; this.info = info;
} }
public WarningType getWarningType() {
return warningType;
}
public void setWarningType(WarningType warningType) {
this.warningType = warningType;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {
@ -183,6 +199,9 @@ public class Alert {
if (!createTime.equals(alert.createTime)) { if (!createTime.equals(alert.createTime)) {
return false; return false;
} }
if (warningType != alert.warningType) {
return false;
}
return updateTime.equals(alert.updateTime) && info.equals(alert.info); return updateTime.equals(alert.updateTime) && info.equals(alert.info);
} }
@ -193,6 +212,7 @@ public class Alert {
result = 31 * result + title.hashCode(); result = 31 * result + title.hashCode();
result = 31 * result + content.hashCode(); result = 31 * result + content.hashCode();
result = 31 * result + alertStatus.hashCode(); result = 31 * result + alertStatus.hashCode();
result = 31 * result + warningType.hashCode();
result = 31 * result + log.hashCode(); result = 31 * result + log.hashCode();
result = 31 * result + alertGroupId; result = 31 * result + alertGroupId;
result = 31 * result + createTime.hashCode(); result = 31 * result + createTime.hashCode();
@ -213,6 +233,8 @@ public class Alert {
+ '\'' + '\''
+ ", alertStatus=" + ", alertStatus="
+ alertStatus + alertStatus
+ ", warningType="
+ warningType
+ ", log='" + ", log='"
+ log + log
+ '\'' + '\''

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AlertMapper.xml

@ -21,7 +21,7 @@
<sql id="baseSql"> <sql id="baseSql">
id id
, title, content, alert_status, log, , title, content, alert_status, log,
alertgroup_id, create_time, update_time alertgroup_id, create_time, update_time, warning_type
</sql> </sql>
<select id="listAlertByStatus" resultType="org.apache.dolphinscheduler.dao.entity.Alert"> <select id="listAlertByStatus" resultType="org.apache.dolphinscheduler.dao.entity.Alert">
select select
@ -31,8 +31,8 @@
</select> </select>
<insert id="insertAlertWhenServerCrash"> <insert id="insertAlertWhenServerCrash">
insert into t_ds_alert(title, content, alert_status, log, alertgroup_id, create_time, update_time) insert into t_ds_alert(title, content, alert_status, warning_type, log, alertgroup_id, create_time, update_time)
SELECT #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.log}, #{alert.alertGroupId}, SELECT #{alert.title}, #{alert.content}, #{alert.alertStatus.code}, #{alert.warningType.code}, #{alert.log}, #{alert.alertGroupId},
#{alert.createTime}, #{alert.updateTime} #{alert.createTime}, #{alert.updateTime}
from t_ds_alert from t_ds_alert
where content = #{alert.content} and alert_status = #{alert.alertStatus.code} where content = #{alert.content} and alert_status = #{alert.alertStatus.code}

3
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -274,6 +274,7 @@ CREATE TABLE t_ds_alert
title varchar(64) DEFAULT NULL, title varchar(64) DEFAULT NULL,
content text, content text,
alert_status tinyint(4) DEFAULT '0', alert_status tinyint(4) DEFAULT '0',
warning_type tinyint(4) DEFAULT '2',
log text, log text,
alertgroup_id int(11) DEFAULT NULL, alertgroup_id int(11) DEFAULT NULL,
create_time datetime DEFAULT NULL, create_time datetime DEFAULT NULL,
@ -1908,4 +1909,4 @@ CREATE TABLE t_ds_k8s_namespace (
-- Records of t_ds_k8s_namespace -- Records of t_ds_k8s_namespace
-- ---------------------------- -- ----------------------------
INSERT INTO t_ds_k8s_namespace INSERT INTO t_ds_k8s_namespace
VALUES (1, 10000, 'default', 99, 'owner',1,NULL,1,'test',NULL,'default',null,null); VALUES (1, 10000, 'default', 99, 'owner',1,NULL,1,'test',NULL,'default',null,null);

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -281,6 +281,7 @@ CREATE TABLE `t_ds_alert` (
`title` varchar(64) DEFAULT NULL COMMENT 'title', `title` varchar(64) DEFAULT NULL COMMENT 'title',
`content` text COMMENT 'Message content (can be email, can be SMS. Mail is stored in JSON map, and SMS is string)', `content` text COMMENT 'Message content (can be email, can be SMS. Mail is stored in JSON map, and SMS is string)',
`alert_status` tinyint(4) DEFAULT '0' COMMENT '0:wait running,1:success,2:failed', `alert_status` tinyint(4) DEFAULT '0' COMMENT '0:wait running,1:success,2:failed',
`warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed',
`log` text COMMENT 'log', `log` text COMMENT 'log',
`alertgroup_id` int(11) DEFAULT NULL COMMENT 'alert group id', `alertgroup_id` int(11) DEFAULT NULL COMMENT 'alert group id',
`create_time` datetime DEFAULT NULL COMMENT 'create time', `create_time` datetime DEFAULT NULL COMMENT 'create time',

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -210,6 +210,7 @@ CREATE TABLE t_ds_alert (
title varchar(64) DEFAULT NULL , title varchar(64) DEFAULT NULL ,
content text , content text ,
alert_status int DEFAULT '0' , alert_status int DEFAULT '0' ,
warning_type int DEFAULT '2' ,
log text , log text ,
alertgroup_id int DEFAULT NULL , alertgroup_id int DEFAULT NULL ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,

3
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/mysql/dolphinscheduler_ddl.sql

@ -30,6 +30,7 @@ alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMEN
alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`; alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT '0' COMMENT 'task group id' AFTER `task_group_id`;
ALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone'; ALTER TABLE `t_ds_user` ADD COLUMN `time_zone` varchar(32) DEFAULT NULL COMMENT 'time zone';
ALTER TABLE `t_ds_alert` ADD COLUMN `warning_type` tinyint(4) DEFAULT '2' COMMENT '1 process is successfully, 2 process/task is failed';
ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE; ALTER TABLE `t_ds_alert` ADD INDEX `idx_status` (`alert_status`) USING BTREE;
@ -209,4 +210,4 @@ CREATE TABLE `t_ds_k8s_namespace` (
`update_time` datetime DEFAULT NULL COMMENT 'update time', `update_time` datetime DEFAULT NULL COMMENT 'update time',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`) UNIQUE KEY `k8s_namespace_unique` (`namespace`,`k8s`)
) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8; ) ENGINE= INNODB AUTO_INCREMENT= 1 DEFAULT CHARSET= utf8;

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/2.1.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -46,6 +46,8 @@ EXECUTE 'CREATE INDEX IF NOT EXISTS idx_task_definition_log_code_version ON ' ||
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_user ADD COLUMN IF NOT EXISTS "time_zone" varchar(32) DEFAULT NULL'; EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_user ADD COLUMN IF NOT EXISTS "time_zone" varchar(32) DEFAULT NULL';
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_alert ADD COLUMN IF NOT EXISTS "warning_type" int DEFAULT 2';
EXECUTE 'CREATE TABLE IF NOT EXISTS' || quote_ident(v_schema) ||'."t_ds_dq_comparison_type" ( EXECUTE 'CREATE TABLE IF NOT EXISTS' || quote_ident(v_schema) ||'."t_ds_dq_comparison_type" (
id serial NOT NULL, id serial NOT NULL,
"type" varchar NOT NULL, "type" varchar NOT NULL,

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AlertMapperTest.java

@ -23,6 +23,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.BaseDaoTest;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
@ -156,6 +157,7 @@ public class AlertMapperTest extends BaseDaoTest {
alert.setTitle("test alert"); alert.setTitle("test alert");
alert.setContent("[{'type':'WORKER','host':'192.168.xx.xx','event':'server down','warning level':'serious'}]"); alert.setContent("[{'type':'WORKER','host':'192.168.xx.xx','event':'server down','warning level':'serious'}]");
alert.setAlertStatus(alertStatus); alert.setAlertStatus(alertStatus);
alert.setWarningType(WarningType.FAILURE);
alert.setLog("success"); alert.setLog("success");
alert.setCreateTime(DateUtils.getCurrentDate()); alert.setCreateTime(DateUtils.getCurrentDate());
alert.setUpdateTime(DateUtils.getCurrentDate()); alert.setUpdateTime(DateUtils.getCurrentDate());
@ -163,4 +165,4 @@ public class AlertMapperTest extends BaseDaoTest {
alertMapper.insert(alert); alertMapper.insert(alert);
return alert; return alert;
} }
} }

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java

@ -31,6 +31,8 @@ public class AlertSendRequestCommand implements Serializable {
private String content; private String content;
private int warnType;
public int getGroupId() { public int getGroupId() {
return groupId; return groupId;
} }
@ -55,14 +57,23 @@ public class AlertSendRequestCommand implements Serializable {
this.content = content; this.content = content;
} }
public int getWarnType() {
return warnType;
}
public void setWarnType(int warnType) {
this.warnType = warnType;
}
public AlertSendRequestCommand(){ public AlertSendRequestCommand(){
} }
public AlertSendRequestCommand(int groupId, String title, String content) { public AlertSendRequestCommand(int groupId, String title, String content, int warnType) {
this.groupId = groupId; this.groupId = groupId;
this.title = title; this.title = title;
this.content = content; this.content = content;
this.warnType = warnType;
} }
/** /**

3
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.remote.command.alert; package org.apache.dolphinscheduler.remote.command.alert;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
@ -30,7 +31,7 @@ public class AlertSendRequestCommandTest {
int groupId = 1; int groupId = 1;
String title = "test-title"; String title = "test-title";
String content = "test-content"; String content = "test-content";
AlertSendRequestCommand requestCommand = new AlertSendRequestCommand(groupId,title,content); AlertSendRequestCommand requestCommand = new AlertSendRequestCommand(groupId,title,content,WarningType.FAILURE.getCode());
Command command = requestCommand.convert2Command(); Command command = requestCommand.convert2Command();
Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,command.getType()); Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,command.getType());
AlertSendRequestCommand verifyCommand = new AlertSendRequestCommand(); AlertSendRequestCommand verifyCommand = new AlertSendRequestCommand();

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java

@ -84,8 +84,8 @@ public class AlertClientService implements AutoCloseable {
* @param content * @param content
* @return * @return
*/ */
public AlertSendResponseCommand sendAlert(int groupId, String title, String content) { public AlertSendResponseCommand sendAlert(int groupId, String title, String content, int strategy) {
return this.sendAlert(this.host,this.port,groupId,title,content); return this.sendAlert(this.host,this.port,groupId,title,content,strategy);
} }
/** /**
@ -97,9 +97,9 @@ public class AlertClientService implements AutoCloseable {
* @param content content * @param content content
* @return AlertSendResponseCommand * @return AlertSendResponseCommand
*/ */
public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) { public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content, int strategy) {
logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title); logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} , strategy : {} ", host, port, groupId, title, strategy);
AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content); AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content, strategy);
final Host address = new Host(host, port); final Host address = new Host(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ProcessAlertManager.java

@ -193,6 +193,7 @@ public class ProcessAlertManager {
alert.setTitle("worker fault tolerance"); alert.setTitle("worker fault tolerance");
String content = getWorkerToleranceContent(processInstance, toleranceTaskList); String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
alert.setContent(content); alert.setContent(content);
alert.setWarningType(WarningType.FAILURE);
alert.setCreateTime(new Date()); alert.setCreateTime(new Date());
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1 : processInstance.getWarningGroupId());
alertDao.addAlert(alert); alertDao.addAlert(alert);
@ -223,6 +224,7 @@ public class ProcessAlertManager {
String cmdName = getCommandCnName(processInstance.getCommandType()); String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "success" : "failed"; String success = processInstance.getState().typeIsSuccess() ? "success" : "failed";
alert.setTitle(cmdName + " " + success); alert.setTitle(cmdName + " " + success);
alert.setWarningType(processInstance.getState().typeIsSuccess() ? WarningType.SUCCESS : WarningType.FAILURE);
String content = getContentProcessInstance(processInstance, taskInstances,projectUser); String content = getContentProcessInstance(processInstance, taskInstances,projectUser);
alert.setContent(content); alert.setContent(content);
alert.setAlertGroupId(processInstance.getWarningGroupId()); alert.setAlertGroupId(processInstance.getWarningGroupId());

15
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.alert; package org.apache.dolphinscheduler.service.alert;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
@ -67,10 +68,10 @@ public class AlertClientServiceTest {
String content = "test-content"; String content = "test-content";
//1.alter server does not exist //1.alter server does not exist
AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertNull(alertSendResponseCommand); Assert.assertNull(alertSendResponseCommand);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(groupId,title,content); AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(groupId,title,content, WarningType.FAILURE.getCode());
Command reqCommand = alertSendRequestCommand.convert2Command(); Command reqCommand = alertSendRequestCommand.convert2Command();
boolean sendResponseStatus; boolean sendResponseStatus;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>(); List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
@ -86,7 +87,7 @@ public class AlertClientServiceTest {
Command resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque()); Command resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand); PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage())); logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
@ -100,7 +101,7 @@ public class AlertClientServiceTest {
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults); alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque()); resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand); PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage())); logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
@ -113,7 +114,7 @@ public class AlertClientServiceTest {
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults); alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque()); resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand); PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage())); logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
@ -125,7 +126,7 @@ public class AlertClientServiceTest {
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults); alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque()); resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand); PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage())); logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
@ -138,7 +139,7 @@ public class AlertClientServiceTest {
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults); alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque()); resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand); PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content); alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content, WarningType.FAILURE.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus()); Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result -> alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage())); logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));

11
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
@ -189,12 +190,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
// task handle // task handle
this.task.handle(); this.task.handle();
responseCommand.setStatus(this.task.getExitStatus().getCode());
// task result process // task result process
if (this.task.getNeedAlert()) { if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo()); sendAlert(this.task.getTaskAlertInfo(), responseCommand.getStatus());
} }
responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId()); responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds()); responseCommand.setAppIds(this.task.getAppIds());
@ -215,8 +217,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
} }
} }
private void sendAlert(TaskAlertInfo taskAlertInfo) { private void sendAlert(TaskAlertInfo taskAlertInfo, int status) {
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent()); int strategy = status == ExecutionStatus.SUCCESS.getCode() ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
} }
/** /**

Loading…
Cancel
Save