Browse Source

[Fix-9221] [alert-server] optimization and gracefully close (#9246)

* [Fix-9221] [alert-server] optimization and gracefully close

This closes #9221

* [Fix-9221] [alert-server] remove unused mock data

This closes #9221

* [Fix-9221] [alert-server] remove unused mock data

This closes #9221

* [Fix-9221] [alert-server] remove unnecessary Mockito stubbings

* [Fix-9221] [alert-server] init AlertPluginManager in AlertServer

* [Fix-9221] [alert-server] AlertServerTest add AlertPluginManager installPlugin

* [Fix-9221] [alert-server] replace @Eventlistener with @PostConstruct

* [Fix-9221] [alert-server] sonar check solution

* [Improvement-9221] [alert] update constructor injection and replace IStoppable with Closeable

Co-authored-by: guoshupei <guoshupei@lixiang.com>
3.0.0/version-upgrade
guoshupei 3 years ago committed by GitHub
parent
commit
ca95d2f928
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java
  2. 8
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java
  3. 43
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
  4. 107
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  5. 17
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
  6. 21
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java
  7. 35
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
  8. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  9. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java

10
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java

@ -41,8 +41,6 @@ import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
@ -55,14 +53,14 @@ public final class AlertPluginManager {
private final PluginDao pluginDao;
private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
private final PluginParams warningTypeParams = getWarningTypeParams();
public AlertPluginManager(PluginDao pluginDao) {
this.pluginDao = pluginDao;
}
private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();
private final PluginParams warningTypeParams = getWarningTypeParams();
public PluginParams getWarningTypeParams() {
return
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)

8
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java

@ -36,10 +36,10 @@ import io.netty.channel.Channel;
public final class AlertRequestProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
private final AlertSender alertSender;
private final AlertSenderService alertSenderService;
public AlertRequestProcessor(AlertSender alertSender) {
this.alertSender = alertSender;
public AlertRequestProcessor(AlertSenderService alertSenderService) {
this.alertSenderService = alertSenderService;
}
@Override
@ -51,7 +51,7 @@ public final class AlertRequestProcessor implements NettyRequestProcessor {
logger.info("Received command : {}", alertSendRequestCommand);
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(
alertSendRequestCommand.getGroupId(),
alertSendRequestCommand.getTitle(),
alertSendRequestCommand.getContent(),

43
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java

@ -17,21 +17,26 @@
package org.apache.dolphinscheduler.alert;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashSet;
@ -40,22 +45,39 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
public final class AlertSender {
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
@Service
public final class AlertSenderService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) {
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
}
@Override
public synchronized void start() {
super.setName("AlertSenderService");
super.start();
}
@Override
public void run() {
logger.info("alert sender started");
while (Stopper.isRunning()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
}
}
}
public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
//get alert group from alert
@ -93,7 +115,6 @@ public final class AlertSender {
}
alertDao.updateAlert(alertStatus, "", alert.getId());
}
}
/**

107
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -17,73 +17,95 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import javax.annotation.PreDestroy;
import java.io.Closeable;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
private final PluginDao pluginDao;
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
private final AlertSender alertSender;
private final AlertSenderService alertSenderService;
private final AlertRequestProcessor alertRequestProcessor;
private final AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;
private NettyRemotingServer server;
@Autowired
private AlertConfig config;
public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) {
public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
this.pluginDao = pluginDao;
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
this.alertSender = alertSender;
this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
this.alertConfig = alertConfig;
}
/**
* alert server startup, not use web service
*
* @param args arguments
*/
public static void main(String[] args) {
SpringApplication.run(AlertServer.class, args);
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args);
}
@EventListener
public void start(ApplicationReadyEvent readyEvent) {
logger.info("Starting Alert server");
public void run(ApplicationReadyEvent readyEvent) {
logger.info("alert server starting...");
checkTable();
startServer();
Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS);
alertSenderService.start();
}
@Override
@PreDestroy
public void close() {
server.close();
destroy("alert server destroy");
}
/**
* gracefully stop
*
* @param cause stop cause
*/
public void destroy(String cause) {
try {
// execute only once
if (Stopper.isStopped()) {
return;
}
logger.info("alert server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(3000L);
// close
this.nettyRemotingServer.close();
} catch (Exception e) {
logger.error("alert server stop exception ", e);
}
}
private void checkTable() {
@ -95,26 +117,11 @@ public class AlertServer implements Closeable {
private void startServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(config.getPort());
serverConfig.setListenPort(alertConfig.getPort());
server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
server.start();
nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
nettyRemotingServer.start();
}
final class Sender implements Runnable {
@Override
public void run() {
if (!Stopper.isRunning()) {
return;
}
try {
final List<Alert> alerts = alertDao.listPendingAlerts();
alertSender.send(alerts);
} catch (Exception e) {
logger.error("Failed to send alert", e);
}
}
}
}

17
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java

@ -18,7 +18,9 @@
package org.apache.dolphinscheduler.alert;
import junit.framework.TestCase;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.junit.Assert;
@ -27,9 +29,13 @@ import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
import java.util.List;
@RunWith(MockitoJUnitRunner.class)
public class AlertServerTest extends TestCase {
@ -43,18 +49,25 @@ public class AlertServerTest extends TestCase {
@Mock
private AlertConfig alertConfig;
@Mock
private AlertSenderService alertSenderService;
@Test
public void testStart() {
Mockito.when(pluginDao.checkPluginDefineTableExist()).thenReturn(true);
Mockito.when(alertConfig.getPort()).thenReturn(50053);
alertServer.start(null);
Mockito.doNothing().when(alertSenderService).start();
alertServer.run(null);
NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "server");
NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "nettyRemotingServer");
NettyServerConfig nettyServerConfig = Whitebox.getInternalState(nettyRemotingServer, "serverConfig");
Assert.assertEquals(50053, nettyServerConfig.getListenPort());
}
}

21
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java

@ -20,30 +20,35 @@ package org.apache.dolphinscheduler.alert.processor;
import static org.mockito.Mockito.mock;
import org.apache.dolphinscheduler.alert.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.AlertSender;
import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.netty.channel.Channel;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class AlertRequestProcessorTest {
@InjectMocks
private AlertRequestProcessor alertRequestProcessor;
@Before
public void before() {
final AlertDao alertDao = mock(AlertDao.class);
alertRequestProcessor = new AlertRequestProcessor(new AlertSender(alertDao, null));
}
@Mock
private AlertSenderService alertSenderService;
@Test
public void testProcess() {
Mockito.when(alertSenderService.syncHandler(1, "title", "content", WarningType.FAILURE.getCode())).thenReturn(new AlertSendResponseCommand());
Channel channel = mock(Channel.class);
AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content", WarningType.FAILURE.getCode());
Command reqCommand = alertSendRequestCommand.convert2Command();

35
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java → dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java

@ -21,7 +21,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertSender;
import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.enums.WarningType;
@ -39,24 +39,29 @@ import java.util.Optional;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlertSenderTest {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class);
public class AlertSenderServiceTest {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class);
@Mock
private AlertDao alertDao;
@Mock
private PluginDao pluginDao;
@Mock
private AlertPluginManager alertPluginManager;
private AlertSender alertSender;
@InjectMocks
private AlertSenderService alertSenderService;
@Before
public void before() {
alertDao = mock(AlertDao.class);
pluginDao = mock(PluginDao.class);
alertPluginManager = mock(AlertPluginManager.class);
MockitoAnnotations.initMocks(this);
}
@Test
@ -65,12 +70,11 @@ public class AlertSenderTest {
int alertGroupId = 1;
String title = "alert mail test title";
String content = "alert mail test content";
alertSender = new AlertSender(alertDao, alertPluginManager);
//1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -89,7 +93,7 @@ public class AlertSenderTest {
PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null);
when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine);
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -99,7 +103,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -111,7 +115,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -123,7 +127,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus());
alertSendResponseCommand.getResResults().forEach(result ->
logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage()));
@ -143,7 +147,7 @@ public class AlertSenderTest {
alert.setWarningType(WarningType.FAILURE);
alertList.add(alert);
alertSender = new AlertSender(alertDao, alertPluginManager);
// alertSenderService = new AlertSenderService();
int pluginDefineId = 1;
String pluginInstanceParams = "alert-instance-mail-params";
@ -165,6 +169,7 @@ public class AlertSenderTest {
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus()));
alertSender.send(alertList);
when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>());
alertSenderService.send(alertList);
}
}

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -327,6 +327,7 @@ public final class Constants {
public static final String NULL = "NULL";
public static final String THREAD_NAME_MASTER_SERVER = "Master-Server";
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";
/**
* command parameter keys

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java

@ -88,4 +88,14 @@ public class AlertSendRequestCommand implements Serializable {
command.setBody(body);
return command;
}
@Override
public String toString() {
return "AlertSendRequestCommand{" +
"groupId=" + groupId +
", title='" + title + '\'' +
", content='" + content + '\'' +
", warnType=" + warnType +
'}';
}
}

Loading…
Cancel
Save