diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml
index e79c9a2b20..ebea0e8716 100644
--- a/dolphinscheduler-alert/pom.xml
+++ b/dolphinscheduler-alert/pom.xml
@@ -35,6 +35,14 @@
org.apache.dolphinscheduler
dolphinscheduler-spi
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-remote
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-common
+
junit
junit
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ccd359da18..54afc93442 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.alert;
+import static org.apache.dolphinscheduler.common.Constants.ALERT_RPC_PORT;
+
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
+import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.PropertyUtils;
@@ -28,6 +31,9 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.List;
@@ -63,6 +69,11 @@ public class AlertServer {
public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
+ /**
+ * netty server
+ */
+ private NettyRemotingServer server;
+
private static class AlertServerHolder {
private static final AlertServer INSTANCE = new AlertServer();
}
@@ -96,11 +107,21 @@ public class AlertServer {
}
}
- public void start() {
-
- initPlugin();
+ /**
+ * init netty remoting server
+ */
+ private void initRemoteServer() {
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(ALERT_RPC_PORT);
+ this.server = new NettyRemotingServer(serverConfig);
+ this.server.registerProcessor(CommandType.ALERT_SEND_REQUEST, new AlertRequestProcessor(alertDao, alertPluginManager, pluginDao));
+ this.server.start();
+ }
- logger.info("alert server ready start ");
+ /**
+ * Cyclic alert info sending alert
+ */
+ private void runSender() {
while (Stopper.isRunning()) {
try {
Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
@@ -118,10 +139,33 @@ public class AlertServer {
}
}
+ /**
+ * start
+ */
+ public void start() {
+ initPlugin();
+ initRemoteServer();
+ logger.info("alert server ready start ");
+ runSender();
+ }
+
+ /**
+ * stop
+ */
+ public void stop() {
+ this.server.close();
+ logger.info("alert server shut down");
+ }
+
public static void main(String[] args) {
- System.out.println(System.getProperty("user.dir"));
AlertServer alertServer = AlertServer.getInstance();
alertServer.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ alertServer.stop();
+ }
+ });
}
}
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
new file mode 100644
index 0000000000..5e8a8f89d6
--- /dev/null
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.alert.processor;
+
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.runner.AlertSender;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+/**
+ * alert request processor
+ */
+public class AlertRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
+ private AlertDao alertDao;
+ private PluginDao pluginDao;
+ private AlertPluginManager alertPluginManager;
+
+ public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
+ this.alertDao = alertDao;
+ this.pluginDao = pluginDao;
+ this.alertPluginManager = alertPluginManager;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.ALERT_SEND_REQUEST == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ AlertSendRequestCommand alertSendRequestCommand = JsonSerializer.deserialize(
+ command.getBody(), AlertSendRequestCommand.class);
+ logger.info("received command : {}", alertSendRequestCommand);
+
+ AlertSender alertSender = new AlertSender(alertDao, alertPluginManager, pluginDao);
+ AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent());
+ channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque()));
+
+ }
+}
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
index 4714a76ff1..2b8fec2f11 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
@@ -19,15 +19,19 @@ package org.apache.dolphinscheduler.alert.runner;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
+import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
@@ -49,6 +53,13 @@ public class AlertSender {
this.alertPluginManager = alertPluginManager;
}
+ public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
+ super();
+ this.alertDao = alertDao;
+ this.pluginDao = pluginDao;
+ this.alertPluginManager = alertPluginManager;
+ }
+
public AlertSender(List alertList, AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
super();
this.alertList = alertList;
@@ -71,33 +82,94 @@ public class AlertSender {
for (AlertPluginInstance instance : alertInstanceList) {
- String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName();
- String pluginInstanceName = instance.getInstanceName();
- AlertInfo alertInfo = new AlertInfo();
- alertInfo.setAlertData(alertData);
- alertInfo.setAlertParams(instance.getPluginInstanceParams());
- AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName);
- if (alertChannel == null) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "Alert send error, not found plugin " + pluginName, alert.getId());
- logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName);
- continue;
- }
-
- AlertResult alertResult = alertChannel.process(alertInfo);
-
- if (alertResult == null) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId());
- logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName);
- } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(alertResult.getMessage()), alert.getId());
- logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
- } else {
- alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, alertResult.getMessage(), alert.getId());
- logger.info("Alert Plugin {} send success", pluginInstanceName);
- }
+ AlertResult alertResult = this.alertResultHandler(instance, alertData);
+ AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
+ alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId());
+
}
}
}
+ /**
+ * sync send alert handler
+ * @param alertGroupId alertGroupId
+ * @param title title
+ * @param content content
+ * @return AlertSendResponseCommand
+ */
+ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content) {
+
+ List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
+ AlertData alertData = new AlertData();
+ alertData.setContent(title)
+ .setTitle(content);
+
+ boolean sendResponseStatus = true;
+ List sendResponseResults = new ArrayList<>();
+
+ if (CollectionUtils.isEmpty(alertInstanceList)) {
+ sendResponseStatus = false;
+ AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult();
+ String message = String.format("Alert GroupId %s send error : not found alert instance",alertGroupId);
+ alertSendResponseResult.setStatus(sendResponseStatus);
+ alertSendResponseResult.setMessage(message);
+ sendResponseResults.add(alertSendResponseResult);
+ logger.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
+ return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults);
+ }
+
+ for (AlertPluginInstance instance : alertInstanceList) {
+ AlertResult alertResult = this.alertResultHandler(instance, alertData);
+ AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
+ Boolean.parseBoolean(String.valueOf(alertResult.getStatus())),alertResult.getMessage());
+ sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
+ sendResponseResults.add(alertSendResponseResult);
+ }
+
+ return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults);
+ }
+
+ /**
+ * alert result handler
+ * @param instance instance
+ * @param alertData alertData
+ * @return AlertResult
+ */
+ private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
+ String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName();
+ AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName);
+ AlertResult alertResultExtend = new AlertResult();
+ String pluginInstanceName = instance.getInstanceName();
+ if (alertChannel == null) {
+ String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
+ alertResultExtend.setStatus(String.valueOf(false));
+ alertResultExtend.setMessage(message);
+ logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName);
+ return alertResultExtend;
+ }
+
+ AlertInfo alertInfo = new AlertInfo();
+ alertInfo.setAlertData(alertData);
+ alertInfo.setAlertParams(instance.getPluginInstanceParams());
+ AlertResult alertResult = alertChannel.process(alertInfo);
+
+ if (alertResult == null) {
+ String message = String.format("Alert Plugin %s send error : return alertResult value is null",pluginInstanceName);
+ alertResultExtend.setStatus(String.valueOf(false));
+ alertResultExtend.setMessage(message);
+ logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName);
+ } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
+ alertResultExtend.setStatus(String.valueOf(false));
+ alertResultExtend.setMessage(alertResult.getMessage());
+ logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
+ } else {
+ String message = String.format("Alert Plugin %s send success",pluginInstanceName);
+ alertResultExtend.setStatus(String.valueOf(true));
+ alertResultExtend.setMessage(message);
+ logger.info("Alert Plugin {} send success", pluginInstanceName);
+ }
+ return alertResultExtend;
+ }
+
}
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
new file mode 100644
index 0000000000..37d54c8ef9
--- /dev/null
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.alert;
+
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
+import org.apache.dolphinscheduler.alert.runner.AlertSender;
+import org.apache.dolphinscheduler.alert.utils.Constants;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.DaoFactory;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.spi.alert.AlertChannel;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AlertServer.class,DaoFactory.class})
+public class AlertServerTest {
+
+ @Before
+ public void before() {
+
+ }
+
+ @Test
+ public void testMain() throws Exception {
+ AlertDao alertDao = PowerMockito.mock(AlertDao.class);
+ PowerMockito.mockStatic(DaoFactory.class);
+ PowerMockito.when(DaoFactory.getDaoInstance(AlertDao.class)).thenReturn(alertDao);
+
+ PluginDao pluginDao = PowerMockito.mock(PluginDao.class);
+ PowerMockito.when(DaoFactory.getDaoInstance(PluginDao.class)).thenReturn(pluginDao);
+
+ AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
+
+ AlertPluginManager alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
+ PowerMockito.whenNew(AlertPluginManager.class).withNoArguments().thenReturn(alertPluginManager);
+ ConcurrentHashMap alertChannelMap = new ConcurrentHashMap<>();
+ alertChannelMap.put("pluginName",alertChannelMock);
+ PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
+
+ DolphinPluginManagerConfig alertPluginManagerConfig = PowerMockito.mock(DolphinPluginManagerConfig.class);
+ PowerMockito.whenNew(DolphinPluginManagerConfig.class).withNoArguments().thenReturn(alertPluginManagerConfig);
+
+ NettyRemotingServer nettyRemotingServer = PowerMockito.mock(NettyRemotingServer.class);
+ PowerMockito.whenNew(NettyRemotingServer.class).withAnyArguments().thenReturn(nettyRemotingServer);
+ AlertSender alertSender = PowerMockito.mock(AlertSender.class);
+ PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender);
+
+ AlertServer alertServer = AlertServer.getInstance();
+ Assert.assertNotNull(alertServer);
+
+ new Thread(() -> {
+ alertServer.start(); })
+ .start();
+
+ Thread.sleep(5 * Constants.ALERT_SCAN_INTERVAL);
+
+ alertServer.stop();
+
+ }
+
+}
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
new file mode 100644
index 0000000000..0126eb3dae
--- /dev/null
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.alert.processor;
+
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import io.netty.channel.Channel;
+
+/**
+ * alert request processor test
+ */
+public class AlertRequestProcessorTest {
+
+ private AlertDao alertDao;
+ private PluginDao pluginDao;
+ private AlertPluginManager alertPluginManager;
+
+ private AlertRequestProcessor alertRequestProcessor;
+
+ @Before
+ public void before() {
+ alertDao = PowerMockito.mock(AlertDao.class);
+ pluginDao = PowerMockito.mock(PluginDao.class);
+ alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
+ alertRequestProcessor = new AlertRequestProcessor(alertDao,alertPluginManager,pluginDao);
+ }
+
+ @Test
+ public void testProcess() {
+ Channel channel = PowerMockito.mock(Channel.class);
+ AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1,"title","content");
+ Command reqCommand = alertSendRequestCommand.convert2Command();
+ Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,reqCommand.getType());
+ alertRequestProcessor.process(channel,reqCommand);
+ }
+}
diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
new file mode 100644
index 0000000000..aa358f6380
--- /dev/null
+++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.alert.runner;
+
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.dao.entity.Alert;
+import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
+import org.apache.dolphinscheduler.dao.entity.PluginDefine;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.spi.alert.AlertChannel;
+import org.apache.dolphinscheduler.spi.alert.AlertResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * alert sender test
+ */
+public class AlertSenderTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
+
+ private AlertDao alertDao;
+ private PluginDao pluginDao;
+ private AlertPluginManager alertPluginManager;
+
+ private AlertSender alertSender;
+
+ @Before
+ public void before() {
+ alertDao = PowerMockito.mock(AlertDao.class);
+ pluginDao = PowerMockito.mock(PluginDao.class);
+ alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
+
+ }
+
+ @Test
+ public void testSyncHandler() {
+
+ int alertGroupId = 1;
+ String title = "alert mail test title";
+ String content = "alert mail test content";
+ alertSender = new AlertSender(alertDao,alertPluginManager,pluginDao);
+
+ //1.alert instance does not exist
+ PowerMockito.when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
+
+ AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //2.alert plugin does not exist
+ int pluginDefineId = 1;
+ String pluginInstanceParams = "alert-instance-mail-params";
+ String pluginInstanceName = "alert-instance-mail";
+ List alertInstanceList = new ArrayList<>();
+ AlertPluginInstance alertPluginInstance = new AlertPluginInstance(
+ pluginDefineId,pluginInstanceParams,alertGroupId,pluginInstanceName);
+ alertInstanceList.add(alertPluginInstance);
+ PowerMockito.when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(alertInstanceList);
+
+ String pluginName = "alert-plugin-mail";
+ PluginDefine pluginDefine = new PluginDefine(pluginName,"1",null);
+ PowerMockito.when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
+
+ alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //3.alert result value is null
+ AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
+ PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(null);
+ Map alertChannelMap = new ConcurrentHashMap<>();
+ alertChannelMap.put(pluginName,alertChannelMock);
+ PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
+
+ alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //4.abnormal information inside the alert plug-in code
+ AlertResult alertResult = new AlertResult();
+ alertResult.setStatus(String.valueOf(false));
+ alertResult.setMessage("Abnormal information inside the alert plug-in code");
+ PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
+ alertChannelMap = new ConcurrentHashMap<>();
+ alertChannelMap.put(pluginName,alertChannelMock);
+ PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
+
+ alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //5.alert plugin send success
+ alertResult = new AlertResult();
+ alertResult.setStatus(String.valueOf(true));
+ alertResult.setMessage(String.format("Alert Plugin %s send success",pluginInstanceName));
+ PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
+ alertChannelMap = new ConcurrentHashMap<>();
+ alertChannelMap.put(pluginName,alertChannelMock);
+ PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
+
+ alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
+ Assert.assertTrue(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ }
+
+ @Test
+ public void testRun() {
+ int alertGroupId = 1;
+ String title = "alert mail test title";
+ String content = "alert mail test content";
+ List alertList = new ArrayList<>();
+ Alert alert = new Alert();
+ alert.setAlertGroupId(alertGroupId);
+ alert.setTitle(title);
+ alert.setContent(content);
+ alertList.add(alert);
+
+ alertSender = new AlertSender(alertList,alertDao,alertPluginManager,pluginDao);
+
+ int pluginDefineId = 1;
+ String pluginInstanceParams = "alert-instance-mail-params";
+ String pluginInstanceName = "alert-instance-mail";
+ List alertInstanceList = new ArrayList<>();
+ AlertPluginInstance alertPluginInstance = new AlertPluginInstance(
+ pluginDefineId,pluginInstanceParams,alertGroupId,pluginInstanceName);
+ alertInstanceList.add(alertPluginInstance);
+ PowerMockito.when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(alertInstanceList);
+
+ String pluginName = "alert-plugin-mail";
+ PluginDefine pluginDefine = new PluginDefine(pluginName,"1",null);
+ PowerMockito.when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
+
+ AlertResult alertResult = new AlertResult();
+ alertResult.setStatus(String.valueOf(true));
+ alertResult.setMessage(String.format("Alert Plugin %s send success",pluginInstanceName));
+ AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
+ PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
+ ConcurrentHashMap alertChannelMap = new ConcurrentHashMap<>();
+ alertChannelMap.put(pluginName,alertChannelMock);
+ PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
+ Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
+ alertSender.run();
+
+ }
+
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index a8d04ced46..5c0ae1d638 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -387,13 +387,17 @@ public final class Constants {
*/
public static final int SEC_2_MINUTES_TIME_UNIT = 60;
-
/***
*
* rpc port
*/
public static final int RPC_PORT = 50051;
+ /***
+ * alert rpc port
+ */
+ public static final int ALERT_RPC_PORT = 50052;
+
/**
* forbid running task
*/
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65f57..753216995e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG,
/**
* alert send request
*/
ALERT_SEND_REQUEST,
/**
* alert send response
*/
ALERT_SEND_RESPONSE;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
new file mode 100644
index 0000000000..da56b0dc6b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import java.io.Serializable;
+
+public class AlertSendRequestCommand implements Serializable {
+
+ private int groupId;
+
+ private String title;
+
+ private String content;
+
+ public int getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(int groupId) {
+ this.groupId = groupId;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ public AlertSendRequestCommand(){
+
+ }
+
+ public AlertSendRequestCommand(int groupId, String title, String content) {
+ this.groupId = groupId;
+ this.title = title;
+ this.content = content;
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.ALERT_SEND_REQUEST);
+ byte[] body = JsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java
new file mode 100644
index 0000000000..984cc43c94
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class AlertSendResponseCommand implements Serializable {
+
+ /**
+ * true:All alert are successful,
+ * false:As long as one alert fails
+ */
+ private boolean resStatus;
+
+ private List resResults;
+
+ public boolean getResStatus() {
+ return resStatus;
+ }
+
+ public void setResStatus(boolean resStatus) {
+ this.resStatus = resStatus;
+ }
+
+ public List getResResults() {
+ return resResults;
+ }
+
+ public void setResResults(List resResults) {
+ this.resResults = resResults;
+ }
+
+ public AlertSendResponseCommand() {
+
+ }
+
+ public AlertSendResponseCommand(boolean resStatus, List resResults) {
+ this.resStatus = resStatus;
+ this.resResults = resResults;
+ }
+
+ /**
+ * package response command
+ *
+ * @param opaque request unique identification
+ * @return command
+ */
+ public Command convert2Command(long opaque) {
+ Command command = new Command(opaque);
+ command.setType(CommandType.ALERT_SEND_RESPONSE);
+ byte[] body = JsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseResult.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseResult.java
new file mode 100644
index 0000000000..1263b83a73
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseResult.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import java.io.Serializable;
+
+public class AlertSendResponseResult implements Serializable {
+
+ private boolean status;
+
+ private String message;
+
+ public boolean getStatus() {
+ return status;
+ }
+
+ public void setStatus(boolean status) {
+ this.status = status;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public AlertSendResponseResult() {
+
+ }
+
+ public AlertSendResponseResult(boolean status, String message) {
+ this.status = status;
+ this.message = message;
+ }
+}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java
new file mode 100644
index 0000000000..79d21316f8
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AlertSendRequestCommandTest {
+
+ @Test
+ public void testConvert2Command() {
+ int groupId = 1;
+ String title = "test-title";
+ String content = "test-content";
+ AlertSendRequestCommand requestCommand = new AlertSendRequestCommand(groupId,title,content);
+ Command command = requestCommand.convert2Command();
+ Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,command.getType());
+ AlertSendRequestCommand verifyCommand = new AlertSendRequestCommand();
+ verifyCommand.setGroupId(groupId);
+ verifyCommand.setContent(content);
+ verifyCommand.setTitle(title);
+
+ }
+}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommandTest.java
new file mode 100644
index 0000000000..41265a5339
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommandTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AlertSendResponseCommandTest {
+
+ @Test
+ public void testConvert2Command() {
+ AlertSendResponseCommand alertSendResponseCommand = new AlertSendResponseCommand();
+ alertSendResponseCommand.setResStatus(false);
+ List responseResults = new ArrayList<>();
+ AlertSendResponseResult responseResult1 = new AlertSendResponseResult();
+ responseResult1.setStatus(false);
+ responseResult1.setMessage("fail");
+ responseResults.add(responseResult1);
+
+ AlertSendResponseResult responseResult2 = new AlertSendResponseResult(true,"success");
+ responseResults.add(responseResult2);
+ alertSendResponseCommand.setResResults(responseResults);
+
+ Command command = alertSendResponseCommand.convert2Command(1);
+ Assert.assertEquals(CommandType.ALERT_SEND_RESPONSE,command.getType());
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
new file mode 100644
index 0000000000..7839b4a460
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertClientService {
+
+ private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private volatile boolean isRunning;
+
+ /**
+ * request time out
+ */
+ private static final long ALERT_REQUEST_TIMEOUT = 10 * 1000L;
+
+ /**
+ * alert client
+ */
+ public AlertClientService() {
+ this.clientConfig = new NettyClientConfig();
+ this.client = new NettyRemotingClient(clientConfig);
+ this.isRunning = true;
+ }
+
+ /**
+ * close
+ */
+ public void close() {
+ this.client.close();
+ this.isRunning = false;
+ logger.info("alter client closed");
+ }
+
+ /**
+ * alert sync send data
+ * @param host host
+ * @param port port
+ * @param groupId groupId
+ * @param title title
+ * @param content content
+ * @return AlertSendResponseCommand
+ */
+ public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) {
+ logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title);
+ AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content);
+ final Host address = new Host(host, port);
+ try {
+ Command command = request.convert2Command();
+ Command response = this.client.sendSync(address, command, ALERT_REQUEST_TIMEOUT);
+ if (response != null) {
+ return JsonSerializer.deserialize(response.getBody(), AlertSendResponseCommand.class);
+ }
+ } catch (Exception e) {
+ logger.error("sync alert send error", e);
+ } finally {
+ this.client.closeChannel(address);
+ }
+ return null;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
new file mode 100644
index 0000000000..ba44191b42
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * alert client service test
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AlertClientService.class})
+public class AlertClientServiceTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(AlertClientServiceTest.class);
+
+ private NettyRemotingClient client;
+
+ private AlertClientService alertClient;
+
+ @Before
+ public void before() throws Exception {
+ client = PowerMockito.mock(NettyRemotingClient.class);
+ PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(client);
+ alertClient = new AlertClientService();
+ }
+
+ @Test
+ public void testSendAlert() throws Exception {
+ String host = "127.0.0.1";
+ int port = 50501;
+ int groupId = 1;
+ String title = "test-title";
+ String content = "test-content";
+
+ //1.alter server does not exist
+ AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertNull(alertSendResponseCommand);
+
+ AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(groupId,title,content);
+ Command reqCommand = alertSendRequestCommand.convert2Command();
+ boolean sendResponseStatus;
+ List sendResponseResults = new ArrayList<>();
+
+ //2.alter instance does not exist
+ sendResponseStatus = false;
+ AlertSendResponseResult alertResult = new AlertSendResponseResult();
+ String message = String.format("Alert GroupId %s send error : not found alert instance",groupId);
+ alertResult.setStatus(false);
+ alertResult.setMessage(message);
+ sendResponseResults.add(alertResult);
+ AlertSendResponseCommand alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
+ Command resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
+
+ PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
+ alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //3.alter plugin does not exist
+ sendResponseStatus = false;
+ String pluginInstanceName = "alert-mail";
+ message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
+ alertResult.setStatus(false);
+ alertResult.setMessage(message);
+ alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
+ resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
+ PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
+ alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //4.alter result is null
+ sendResponseStatus = false;
+ message = String.format("Alert Plugin %s send error : return result value is null",pluginInstanceName);
+ alertResult.setStatus(false);
+ alertResult.setMessage(message);
+ alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
+ resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
+ PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
+ alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //5.abnormal information inside the alert plug-in code
+ sendResponseStatus = false;
+ alertResult.setStatus(false);
+ alertResult.setMessage("Abnormal information inside the alert plug-in code");
+ alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
+ resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
+ PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
+ alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertFalse(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ //6.alert plugin send success
+ sendResponseStatus = true;
+ message = String.format("Alert Plugin %s send success",pluginInstanceName);
+ alertResult.setStatus(true);
+ alertResult.setMessage(message);
+ alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
+ resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
+ PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
+ alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertTrue(alertSendResponseCommand.getResStatus());
+ alertSendResponseCommand.getResResults().forEach(result ->
+ logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
+
+ if (Objects.nonNull(alertClient) && alertClient.isRunning()) {
+ alertClient.close();
+ }
+
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 80d3be5b96..c0c920d069 100644
--- a/pom.xml
+++ b/pom.xml
@@ -867,6 +867,8 @@
**/remote/NettyRemotingClientTest.java
**/remote/NettyUtilTest.java
**/remote/ResponseFutureTest.java
+ **/remote/command/alert/AlertSendRequestCommandTest.java
+ **/remote/command/alert/AlertSendResponseCommandTest.java
**/server/log/LoggerServerTest.java
**/server/entity/SQLTaskExecutionContextTest.java
**/server/log/MasterLogFilterTest.java
@@ -917,6 +919,7 @@
**/service/zk/ZKServerTest.java
**/service/zk/CuratorZookeeperClientTest.java
**/service/queue/TaskUpdateQueueTest.java
+ **/service/alert/AlertClientServiceTest.java
**/dao/mapper/DataSourceUserMapperTest.java
@@ -961,6 +964,9 @@
**/alert/utils/DingTalkUtilsTest.java
**/alert/utils/EnterpriseWeChatUtilsTest.java
**/alert/utils/FuncUtilsTest.java
+ **/alert/processor/AlertRequestProcessorTest.java
+ **/alert/runner/AlertSenderTest.java
+ **/alert/AlertServerTest.java