diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java index f7c9daaa7f..2e2eff448f 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertConfig.java @@ -25,6 +25,8 @@ import org.springframework.stereotype.Component; public final class AlertConfig { private int port; + private int waitTimeout; + public int getPort() { return port; } @@ -32,4 +34,13 @@ public final class AlertConfig { public void setPort(final int port) { this.port = port; } + + public int getWaitTimeout() { + return waitTimeout; + } + + public void setWaitTimeout(final int waitTimeout) { + this.waitTimeout = waitTimeout; + } + } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java index 455b797760..ad92862ba9 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java @@ -40,6 +40,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,10 +53,12 @@ public final class AlertSenderService extends Thread { private final AlertDao alertDao; private final AlertPluginManager alertPluginManager; + private final AlertConfig alertConfig; - public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) { + public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager, AlertConfig alertConfig) { this.alertDao = alertDao; this.alertPluginManager = alertPluginManager; + this.alertConfig = alertConfig; } @Override @@ -221,9 +225,20 @@ public final class AlertSenderService extends Thread { AlertInfo alertInfo = new AlertInfo(); alertInfo.setAlertData(alertData); alertInfo.setAlertParams(paramsMap); + int waitTimeout = alertConfig.getWaitTimeout(); AlertResult alertResult; try { - alertResult = alertChannel.get().process(alertInfo); + if (waitTimeout <= 0) { + alertResult = alertChannel.get().process(alertInfo); + } else { + CompletableFuture future = + CompletableFuture.supplyAsync(() -> alertChannel.get().process(alertInfo)); + alertResult = future.get(waitTimeout, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + alertResult = new AlertResult("false", e.getMessage()); + logger.error("send alert error alert data id :{},", alertData.getId(), e); + Thread.currentThread().interrupt(); } catch (Exception e) { alertResult = new AlertResult("false", e.getMessage()); logger.error("send alert error alert data id :{},", alertData.getId(), e); diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml index 11625901e4..e707f96041 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/resources/application.yaml @@ -54,6 +54,9 @@ management: alert: port: 50052 + # Mark each alert of alert server if late after x milliseconds as failed. + # Define value is (0 = infinite), and alert server would be waiting alert result. + wait-timeout: 0 metrics: enabled: true diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java index 7b7853daf5..cdc2f83443 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.alert.runner; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import org.apache.dolphinscheduler.alert.AlertConfig; import org.apache.dolphinscheduler.alert.AlertPluginManager; import org.apache.dolphinscheduler.alert.AlertSenderService; import org.apache.dolphinscheduler.alert.api.AlertChannel; @@ -55,6 +56,8 @@ public class AlertSenderServiceTest { private PluginDao pluginDao; @Mock private AlertPluginManager alertPluginManager; + @Mock + private AlertConfig alertConfig; @InjectMocks private AlertSenderService alertSenderService; @@ -73,6 +76,7 @@ public class AlertSenderServiceTest { //1.alert instance does not exist when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null); + when(alertConfig.getWaitTimeout()).thenReturn(0); AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); @@ -102,6 +106,7 @@ public class AlertSenderServiceTest { AlertChannel alertChannelMock = mock(AlertChannel.class); when(alertChannelMock.process(Mockito.any())).thenReturn(null); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); + when(alertConfig.getWaitTimeout()).thenReturn(0); alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); @@ -126,6 +131,7 @@ public class AlertSenderServiceTest { alertResult.setMessage(String.format("Alert Plugin %s send success", pluginInstanceName)); when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); + when(alertConfig.getWaitTimeout()).thenReturn(5000); alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertTrue(alertSendResponseCommand.getResStatus()); diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 4640ce71ba..c50883da62 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -154,6 +154,9 @@ worker: alert: port: 50052 + # Mark each alert of alert server if late after x milliseconds as failed. + # Define value is (0 = infinite), and alert server would be waiting alert result. + wait-timeout: 0 python-gateway: # Weather enable python gateway server or not. The default value is true.