Browse Source

Merge pull request #3979 from zhuangchong/alert_spi_support_sync_service

[Feature-3754][Alert] Alert module support service for other service invoke
pull/3/MERGE
Kirs 4 years ago committed by GitHub
parent
commit
c61e536801
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-alert/pom.xml
  2. 54
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  3. 67
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
  4. 98
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  5. 87
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
  6. 61
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
  7. 181
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java
  8. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  9. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java
  10. 80
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java
  11. 75
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java
  12. 52
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseResult.java
  13. 42
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java
  14. 48
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommandTest.java
  15. 94
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
  16. 152
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java
  17. 6
      pom.xml

8
dolphinscheduler-alert/pom.xml

@ -35,6 +35,14 @@
<groupId>org.apache.dolphinscheduler</groupId> <groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId> <artifactId>dolphinscheduler-spi</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-remote</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

54
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.alert; package org.apache.dolphinscheduler.alert;
import static org.apache.dolphinscheduler.common.Constants.ALERT_RPC_PORT;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader; import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig; import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.runner.AlertSender; import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants; import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.alert.utils.PropertyUtils; import org.apache.dolphinscheduler.alert.utils.PropertyUtils;
@ -28,6 +31,9 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.List; import java.util.List;
@ -63,6 +69,11 @@ public class AlertServer {
public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository"; public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
/**
* netty server
*/
private NettyRemotingServer server;
private static class AlertServerHolder { private static class AlertServerHolder {
private static final AlertServer INSTANCE = new AlertServer(); private static final AlertServer INSTANCE = new AlertServer();
} }
@ -96,11 +107,21 @@ public class AlertServer {
} }
} }
public void start() { /**
* init netty remoting server
initPlugin(); */
private void initRemoteServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(ALERT_RPC_PORT);
this.server = new NettyRemotingServer(serverConfig);
this.server.registerProcessor(CommandType.ALERT_SEND_REQUEST, new AlertRequestProcessor(alertDao, alertPluginManager, pluginDao));
this.server.start();
}
logger.info("alert server ready start "); /**
* Cyclic alert info sending alert
*/
private void runSender() {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
Thread.sleep(Constants.ALERT_SCAN_INTERVAL); Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
@ -118,10 +139,33 @@ public class AlertServer {
} }
} }
/**
* start
*/
public void start() {
initPlugin();
initRemoteServer();
logger.info("alert server ready start ");
runSender();
}
/**
* stop
*/
public void stop() {
this.server.close();
logger.info("alert server shut down");
}
public static void main(String[] args) { public static void main(String[] args) {
System.out.println(System.getProperty("user.dir"));
AlertServer alertServer = AlertServer.getInstance(); AlertServer alertServer = AlertServer.getInstance();
alertServer.start(); alertServer.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
alertServer.stop();
}
});
} }
} }

67
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.processor;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
* alert request processor
*/
public class AlertRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
private AlertDao alertDao;
private PluginDao pluginDao;
private AlertPluginManager alertPluginManager;
public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
this.alertDao = alertDao;
this.pluginDao = pluginDao;
this.alertPluginManager = alertPluginManager;
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.ALERT_SEND_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
AlertSendRequestCommand alertSendRequestCommand = JsonSerializer.deserialize(
command.getBody(), AlertSendRequestCommand.class);
logger.info("received command : {}", alertSendRequestCommand);
AlertSender alertSender = new AlertSender(alertDao, alertPluginManager, pluginDao);
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent());
channel.writeAndFlush(alertSendResponseCommand.convert2Command(command.getOpaque()));
}
}

