Browse Source

Add alert server into standalone-server as well and some minor polish (#6087)

2.0.7-release
kezhenxu94 3 years ago committed by GitHub
parent
commit
dc85e1a73c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      .asf.yaml
  2. 2
      dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java
  3. 85
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  4. 2
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java
  5. 7
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java
  6. 5
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  7. 47
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java
  8. 60
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/FuncUtilsTest.java
  9. 4
      dolphinscheduler-standalone-server/pom.xml
  10. 67
      dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java
  11. 1153
      sql/dolphinscheduler_h2.sql

5
.asf.yaml

@ -16,10 +16,7 @@
#
github:
description: >
Apache DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG
visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing
various types of jobs available out of box.
description: Apache DolphinScheduler is a distributed and extensible workflow scheduler platform with powerful DAG visual interfaces, dedicated to solving complex job dependencies in the data pipeline and providing various types of jobs available out of box.
homepage: https://dolphinscheduler.apache.org/
labels:
- airflow

2
dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/src/main/java/org/apache/dolphinscheduler/plugin/alert/email/MailSender.java

@ -80,7 +80,7 @@ public class MailSender {
private String sslTrust;
private String showType;
private AlertTemplate alertTemplate;
private String mustNotNull = "must not be null";
private String mustNotNull = " must not be null";
public MailSender(Map<String, String> config) {

85
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -24,8 +24,6 @@ import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.processor.AlertRequestProcessor;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
@ -35,6 +33,8 @@ import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.spi.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.List;
@ -44,45 +44,29 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
/**
* alert of start
*/
public class AlertServer {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);
/**
* Plugin Dao
*/
private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
private final PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
/**
* Alert Dao
*/
private AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
private AlertSender alertSender;
private final AlertDao alertDao = DaoFactory.getDaoInstance(AlertDao.class);
private AlertPluginManager alertPluginManager;
private DolphinPluginManagerConfig alertPluginManagerConfig;
public static final String ALERT_PLUGIN_BINDING = "alert.plugin.binding";
public static final String ALERT_PLUGIN_DIR = "alert.plugin.dir";
public static final String MAVEN_LOCAL_REPOSITORY = "maven.local.repository";
/**
* netty server
*/
private NettyRemotingServer server;
private static class AlertServerHolder {
private static final AlertServer INSTANCE = new AlertServer();
}
public static final AlertServer getInstance() {
public static AlertServer getInstance() {
return AlertServerHolder.INSTANCE;
}
@ -98,8 +82,7 @@ public class AlertServer {
}
private void initPlugin() {
alertPluginManager = new AlertPluginManager();
alertPluginManagerConfig = new DolphinPluginManagerConfig();
DolphinPluginManagerConfig alertPluginManagerConfig = new DolphinPluginManagerConfig();
alertPluginManagerConfig.setPlugins(PropertyUtils.getString(ALERT_PLUGIN_BINDING));
if (StringUtils.isNotBlank(PropertyUtils.getString(ALERT_PLUGIN_DIR))) {
alertPluginManagerConfig.setInstalledPluginsDir(PropertyUtils.getString(ALERT_PLUGIN_DIR, Constants.ALERT_PLUGIN_PATH).trim());
@ -109,6 +92,7 @@ public class AlertServer {
alertPluginManagerConfig.setMavenLocalRepository(PropertyUtils.getString(MAVEN_LOCAL_REPOSITORY).trim());
}
alertPluginManager = new AlertPluginManager();
DolphinPluginLoader alertPluginLoader = new DolphinPluginLoader(alertPluginManagerConfig, ImmutableList.of(alertPluginManager));
try {
alertPluginLoader.loadPlugins();
@ -117,9 +101,6 @@ public class AlertServer {
}
}
/**
* init netty remoting server
*/
private void initRemoteServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(ALERT_RPC_PORT);
@ -128,30 +109,10 @@ public class AlertServer {
this.server.start();
}
/**
* Cyclic alert info sending alert
*/
private void runSender() {
while (Stopper.isRunning()) {
try {
Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
if (alertPluginManager == null || alertPluginManager.getAlertChannelMap().size() == 0) {
logger.warn("No Alert Plugin . Cannot send alert info. ");
} else {
List<Alert> alerts = alertDao.listWaitExecutionAlert();
alertSender = new AlertSender(alerts, alertDao, alertPluginManager);
alertSender.run();
}
}
new Thread(new Sender()).start();
}
/**
* start
*/
public void start() {
PropertyUtils.loadPropertyFile(ALERT_PROPERTIES_PATH);
checkTable();
@ -161,23 +122,35 @@ public class AlertServer {
runSender();
}
/**
* stop
*/
public void stop() {
this.server.close();
logger.info("alert server shut down");
}
final class Sender implements Runnable {
@Override
public void run() {
while (Stopper.isRunning()) {
try {
Thread.sleep(Constants.ALERT_SCAN_INTERVAL);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
if (alertPluginManager == null || alertPluginManager.getAlertChannelMap().size() == 0) {
logger.warn("No Alert Plugin . Cannot send alert info. ");
} else {
List<Alert> alerts = alertDao.listWaitExecutionAlert();
new AlertSender(alerts, alertDao, alertPluginManager).run();
}
}
}
}
public static void main(String[] args) {
AlertServer alertServer = AlertServer.getInstance();
alertServer.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
alertServer.stop();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(alertServer::stop));
}
}

2
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/AlertPluginManager.java

@ -56,7 +56,7 @@ public class AlertPluginManager extends AbstractDolphinPluginManager {
*/
private final Map<Integer, String> pluginDefineMap = new HashMap<>();
private PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
private final PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
private void addAlertChannelFactory(AlertChannelFactory alertChannelFactory) {
requireNonNull(alertChannelFactory, "alertChannelFactory is null");

7
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessor.java

@ -33,14 +33,11 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/**
* alert request processor
*/
public class AlertRequestProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);
private AlertDao alertDao;
private AlertPluginManager alertPluginManager;
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
public AlertRequestProcessor(AlertDao alertDao, AlertPluginManager alertPluginManager) {
this.alertDao = alertDao;

5
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -38,16 +38,13 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* alert sender
*/
public class AlertSender {
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
private List<Alert> alertList;
private AlertDao alertDao;
private AlertPluginManager alertPluginManager;
private final AlertPluginManager alertPluginManager;
public AlertSender(AlertPluginManager alertPluginManager) {
this.alertPluginManager = alertPluginManager;

47
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/FuncUtils.java

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
public class FuncUtils {
private FuncUtils() {
throw new IllegalStateException(FuncUtils.class.getName());
}
public static String mkString(Iterable<String> list, String split) {
if (null == list || StringUtils.isEmpty(split)) {
return null;
}
StringBuilder sb = new StringBuilder();
boolean first = true;
for (String item : list) {
if (first) {
first = false;
} else {
sb.append(split);
}
sb.append(item);
}
return sb.toString();
}
}

60
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/FuncUtilsTest.java

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.alert.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.Arrays;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FuncUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(FuncUtilsTest.class);
/**
* Test mkString
*/
@Test
public void testMKString() {
//Define users list
Iterable<String> users = Arrays.asList("user1", "user2", "user3");
//Define split
String split = "|";
//Invoke mkString with correctParams
String result = FuncUtils.mkString(users, split);
logger.info(result);
//Expected result string
assertEquals("user1|user2|user3", result);
//Null list expected return null
result = FuncUtils.mkString(null, split);
assertNull(result);
//Null split expected return null
result = FuncUtils.mkString(users, null);
assertNull(result);
}
}

4
dolphinscheduler-standalone-server/pom.xml

@ -47,6 +47,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-alert</artifactId>
</dependency>
</dependencies>
</project>

67
dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java

@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_PAS
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_URL;
import static org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_USERNAME;
import org.apache.dolphinscheduler.alert.AlertServer;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
@ -31,9 +32,11 @@ import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.curator.test.TestingServer;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.SQLException;
import javax.sql.DataSource;
@ -48,8 +51,50 @@ public class StandaloneServer {
private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneServer.class);
public static void main(String[] args) throws Exception {
Thread.currentThread().setName("Standalone-Server");
System.setProperty("spring.profiles.active", "api");
startDatabase();
startRegistry();
startAlertServer();
new SpringApplicationBuilder(
ApiApplicationServer.class,
MasterServer.class,
WorkerServer.class
).run(args);
}
private static void startAlertServer() {
final Path alertPluginPath = Paths.get(
StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"../../../dolphinscheduler-alert-plugin/dolphinscheduler-alert-email/pom.xml"
).toAbsolutePath();
if (Files.exists(alertPluginPath)) {
System.setProperty("alert.plugin.binding", alertPluginPath.toString());
System.setProperty("alert.plugin.dir", "");
}
AlertServer.getInstance().start();
}
private static void startRegistry() throws Exception {
final TestingServer server = new TestingServer(true);
System.setProperty("registry.servers", server.getConnectString());
final Path registryPath = Paths.get(
StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"
).toAbsolutePath();
if (Files.exists(registryPath)) {
System.setProperty("registry.plugin.binding", registryPath.toString());
System.setProperty("registry.plugin.dir", "");
}
}
private static void startDatabase() throws IOException, SQLException {
final Path temp = Files.createTempDirectory("dolphinscheduler_");
LOGGER.info("H2 database directory: {}", temp);
System.setProperty(
@ -58,7 +103,7 @@ public class StandaloneServer {
);
System.setProperty(
SPRING_DATASOURCE_URL,
String.format("jdbc:h2:tcp://localhost/%s", temp.toAbsolutePath())
String.format("jdbc:h2:tcp://localhost/%s;MODE=MySQL;DATABASE_TO_LOWER=true", temp.toAbsolutePath())
);
System.setProperty(SPRING_DATASOURCE_USERNAME, "sa");
System.setProperty(SPRING_DATASOURCE_PASSWORD, "");
@ -68,25 +113,5 @@ public class StandaloneServer {
final DataSource ds = ConnectionFactory.getInstance().getDataSource();
final ScriptRunner runner = new ScriptRunner(ds.getConnection(), true, true);
runner.runScript(new FileReader("sql/dolphinscheduler_h2.sql"));
final TestingServer server = new TestingServer(true);
System.setProperty("registry.servers", server.getConnectString());
final Path registryPath = Paths.get(
StandaloneServer.class.getProtectionDomain().getCodeSource().getLocation().getPath(),
"../../../dolphinscheduler-registry-plugin/dolphinscheduler-registry-zookeeper/pom.xml"
).toAbsolutePath();
if (Files.exists(registryPath)) {
System.setProperty("registry.plugin.binding", registryPath.toString());
System.setProperty("registry.plugin.dir", "");
}
Thread.currentThread().setName("Standalone-Server");
new SpringApplicationBuilder(
ApiApplicationServer.class,
MasterServer.class,
WorkerServer.class
).run(args);
}
}

1153
sql/dolphinscheduler_h2.sql

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save