Browse Source

[FIX-10784][Bug] [ALERT-SERVER] FEISHU Plugin might block the whole alert process loop (#10888)

* closed 10784 [Bug] [ALERT-SERVER] FEISHU Plugin might block the whole alert process loop
3.0.0/version-upgrade
pinkhello 2 years ago committed by caishunfeng
parent
commit
bc690501c4
  1. 11
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java
  2. 17
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java
  3. 3
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml
  4. 6
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java
  5. 3
      dolphinscheduler-standalone-server/src/main/resources/application.yaml

11
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java

@ -25,6 +25,8 @@ import org.springframework.stereotype.Component;
public final class AlertConfig { public final class AlertConfig {
private int port; private int port;
private int waitTimeout;
public int getPort() { public int getPort() {
return port; return port;
} }
@ -32,4 +34,13 @@ public final class AlertConfig {
public void setPort(final int port) { public void setPort(final int port) {
this.port = port; this.port = port;
} }
public int getWaitTimeout() {
return waitTimeout;
}
public void setWaitTimeout(final int waitTimeout) {
this.waitTimeout = waitTimeout;
}
} }

17
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java

@ -40,6 +40,8 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,10 +53,12 @@ public final class AlertSenderService extends Thread {
private final AlertDao alertDao; private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager; private final AlertPluginManager alertPluginManager;
private final AlertConfig alertConfig;
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) { public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) {
this.alertDao = alertDao; this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager; this.alertPluginManager = alertPluginManager;
this.alertConfig = alertConfig;
} }
@Override @Override
@ -221,9 +225,20 @@ public final class AlertSenderService extends Thread {
AlertInfo alertInfo = new AlertInfo(); AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData); alertInfo.setAlertData(alertData);
alertInfo.setAlertParams(paramsMap); alertInfo.setAlertParams(paramsMap);
int waitTimeout = alertConfig.getWaitTimeout();
AlertResult alertResult; AlertResult alertResult;
try { try {
if (waitTimeout <= 0) {
alertResult = alertChannel.get().process(alertInfo); alertResult = alertChannel.get().process(alertInfo);
} else {
CompletableFuture<AlertResult> future =
CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo));
alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
alertResult = new AlertResult("false", e.getMessage());
logger.error("send alert error alert data id :{},", alertData.getId(), e);
Thread.currentThread().interrupt();
} catch (Exception e) { } catch (Exception e) {
alertResult = new AlertResult("false", e.getMessage()); alertResult = new AlertResult("false", e.getMessage());
logger.error("send alert error alert data id :{},", alertData.getId(), e); logger.error("send alert error alert data id :{},", alertData.getId(), e);

3
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml

@ -54,6 +54,9 @@ management:
alert: alert:
port: 50052 port: 50052
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
metrics: metrics:
enabled: true enabled: true

6
dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert.runner;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.alert.AlertConfig;
import org.apache.dolphinscheduler.alert.AlertPluginManager; import org.apache.dolphinscheduler.alert.AlertPluginManager;
import org.apache.dolphinscheduler.alert.AlertSenderService; import org.apache.dolphinscheduler.alert.AlertSenderService;
import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertChannel;
@ -55,6 +56,8 @@ public class AlertSenderServiceTest {
private PluginDao pluginDao; private PluginDao pluginDao;
@Mock @Mock
private AlertPluginManager alertPluginManager; private AlertPluginManager alertPluginManager;
@Mock
private AlertConfig alertConfig;
@InjectMocks @InjectMocks
private AlertSenderService alertSenderService; private AlertSenderService alertSenderService;
@ -73,6 +76,7 @@ public class AlertSenderServiceTest {
//1.alert instance does not exist //1.alert instance does not exist
when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null); when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null);
when(alertConfig.getWaitTimeout()).thenReturn(0);
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
@ -102,6 +106,7 @@ public class AlertSenderServiceTest {
AlertChannel alertChannelMock = mock(AlertChannel.class); AlertChannel alertChannelMock = mock(AlertChannel.class);
when(alertChannelMock.process(Mockito.any())).thenReturn(null); when(alertChannelMock.process(Mockito.any())).thenReturn(null);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(0);
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertFalse(alertSendResponseCommand.getResStatus()); Assert.assertFalse(alertSendResponseCommand.getResStatus());
@ -126,6 +131,7 @@ public class AlertSenderServiceTest {
alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName)); alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName));
when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult);
when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock));
when(alertConfig.getWaitTimeout()).thenReturn(5000);
alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode());
Assert.assertTrue(alertSendResponseCommand.getResStatus()); Assert.assertTrue(alertSendResponseCommand.getResStatus());

3
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -154,6 +154,9 @@ worker:
alert: alert:
port: 50052 port: 50052
# Mark each alert of alert server if late after x milliseconds as failed.
# Define value is (0 = infinite), and alert server would be waiting alert result.
wait-timeout: 0
python-gateway: python-gateway:
# Weather enable python gateway server or not. The default value is true. # Weather enable python gateway server or not. The default value is true.

Loading…
Cancel
Save