98
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -19,15 +19,19 @@ package org.apache.dolphinscheduler.alert.runner;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao; import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.apache.dolphinscheduler.spi.alert.AlertChannel; import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertData; import org.apache.dolphinscheduler.spi.alert.AlertData;
import org.apache.dolphinscheduler.spi.alert.AlertInfo; import org.apache.dolphinscheduler.spi.alert.AlertInfo;
import org.apache.dolphinscheduler.spi.alert.AlertResult; import org.apache.dolphinscheduler.spi.alert.AlertResult;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -49,6 +53,13 @@ public class AlertSender {
this.alertPluginManager = alertPluginManager; this.alertPluginManager = alertPluginManager;
} }
public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
super();
this.alertDao = alertDao;
this.pluginDao = pluginDao;
this.alertPluginManager = alertPluginManager;
}
public AlertSender(List<Alert> alertList, AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) { public AlertSender(List<Alert> alertList, AlertDao alertDao, AlertPluginManager alertPluginManager, PluginDao pluginDao) {
super(); super();
this.alertList = alertList; this.alertList = alertList;
@ -71,33 +82,94 @@ public class AlertSender {
for (AlertPluginInstance instance : alertInstanceList) { for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
AlertStatus alertStatus = Boolean.parseBoolean(String.valueOf(alertResult.getStatus())) ? AlertStatus.EXECUTION_SUCCESS : AlertStatus.EXECUTION_FAILURE;
alertDao.updateAlert(alertStatus, alertResult.getMessage(), alert.getId());
}
}
}
/**
* sync send alert handler
* @param alertGroupId alertGroupId
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = new AlertData();
alertData.setContent(title)
.setTitle(content);
boolean sendResponseStatus = true;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
if (CollectionUtils.isEmpty(alertInstanceList)) {
sendResponseStatus = false;
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult();
String message = String.format("Alert GroupId %s send error : not found alert instance",alertGroupId);
alertSendResponseResult.setStatus(sendResponseStatus);
alertSendResponseResult.setMessage(message);
sendResponseResults.add(alertSendResponseResult);
logger.error("Alert GroupId {} send error : not found alert instance", alertGroupId);
return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults);
}
for (AlertPluginInstance instance : alertInstanceList) {
AlertResult alertResult = this.alertResultHandler(instance, alertData);
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())),alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
sendResponseResults.add(alertSendResponseResult);
}
return new AlertSendResponseCommand(sendResponseStatus,sendResponseResults);
}
/**
* alert result handler
* @param instance instance
* @param alertData alertData
* @return AlertResult
*/
private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData alertData) {
String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName(); String pluginName = pluginDao.getPluginDefineById(instance.getPluginDefineId()).getPluginName();
String pluginInstanceName = instance.getInstanceName();
AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData);
alertInfo.setAlertParams(instance.getPluginInstanceParams());
AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName); AlertChannel alertChannel = alertPluginManager.getAlertChannelMap().get(pluginName);
AlertResult alertResultExtend = new AlertResult();
String pluginInstanceName = instance.getInstanceName();
if (alertChannel == null) { if (alertChannel == null) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "Alert send error, not found plugin " + pluginName, alert.getId()); String message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(false));
alertResultExtend.setMessage(message);
logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName); logger.error("Alert Plugin {} send error : not found plugin {}", pluginInstanceName, pluginName);
continue; return alertResultExtend;
} }
AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData);
alertInfo.setAlertParams(instance.getPluginInstanceParams());
AlertResult alertResult = alertChannel.process(alertInfo); AlertResult alertResult = alertChannel.process(alertInfo);
if (alertResult == null) { if (alertResult == null) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId()); String message = String.format("Alert Plugin %s send error : return alertResult value is null",pluginInstanceName);
logger.info("Alert Plugin {} send error : return value is null", pluginInstanceName); alertResultExtend.setStatus(String.valueOf(false));
alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send error : return alertResult value is null", pluginInstanceName);
} else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) { } else if (!Boolean.parseBoolean(String.valueOf(alertResult.getStatus()))) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(alertResult.getMessage()), alert.getId()); alertResultExtend.setStatus(String.valueOf(false));
alertResultExtend.setMessage(alertResult.getMessage());
logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage()); logger.info("Alert Plugin {} send error : {}", pluginInstanceName, alertResult.getMessage());
} else { } else {
alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, alertResult.getMessage(), alert.getId()); String message = String.format("Alert Plugin %s send success",pluginInstanceName);
alertResultExtend.setStatus(String.valueOf(true));
alertResultExtend.setMessage(message);
logger.info("Alert Plugin {} send success", pluginInstanceName); logger.info("Alert Plugin {} send success", pluginInstanceName);
} }
} return alertResultExtend;
}
} }
} }

