diff --git a/dolphinscheduler-alert/pom.xml b/dolphinscheduler-alert/pom.xml
index e79c9a2b20..abae84515a 100644
--- a/dolphinscheduler-alert/pom.xml
+++ b/dolphinscheduler-alert/pom.xml
@@ -35,6 +35,10 @@
org.apache.dolphinscheduler
dolphinscheduler-spi
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-remote
+
junit
junit
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index ccd359da18..45184f03a4 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
+import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.PropertyUtils;
@@ -28,6 +29,9 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.List;
@@ -63,6 +67,11 @@ public class AlertServer {
public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
+ /**
+ * netty server
+ */
+ private NettyRemotingServer server;
+
private static class AlertServerHolder {
private static final AlertServer INSTANCE = new AlertServer();
}
@@ -96,11 +105,21 @@ public class AlertServer {
}
}
- public void start() {
-
- initPlugin();
+ /**
+ * init netty remoting server
+ */
+ private void initRemoteServer() {
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(50501);
+ this.server = new NettyRemotingServer(serverConfig);
+ this.server.registerProcessor(CommandType.ALERT_SEND_REQUEST, new AlertRequestProcessor(alertDao, alertPluginManager, pluginDao));
+ this.server.start();
+ }
- logger.info("alert server ready start ");
+ /**
+ * Cyclic alert info sending alert
+ */
+ private void runSender() {
while (Stopper.isRunning()) {
try {
Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
@@ -118,10 +137,35 @@ public class AlertServer {
}
}
+ /**
+ * start
+ */
+ public void start() {
+ initPlugin();
+ initRemoteServer();
+ logger.info("alert server ready start ");
+ runSender();
+ }
+
+ /**
+ * stop
+ */
+ public void stop() {
+ this.server.close();
+ logger.info("alert server shut down");
+ }
+
public static void main(String[] args) {
System.out.println(System.getProperty("user.dir"));
AlertServer alertServer = AlertServer.getInstance();
alertServer.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ alertServer.stop();
+ }
+ });
+
}
}
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
new file mode 100644
index 0000000000..5e8a8f89d6
--- /dev/null
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.alert.processor;
+
+import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
+import org.apache.dolphinscheduler.alert.runner.AlertSender;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
+import org.apache.dolphinscheduler.dao.AlertDao;
+import org.apache.dolphinscheduler.dao.PluginDao;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.channel.Channel;
+
+/**
+ * alert request processor
+ */
+public class AlertRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
+ private AlertDao alertDao;
+ private PluginDao pluginDao;
+ private AlertPluginManager alertPluginManager;
+
+ public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
+ this.alertDao = alertDao;
+ this.pluginDao = pluginDao;
+ this.alertPluginManager = alertPluginManager;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ Preconditions.checkArgument(CommandType.ALERT_SEND_REQUEST == command.getType(),
+ String.format("invalid command type : %s", command.getType()));
+
+ AlertSendRequestCommand alertSendRequestCommand = JsonSerializer.deserialize(
+ command.getBody(), AlertSendRequestCommand.class);
+ logger.info("received command : {}", alertSendRequestCommand);
+
+ AlertSender alertSender = new AlertSender(alertDao, alertPluginManager, pluginDao);
+ AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent());
+ channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque()));
+
+ }
+}
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
index 4714a76ff1..341cd5f88a 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
@@ -23,11 +23,13 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
+import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
@@ -49,6 +51,13 @@ public class AlertSender {
this.alertPluginManager = alertPluginManager;
}
+ public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
+ super();
+ this.alertDao = alertDao;
+ this.pluginDao = pluginDao;
+ this.alertPluginManager = alertPluginManager;
+ }
+
public AlertSender(List alertList, AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
super();
this.alertList = alertList;
@@ -71,33 +80,91 @@ public class AlertSender {
for (AlertPluginInstance instance : alertInstanceList) {
- String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName();
- String pluginInstanceName = instance.getInstanceName();
- AlertInfo alertInfo = new AlertInfo();
- alertInfo.setAlertData(alertData);
- alertInfo.setAlertParams(instance.getPluginInstanceParams());
- AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName);
- if (alertChannel == null) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "Alert send error, not found plugin " + pluginName, alert.getId());
- logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName);
- continue;
- }
-
- AlertResult alertResult = alertChannel.process(alertInfo);
-
- if (alertResult == null) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId());
- logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName);
- } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
- alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(alertResult.getMessage()), alert.getId());
- logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
- } else {
- alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, alertResult.getMessage(), alert.getId());
- logger.info("Alert Plugin {} send success", pluginInstanceName);
- }
+ AlertResult alertResult = getAlertResult(instance, alertData);
+ AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
+ alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId());
+
}
}
}
+ /**
+ * sync send alert handler
+ * @param alertGroupId alertGroupId
+ * @param title title
+ * @param content content
+ * @return AlertSendResponseCommand
+ */
+ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content) {
+
+ List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
+ AlertData alertData = new AlertData();
+ alertData.setContent(title)
+ .setTitle(content);
+
+ boolean sendResponseStatus = true;
+ List sendResponseResults = new ArrayList<>();
+
+ if (alertInstanceList == null || alertInstanceList.size() == 0) {
+ sendResponseStatus = false;
+ AlertResult alertResult = new AlertResult();
+ String message = String.format("Alert GroupId %s send error : not found alert instance",alertGroupId);
+ alertResult.setStatus("false");
+ alertResult.setMessage(message);
+ sendResponseResults.add(alertResult);
+ logger.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
+ }
+
+ for (AlertPluginInstance instance : alertInstanceList) {
+
+ AlertResult alertResult = getAlertResult(instance, alertData);
+ sendResponseStatus = sendResponseStatus && Boolean.parseBoolean(String.valueOf(alertResult.getStatus()));
+ sendResponseResults.add(alertResult);
+ }
+
+ return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults);
+ }
+
+ /**
+ * alert result expansion
+ * @param instance instance
+ * @param alertData alertData
+ * @return AlertResult
+ */
+ private AlertResult getAlertResult(AlertPluginInstance instance, AlertData alertData) {
+ String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName();
+ AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName);
+ AlertResult alertResultExtend = new AlertResult();
+ String pluginInstanceName = instance.getInstanceName();
+ if (alertChannel == null) {
+ String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
+ alertResultExtend.setStatus("false");
+ alertResultExtend.setMessage(message);
+ logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName);
+ }
+
+ AlertInfo alertInfo = new AlertInfo();
+ alertInfo.setAlertData(alertData);
+ alertInfo.setAlertParams(instance.getPluginInstanceParams());
+ AlertResult alertResult = alertChannel.process(alertInfo);
+
+ if (alertResult == null) {
+ String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
+ alertResultExtend.setStatus("false");
+ alertResultExtend.setMessage(message);
+ logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName);
+ } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
+ alertResultExtend.setStatus("false");
+ alertResultExtend.setMessage(alertResult.getMessage());
+ logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
+ } else {
+ String message = String.format("Alert Plugin %s send success",pluginInstanceName);
+ alertResultExtend.setStatus("true");
+ alertResultExtend.setMessage(message);
+ logger.info("Alert Plugin {} send success", pluginInstanceName);
+ }
+ return alertResultExtend;
+ }
+
}
diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
index 4d398f3069..bc71241076 100644
--- a/dolphinscheduler-remote/pom.xml
+++ b/dolphinscheduler-remote/pom.xml
@@ -35,6 +35,10 @@
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-spi
+
io.netty
netty-all
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
index d1ffc65f57..753216995e 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
@@ -1 +1 @@
-/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG;
}
\ No newline at end of file
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* remove task log request,
*/
REMOVE_TAK_LOG_REQUEST,
/**
* remove task log response
*/
REMOVE_TAK_LOG_RESPONSE,
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
TASK_EXECUTE_REQUEST,
/**
* execute task ack
*/
TASK_EXECUTE_ACK,
/**
* execute task response
*/
TASK_EXECUTE_RESPONSE,
/**
* kill task
*/
TASK_KILL_REQUEST,
/**
* kill task response
*/
TASK_KILL_RESPONSE,
/**
* ping
*/
PING,
/**
* pong
*/
PONG,
/**
* alert send request
*/
ALERT_SEND_REQUEST,
/**
* alert send response
*/
ALERT_SEND_RESPONSE;
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
new file mode 100644
index 0000000000..da56b0dc6b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import java.io.Serializable;
+
+public class AlertSendRequestCommand implements Serializable {
+
+ private int groupId;
+
+ private String title;
+
+ private String content;
+
+ public int getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(int groupId) {
+ this.groupId = groupId;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getContent() {
+ return content;
+ }
+
+ public void setContent(String content) {
+ this.content = content;
+ }
+
+ public AlertSendRequestCommand(){
+
+ }
+
+ public AlertSendRequestCommand(int groupId, String title, String content) {
+ this.groupId = groupId;
+ this.title = title;
+ this.content = content;
+ }
+
+ /**
+ * package request command
+ *
+ * @return command
+ */
+ public Command convert2Command() {
+ Command command = new Command();
+ command.setType(CommandType.ALERT_SEND_REQUEST);
+ byte[] body = JsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java
new file mode 100644
index 0000000000..fbde4cfe51
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.alert;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+import org.apache.dolphinscheduler.spi.alert.AlertResult;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class AlertSendResponseCommand implements Serializable {
+
+ /**
+ * true:All alert are successful,
+ * false:As long as one alert fails
+ */
+ private boolean alertStatus;
+
+ private List alertResults;
+
+ public boolean getAlertStatus() {
+ return alertStatus;
+ }
+
+ public void setAlertStatus(boolean alertStatus) {
+ this.alertStatus = alertStatus;
+ }
+
+ public List getAlertResults() {
+ return alertResults;
+ }
+
+ public void setAlertResults(List alertResults) {
+ this.alertResults = alertResults;
+ }
+
+ public AlertSendResponseCommand() {
+
+ }
+
+ public AlertSendResponseCommand(boolean alertStatus, List alertResults) {
+ this.alertStatus = alertStatus;
+ this.alertResults = alertResults;
+ }
+
+ /**
+ * package response command
+ *
+ * @param opaque request unique identification
+ * @return command
+ */
+ public Command convert2Command(long opaque) {
+ Command command = new Command(opaque);
+ command.setType(CommandType.ALERT_SEND_RESPONSE);
+ byte[] body = JsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
new file mode 100644
index 0000000000..9581413c39
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertClientService {
+
+ private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private volatile boolean isRunning;
+
+ /**
+ * request time out
+ */
+ private static final long ALERT_REQUEST_TIMEOUT = 10 * 1000L;
+
+ /**
+ * alert client
+ */
+ public AlertClientService() {
+ this.clientConfig = new NettyClientConfig();
+ this.client = new NettyRemotingClient(clientConfig);
+ this.isRunning = true;
+ }
+
+ /**
+ * close
+ */
+ public void close() {
+ this.client.close();
+ this.isRunning = false;
+ logger.info("alter client closed");
+ }
+
+ /**
+ * alert sync send data
+ * @param host host
+ * @param port port
+ * @param groupId groupId
+ * @param title title
+ * @param content content
+ * @return AlertSendResponseCommand
+ */
+ public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) {
+ logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title);
+ AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content);
+ final Host address = new Host(host, port);
+ try {
+ Command command = request.convert2Command();
+ Command response = this.client.sendSync(address, command, ALERT_REQUEST_TIMEOUT);
+ if (response != null) {
+ AlertSendResponseCommand sendResponse = JsonSerializer.deserialize(
+ response.getBody(), AlertSendResponseCommand.class);
+ return sendResponse;
+ }
+ } catch (Exception e) {
+ logger.error("sync alert send error", e);
+ } finally {
+ this.client.closeChannel(address);
+ }
+ return null;
+ }
+
+ public boolean isRunning() {
+ return isRunning;
+ }
+}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
new file mode 100644
index 0000000000..98f2b14d54
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.alert;
+
+import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+/**
+ * alert client service test
+ */
+public class AlertClientServiceTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(AlertClientServiceTest.class);
+
+
+ @Test
+ public void testSendAlert(){
+ String host;
+ int port = 50501;
+ int groupId = 1;
+ String title = "test-title";
+ String content = "test-content";
+ AlertClientService alertClient = new AlertClientService();
+
+ // alter server does not exist
+ host = "128.0.10.1";
+ AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
+ Assert.assertNull(alertSendResponseCommand);
+
+ host = "127.0.0.1";
+ AlertSendResponseCommand alertSendResponseCommand_1 = alertClient.sendAlert(host, port, groupId, title, content);
+
+ if (Objects.nonNull(alertClient) && alertClient.isRunning()) {
+ alertClient.close();
+ }
+
+ }
+}
diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
index a327d09403..f91db97651 100644
--- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
+++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/alert/AlertResult.java
@@ -38,4 +38,6 @@ public class AlertResult {
public void setMessage(String message) {
this.message = message;
}
+
+
}