87
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({AlertServer.class,DaoFactory.class})
public class AlertServerTest {
@Before
public void before() {
}
@Test
public void testMain() throws Exception {
AlertDao alertDao = PowerMockito.mock(AlertDao.class);
PowerMockito.mockStatic(DaoFactory.class);
PowerMockito.when(DaoFactory.getDaoInstance(AlertDao.class)).thenReturn(alertDao);
PluginDao pluginDao = PowerMockito.mock(PluginDao.class);
PowerMockito.when(DaoFactory.getDaoInstance(PluginDao.class)).thenReturn(pluginDao);
AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
AlertPluginManager alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
PowerMockito.whenNew(AlertPluginManager.class).withNoArguments().thenReturn(alertPluginManager);
ConcurrentHashMap alertChannelMap = new ConcurrentHashMap<>();
alertChannelMap.put("pluginName",alertChannelMock);
PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
DolphinPluginManagerConfig alertPluginManagerConfig = PowerMockito.mock(DolphinPluginManagerConfig.class);
PowerMockito.whenNew(DolphinPluginManagerConfig.class).withNoArguments().thenReturn(alertPluginManagerConfig);
NettyRemotingServer nettyRemotingServer = PowerMockito.mock(NettyRemotingServer.class);
PowerMockito.whenNew(NettyRemotingServer.class).withAnyArguments().thenReturn(nettyRemotingServer);
AlertSender alertSender = PowerMockito.mock(AlertSender.class);
PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender);
AlertServer alertServer = AlertServer.getInstance();
Assert.assertNotNull(alertServer);
new Thread(() -> {
alertServer.start(); })
.start();
Thread.sleep(5 * Constants.ALERT_SCAN_INTERVAL);
alertServer.stop();
}
}

61
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.processor;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
import io.netty.channel.Channel;
/**
* alert request processor test
*/
public class AlertRequestProcessorTest {
private AlertDao alertDao;
private PluginDao pluginDao;
private AlertPluginManager alertPluginManager;
private AlertRequestProcessor alertRequestProcessor;
@Before
public void before() {
alertDao = PowerMockito.mock(AlertDao.class);
pluginDao = PowerMockito.mock(PluginDao.class);
alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
alertRequestProcessor = new AlertRequestProcessor(alertDao,alertPluginManager,pluginDao);
}
@Test
public void testProcess() {
Channel channel = PowerMockito.mock(Channel.class);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1,"title","content");
Command reqCommand = alertSendRequestCommand.convert2Command();
Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,reqCommand.getType());
alertRequestProcessor.process(channel,reqCommand);
}
}

181
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.runner;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.spi.alert.AlertChannel;
import org.apache.dolphinscheduler.spi.alert.AlertResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* alert sender test
*/
public class AlertSenderTest {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
private AlertDao alertDao;
private PluginDao pluginDao;
private AlertPluginManager alertPluginManager;
private AlertSender alertSender;
@Before
public void before() {
alertDao = PowerMockito.mock(AlertDao.class);
pluginDao = PowerMockito.mock(PluginDao.class);
alertPluginManager = PowerMockito.mock(AlertPluginManager.class);
}
@Test
public void testSyncHandler() {
int alertGroupId = 1;
String title = "alert mail test title";
String content = "alert mail test content";
alertSender = new AlertSender(alertDao,alertPluginManager,pluginDao);
//1.alert instance does not exist
PowerMockito.when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//2.alert plugin does not exist
int pluginDefineId = 1;
String pluginInstanceParams = "alert-instance-mail-params";
String pluginInstanceName = "alert-instance-mail";
List<AlertPluginInstance> alertInstanceList = new ArrayList<>();
AlertPluginInstance alertPluginInstance = new AlertPluginInstance(
pluginDefineId,pluginInstanceParams,alertGroupId,pluginInstanceName);
alertInstanceList.add(alertPluginInstance);
PowerMockito.when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(alertInstanceList);
String pluginName = "alert-plugin-mail";
PluginDefine pluginDefine = new PluginDefine(pluginName,"1",null);
PowerMockito.when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//3.alert result value is null
AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(null);
Map<String, AlertChannel> alertChannelMap = new ConcurrentHashMap<>();
alertChannelMap.put(pluginName,alertChannelMock);
PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//4.abnormal information inside the alert plug-in code
AlertResult alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(false));
alertResult.setMessage("Abnormal information inside the alert plug-in code");
PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
alertChannelMap = new ConcurrentHashMap<>();
alertChannelMap.put(pluginName,alertChannelMock);
PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//5.alert plugin send success
alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(true));
alertResult.setMessage(String.format("Alert Plugin %s send success",pluginInstanceName));
PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
alertChannelMap = new ConcurrentHashMap<>();
alertChannelMap.put(pluginName,alertChannelMock);
PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content);
Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
}
@Test
public void testRun() {
int alertGroupId = 1;
String title = "alert mail test title";
String content = "alert mail test content";
List<Alert> alertList = new ArrayList<>();
Alert alert = new Alert();
alert.setAlertGroupId(alertGroupId);
alert.setTitle(title);
alert.setContent(content);
alertList.add(alert);
alertSender = new AlertSender(alertList,alertDao,alertPluginManager,pluginDao);
int pluginDefineId = 1;
String pluginInstanceParams = "alert-instance-mail-params";
String pluginInstanceName = "alert-instance-mail";
List<AlertPluginInstance> alertInstanceList = new ArrayList<>();
AlertPluginInstance alertPluginInstance = new AlertPluginInstance(
pluginDefineId,pluginInstanceParams,alertGroupId,pluginInstanceName);
alertInstanceList.add(alertPluginInstance);
PowerMockito.when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(alertInstanceList);
String pluginName = "alert-plugin-mail";
PluginDefine pluginDefine = new PluginDefine(pluginName,"1",null);
PowerMockito.when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
AlertResult alertResult = new AlertResult();
alertResult.setStatus(String.valueOf(true));
alertResult.setMessage(String.format("Alert Plugin %s send success",pluginInstanceName));
AlertChannel alertChannelMock = PowerMockito.mock(AlertChannel.class);
PowerMockito.when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
ConcurrentHashMap alertChannelMap = new ConcurrentHashMap<>();
alertChannelMap.put(pluginName,alertChannelMock);
PowerMockito.when(alertPluginManager.getAlertChannelMap()).thenReturn(alertChannelMap);
Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
alertSender.run();
}
}

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -387,13 +387,17 @@ public final class Constants {
*/ */
public static final int SEC_2_MINUTES_TIME_UNIT = 60; public static final int SEC_2_MINUTES_TIME_UNIT = 60;
/*** /***
* *
* rpc port * rpc port
*/ */
public static final int RPC_PORT = 50051; public static final int RPC_PORT = 50051;
/***
* alert rpc port
*/
public static final int ALERT_RPC_PORT = 50052;
/** /**
* forbid running task * forbid running task
*/ */

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * remove task log request, */ REMOVE_TAK_LOG_REQUEST, /** * remove task log response */ REMOVE_TAK_LOG_RESPONSE, /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG, /** * alert send request */ ALERT_SEND_REQUEST, /** * alert send response */ ALERT_SEND_RESPONSE; }

80
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.alert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
public class AlertSendRequestCommand implements Serializable {
private int groupId;
private String title;
private String content;
public int getGroupId() {
return groupId;
}
public void setGroupId(int groupId) {
this.groupId = groupId;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public AlertSendRequestCommand(){
}
public AlertSendRequestCommand(int groupId, String title, String content) {
this.groupId = groupId;
this.title = title;
this.content = content;
}
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.ALERT_SEND_REQUEST);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

75
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommand.java

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.alert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable;
import java.util.List;
public class AlertSendResponseCommand implements Serializable {
/**
* true:All alert are successful,
* false:As long as one alert fails
*/
private boolean resStatus;
private List<AlertSendResponseResult> resResults;
public boolean getResStatus() {
return resStatus;
}
public void setResStatus(boolean resStatus) {
this.resStatus = resStatus;
}
public List<AlertSendResponseResult> getResResults() {
return resResults;
}
public void setResResults(List<AlertSendResponseResult> resResults) {
this.resResults = resResults;
}
public AlertSendResponseCommand() {
}
public AlertSendResponseCommand(boolean resStatus, List<AlertSendResponseResult> resResults) {
this.resStatus = resStatus;
this.resResults = resResults;
}
/**
* package response command
*
* @param opaque request unique identification
* @return command
*/
public Command convert2Command(long opaque) {
Command command = new Command(opaque);
command.setType(CommandType.ALERT_SEND_RESPONSE);
byte[] body = JsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}

52
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseResult.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.alert;
import java.io.Serializable;
public class AlertSendResponseResult implements Serializable {
private boolean status;
private String message;
public boolean getStatus() {
return status;
}
public void setStatus(boolean status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public AlertSendResponseResult() {
}
public AlertSendResponseResult(boolean status, String message) {
this.status = status;
this.message = message;
}
}

42
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommandTest.java

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.alert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.junit.Assert;
import org.junit.Test;
public class AlertSendRequestCommandTest {
@Test
public void testConvert2Command() {
int groupId = 1;
String title = "test-title";
String content = "test-content";
AlertSendRequestCommand requestCommand = new AlertSendRequestCommand(groupId,title,content);
Command command = requestCommand.convert2Command();
Assert.assertEquals(CommandType.ALERT_SEND_REQUEST,command.getType());
AlertSendRequestCommand verifyCommand = new AlertSendRequestCommand();
verifyCommand.setGroupId(groupId);
verifyCommand.setContent(content);
verifyCommand.setTitle(title);
}
}

48
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendResponseCommandTest.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command.alert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class AlertSendResponseCommandTest {
@Test
public void testConvert2Command() {
AlertSendResponseCommand alertSendResponseCommand = new AlertSendResponseCommand();
alertSendResponseCommand.setResStatus(false);
List<AlertSendResponseResult> responseResults = new ArrayList<>();
AlertSendResponseResult responseResult1 = new AlertSendResponseResult();
responseResult1.setStatus(false);
responseResult1.setMessage("fail");
responseResults.add(responseResult1);
AlertSendResponseResult responseResult2 = new AlertSendResponseResult(true,"success");
responseResults.add(responseResult2);
alertSendResponseCommand.setResResults(responseResults);
Command command = alertSendResponseCommand.convert2Command(1);
Assert.assertEquals(CommandType.ALERT_SEND_RESPONSE,command.getType());
}
}

94
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.alert;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertClientService {
private static final Logger logger = LoggerFactory.getLogger(AlertClientService.class);
private final NettyClientConfig clientConfig;
private final NettyRemotingClient client;
private volatile boolean isRunning;
/**
* request time out
*/
private static final long ALERT_REQUEST_TIMEOUT = 10 * 1000L;
/**
* alert client
*/
public AlertClientService() {
this.clientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
}
/**
* close
*/
public void close() {
this.client.close();
this.isRunning = false;
logger.info("alter client closed");
}
/**
* alert sync send data
* @param host host
* @param port port
* @param groupId groupId
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponseCommand sendAlert(String host, int port, int groupId, String title, String content) {
logger.info("sync alert send, host : {}, port : {}, groupId : {}, title : {} ", host, port, groupId, title);
AlertSendRequestCommand request = new AlertSendRequestCommand(groupId, title, content);
final Host address = new Host(host, port);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, ALERT_REQUEST_TIMEOUT);
if (response != null) {
return JsonSerializer.deserialize(response.getBody(), AlertSendResponseCommand.class);
}
} catch (Exception e) {
logger.error("sync alert send error", e);
} finally {
this.client.closeChannel(address);
}
return null;
}
public boolean isRunning() {
return isRunning;
}
}

152
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/AlertClientServiceTest.java

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.alert;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* alert client service test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({AlertClientService.class})
public class AlertClientServiceTest {
private static final Logger logger = LoggerFactory.getLogger(AlertClientServiceTest.class);
private NettyRemotingClient client;
private AlertClientService alertClient;
@Before
public void before() throws Exception {
client = PowerMockito.mock(NettyRemotingClient.class);
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(client);
alertClient = new AlertClientService();
}
@Test
public void testSendAlert() throws Exception {
String host = "127.0.0.1";
int port = 50501;
int groupId = 1;
String title = "test-title";
String content = "test-content";
//1.alter server does not exist
AlertSendResponseCommand alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertNull(alertSendResponseCommand);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(groupId,title,content);
Command reqCommand = alertSendRequestCommand.convert2Command();
boolean sendResponseStatus;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
//2.alter instance does not exist
sendResponseStatus = false;
AlertSendResponseResult alertResult = new AlertSendResponseResult();
String message = String.format("Alert GroupId %s send error : not found alert instance",groupId);
alertResult.setStatus(false);
alertResult.setMessage(message);
sendResponseResults.add(alertResult);
AlertSendResponseCommand alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
Command resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//3.alter plugin does not exist
sendResponseStatus = false;
String pluginInstanceName = "alert-mail";
message = String.format("Alert Plugin %s send error : return value is null",pluginInstanceName);
alertResult.setStatus(false);
alertResult.setMessage(message);
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//4.alter result is null
sendResponseStatus = false;
message = String.format("Alert Plugin %s send error : return result value is null",pluginInstanceName);
alertResult.setStatus(false);
alertResult.setMessage(message);
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//5.abnormal information inside the alert plug-in code
sendResponseStatus = false;
alertResult.setStatus(false);
alertResult.setMessage("Abnormal information inside the alert plug-in code");
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
//6.alert plugin send success
sendResponseStatus = true;
message = String.format("Alert Plugin %s send success",pluginInstanceName);
alertResult.setStatus(true);
alertResult.setMessage(message);
alertSendResponseCommandData = new AlertSendResponseCommand(sendResponseStatus, sendResponseResults);
resCommand = alertSendResponseCommandData.convert2Command(reqCommand.getOpaque());
PowerMockito.when(client.sendSync(Mockito.any(), Mockito.any(), Mockito.anyLong())).thenReturn(resCommand);
alertSendResponseCommand = alertClient.sendAlert(host, port, groupId, title, content);
Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}",result.getStatus(),result.getMessage()));
if (Objects.nonNull(alertClient) && alertClient.isRunning()) {
alertClient.close();
}
}
}

6
pom.xml

@ -867,6 +867,8 @@
<include>**/remote/NettyRemotingClientTest.java</include> <include>**/remote/NettyRemotingClientTest.java</include>
<include>**/remote/NettyUtilTest.java</include> <include>**/remote/NettyUtilTest.java</include>
<include>**/remote/ResponseFutureTest.java</include> <include>**/remote/ResponseFutureTest.java</include>
<include>**/remote/command/alert/AlertSendRequestCommandTest.java</include>
<include>**/remote/command/alert/AlertSendResponseCommandTest.java</include>
<include>**/server/log/LoggerServerTest.java</include> <include>**/server/log/LoggerServerTest.java</include>
<include>**/server/entity/SQLTaskExecutionContextTest.java</include> <include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include> <include>**/server/log/MasterLogFilterTest.java</include>
@ -917,6 +919,7 @@
<include>**/service/zk/ZKServerTest.java</include> <include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include> <include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include> <include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include> <include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>--> <!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
@ -961,6 +964,9 @@
<include>**/alert/utils/DingTalkUtilsTest.java</include> <include>**/alert/utils/DingTalkUtilsTest.java</include>
<include>**/alert/utils/EnterpriseWeChatUtilsTest.java</include> <include>**/alert/utils/EnterpriseWeChatUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include> <include>**/alert/utils/FuncUtilsTest.java</include>
<include>**/alert/processor/AlertRequestProcessorTest.java</include>
<include>**/alert/runner/AlertSenderTest.java</include>
<include>**/alert/AlertServerTest.java</include>
</includes> </includes>
<!-- <skip>true</skip> --> <!-- <skip>true</skip> -->
</configuration> </configuration>

Loading…
Cancel
Save