Browse Source

[Feature-3749][Alert-SPI] SqlTask should send notifications by alert server api (#4080)

* add sqltask send sync alert server.

* update alert-sms license.

* update AlertServer test.

* remote EmailAlertPluginTest.

* update sqltask.

* update test class.
pull/3/MERGE
zhuangchong 4 years ago committed by GitHub
parent
commit
169a4ace77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java
  2. 2
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java
  3. 67
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java
  4. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  5. 19
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  6. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  7. 29
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  8. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  9. 65
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  10. 3
      dolphinscheduler-server/src/main/resources/worker.properties
  11. 11
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java
  12. 27
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java
  13. 111
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java
  14. 26
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java
  15. 64
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue
  16. 3
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  17. 3
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  18. 2
      install.sh
  19. 2
      pom.xml

4
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader;
import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig;
import org.apache.dolphinscheduler.alert.runner.AlertSender;
import org.apache.dolphinscheduler.alert.utils.Constants;
@ -71,6 +72,9 @@ public class AlertServerTest {
AlertSender alertSender = PowerMockito.mock(AlertSender.class);
PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender);
DolphinPluginLoader dolphinPluginLoader = PowerMockito.mock(DolphinPluginLoader.class);
PowerMockito.whenNew(DolphinPluginLoader.class).withAnyArguments().thenReturn(dolphinPluginLoader);
AlertServer alertServer = AlertServer.getInstance();
Assert.assertNotNull(alertServer);

2
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java

@ -49,6 +49,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@ -62,6 +63,7 @@ public class EmailAlertPluginTest {
PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class);
@Test
@Ignore
public void testRunSend() throws Exception {
//create alert group

67
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java

@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.sql;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
@ -75,19 +76,13 @@ public class SqlParameters extends AbstractParameters {
private List<String> postStatements;
/**
* title
*/
private String title;
/**
* receivers
* groupId
*/
private String receivers;
private int groupId;
/**
* receivers cc
* title
*/
private String receiversCc;
private String title;
public String getType() {
return type;
@ -153,21 +148,6 @@ public class SqlParameters extends AbstractParameters {
this.title = title;
}
public String getReceivers() {
return receivers;
}
public void setReceivers(String receivers) {
this.receivers = receivers;
}
public String getReceiversCc() {
return receiversCc;
}
public void setReceiversCc(String receiversCc) {
this.receiversCc = receiversCc;
}
public List<String> getPreStatements() {
return preStatements;
}
@ -184,6 +164,14 @@ public class SqlParameters extends AbstractParameters {
this.postStatements = postStatements;
}
public int getGroupId() {
return groupId;
}
public void setGroupId(int groupId) {
this.groupId = groupId;
}
@Override
public boolean checkParameters() {
return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql);
@ -196,19 +184,18 @@ public class SqlParameters extends AbstractParameters {
@Override
public String toString() {
return "SqlParameters{" +
"type='" + type + '\'' +
", datasource=" + datasource +
", sql='" + sql + '\'' +
", sqlType=" + sqlType +
", udfs='" + udfs + '\'' +
", showType='" + showType + '\'' +
", connParams='" + connParams + '\'' +
", title='" + title + '\'' +
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
'}';
return "SqlParameters{"
+ "type='" + type + '\''
+ ", datasource=" + datasource
+ ", sql='" + sql + '\''
+ ", sqlType=" + sqlType
+ ", udfs='" + udfs + '\''
+ ", showType='" + showType + '\''
+ ", connParams='" + connParams + '\''
+ ", groupId='" + groupId + '\''
+ ", title='" + title + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ '}';
}
}

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
@ -25,7 +26,11 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -33,8 +38,6 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import javax.annotation.PostConstruct;
/**
* worker server
*/
@ -70,6 +73,11 @@ public class WorkerServer {
@Autowired
private SpringApplicationContext springApplicationContext;
/**
* alert model netty remote server
*/
private AlertClientService alertClientService;
/**
* worker server startup
*
@ -86,7 +94,7 @@ public class WorkerServer {
* worker server run
*/
@PostConstruct
public void run(){
public void run() {
logger.info("start worker server...");
//init remoting server
@ -100,6 +108,9 @@ public class WorkerServer {
// worker registry
this.workerRegistry.registry();
//alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
/**
* register hooks, which are called before the process exits
*/
@ -115,7 +126,7 @@ public class WorkerServer {
try {
//execute only once
if(Stopper.isStopped()){
if (Stopper.isStopped()) {
return;
}
@ -127,13 +138,15 @@ public class WorkerServer {
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
} catch (Exception e) {
logger.warn("thread sleep exception", e);
}
this.nettyRemotingServer.close();
this.workerRegistry.unRegistry();
this.alertClientService.close();
} catch (Exception e) {
logger.error("worker server stop exception ", e);
System.exit(-1);

19
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -1,4 +1,3 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -15,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.config;
import org.apache.dolphinscheduler.common.Constants;
import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
@ -52,6 +53,9 @@ public class WorkerConfig {
@Value("${worker.weight:100}")
private int weight;
@Value("${alert.listen.host:localhost}")
private String alertListenHost;
public int getListenPort() {
return listenPort;
}
@ -101,7 +105,7 @@ public class WorkerConfig {
}
public int getWorkerMaxCpuloadAvg() {
if (workerMaxCpuloadAvg == -1){
if (workerMaxCpuloadAvg == -1) {
return Constants.DEFAULT_WORKER_CPU_LOAD;
}
return workerMaxCpuloadAvg;
@ -111,7 +115,6 @@ public class WorkerConfig {
this.workerMaxCpuloadAvg = workerMaxCpuloadAvg;
}
public int getWeight() {
return weight;
}
@ -119,4 +122,12 @@ public class WorkerConfig {
public void setWeight(int weight) {
this.weight = weight;
}
public String getAlertListenHost() {
return alertListenHost;
}
public void setAlertListenHost(String alertListenHost) {
this.alertListenHost = alertListenHost;
}
}

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@ -73,12 +74,24 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/
private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){
/**
* alert client service
*/
private AlertClientService alertClientService;
public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
public TaskExecuteProcessor(AlertClientService alertClientService) {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
this.alertClientService = alertClientService;
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
@ -89,7 +102,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("received command : {}", taskRequestCommand);
if(taskRequestCommand == null){
if (taskRequestCommand == null) {
logger.error("task execute request command is null");
return;
}
@ -97,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){
if (taskExecutionContext == null) {
logger.error("task execution context is null");
return;
}
@ -144,7 +157,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
return Boolean.TRUE;
});
// submit task
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger));
workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService));
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
@ -162,9 +175,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
ackCommand.setExecutePath(null);
}else{
} else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
}
taskExecutionContext.setLogPath(ackCommand.getLogPath());
@ -176,7 +189,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskExecutionContext taskExecutionContext){
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(),

29
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.Constants;
@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils;
@ -55,7 +57,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
/**
* task scheduler thread
*/
@ -91,6 +92,11 @@ public class TaskExecuteThread implements Runnable {
*/
private Logger taskLogger;
/**
* alert client server
*/
private AlertClientService alertClientService;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
@ -98,11 +104,12 @@ public class TaskExecuteThread implements Runnable {
*/
public TaskExecuteThread(TaskExecutionContext taskExecutionContext
, TaskCallbackService taskCallbackService
, Logger taskLogger) {
, Logger taskLogger, AlertClientService alertClientService) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
this.taskLogger = taskLogger;
this.alertClientService = alertClientService;
}
@Override
@ -140,7 +147,7 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
task = TaskManager.newTask(taskExecutionContext, taskLogger);
task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService);
// task init
task.init();
@ -201,10 +208,10 @@ public class TaskExecuteThread implements Runnable {
// the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()){
if (taskTimeoutParameter.getEnable()) {
// get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
switch (taskTimeoutParameter.getStrategy()){
switch (taskTimeoutParameter.getStrategy()) {
case WARN:
break;
case FAILED:
@ -225,21 +232,19 @@ public class TaskExecuteThread implements Runnable {
}
}
/**
* kill task
*/
public void kill(){
if (task != null){
public void kill() {
if (task != null) {
try {
task.cancelApplication(true);
}catch (Exception e){
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
/**
* download resource file
*
@ -250,7 +255,7 @@ public class TaskExecuteThread implements Runnable {
private void downloadResource(String execLocalPath,
Map<String,String> projectRes,
Logger logger) throws Exception {
if (MapUtils.isEmpty(projectRes)){
if (MapUtils.isEmpty(projectRes)) {
return;
}
@ -267,7 +272,7 @@ public class TaskExecuteThread implements Runnable {
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
}catch (Exception e){
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
}

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.slf4j.Logger;
/**
@ -44,7 +46,7 @@ public class TaskManager {
* @return AbstractTask
* @throws IllegalArgumentException illegal argument exception
*/
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException {
public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType());
if (anEnum == null) {
logger.error("not support task type: {}", taskExecutionContext.getTaskType());
@ -57,7 +59,7 @@ public class TaskManager {
case PROCEDURE:
return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
return new SqlTask(taskExecutionContext, logger, alertClientService);
case MR:
return new MapReduceTask(taskExecutionContext, logger);
case SPARK:

65
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task.sql;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF;
import static org.apache.dolphinscheduler.common.Constants.PASSWORD;
import static org.apache.dolphinscheduler.common.Constants.SEMICOLON;
@ -38,16 +37,15 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import java.sql.Connection;
import java.sql.DriverManager;
@ -80,10 +78,7 @@ public class SqlTask extends AbstractTask {
* sql parameters
*/
private SqlParameters sqlParameters;
/**
* alert dao
*/
private AlertDao alertDao;
/**
* base datasource
*/
@ -99,7 +94,10 @@ public class SqlTask extends AbstractTask {
*/
private static final int LIMIT = 10000;
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) {
private AlertClientService alertClientService;
public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
@ -111,7 +109,7 @@ public class SqlTask extends AbstractTask {
throw new RuntimeException("sql task params is not valid");
}
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.alertClientService = alertClientService;
}
@Override
@ -291,9 +289,7 @@ public class SqlTask extends AbstractTask {
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle())
? sqlParameters.getTitle() : taskExecutionContext.getTaskName()
+ " query result sets",
sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets",
JSONUtils.toJsonString(resultJSONArray));
}
@ -444,48 +440,11 @@ public class SqlTask extends AbstractTask {
* @param title title
* @param content content
*/
public void sendAttachment(String title, String content) {
List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list
List<String> receiversList = new ArrayList<>();
for (User user : users) {
receiversList.add(user.getEmail().trim());
}
// custom receiver
String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)) {
String[] splits = receivers.split(COMMA);
for (String receiver : splits) {
receiversList.add(receiver.trim());
}
}
// copy list
List<String> receiversCcList = new ArrayList<>();
// Custom Copier
String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)) {
String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits) {
receiversCcList.add(receiverCc.trim());
}
}
String showTypeName = sqlParameters.getShowType().replace(COMMA, "").trim();
/*
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
if(!(boolean) mailResult.get(STATUS)){
public void sendAttachment(int groupId, String title, String content) {
AlertSendResponseCommand alertSendResponseCommand = alertClientService.sendAlert(groupId, title, content);
if (!alertSendResponseCommand.getResStatus()) {
throw new RuntimeException("send mail failed!");
}
//TODO AlertServer should provide a grpc interface, which is called when other services need to send alerts
}else{
logger.error("showType: {} is not valid " ,showTypeName);
throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
}*/
}
/**

3
dolphinscheduler-server/src/main/resources/worker.properties

@ -35,3 +35,6 @@
# default worker weight
#work.weight=100
# alert server listener host
alert.listen.host=localhost

11
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@ -64,6 +65,8 @@ public class TaskExecuteThreadTest {
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private AlertClientService alertClientService;
@Before
public void before() {
// init task execution context, logger
@ -100,8 +103,10 @@ public class TaskExecuteThreadTest {
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
alertClientService = PowerMockito.mock(AlertClientService.class);
PowerMockito.mockStatic(TaskManager.class);
PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger))
PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService))
.thenReturn(new SimpleTask(taskExecutionContext, taskLogger));
PowerMockito.mockStatic(JSONUtils.class);
@ -117,7 +122,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setTaskType("SQL");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
@ -129,7 +134,7 @@ public class TaskExecuteThreadTest {
taskExecutionContext.setStartTime(null);
taskExecutionContext.setDelayTime(1);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger);
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService);
taskExecuteThread.run();
Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());

27
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
@ -46,6 +47,8 @@ public class TaskManagerTest {
private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private AlertClientService alertClientService;
@Before
public void before() {
// init task execution context, logger
@ -74,41 +77,43 @@ public class TaskManagerTest {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class))
.thenReturn(taskExecutionContextCacheManager);
alertClientService = PowerMockito.mock(AlertClientService.class);
}
@Test
public void testNewTask() {
taskExecutionContext.setTaskType("SHELL");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("WATERDROP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("HTTP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("MR");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SPARK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("FLINK");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("PYTHON");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("DATAX");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
taskExecutionContext.setTaskType("SQOOP");
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger));
Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService));
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNull() {
taskExecutionContext.setTaskType(null);
TaskManager.newTask(taskExecutionContext,taskLogger);
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
@Test(expected = IllegalArgumentException.class)
public void testNewTaskIsNotExists() {
taskExecutionContext.setTaskType("XXX");
TaskManager.newTask(taskExecutionContext,taskLogger);
TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService);
}
}

111
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sql;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* sql task test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(value = {SqlTask.class, DriverManager.class})
public class SqlTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class);
private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\","
+ "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}";
private SqlTask sqlTask;
private TaskExecutionContext taskExecutionContext;
private AlertClientService alertClientService;
@Before
public void before() throws Exception {
taskExecutionContext = new TaskExecutionContext();
TaskProps props = new TaskProps();
props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstanceId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}");
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams());
PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp");
PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1");
PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root");
PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date());
PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000);
PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx");
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS);
PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class);
sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService);
sqlTask.init();
}
@Test
public void testGetParameters() {
Assert.assertNotNull(sqlTask.getParameters());
}
@Test(expected = Exception.class)
public void testHandle() throws Exception {
Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.mockStatic(DriverManager.class);
PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection);
PreparedStatement preparedStatement = PowerMockito.mock(PreparedStatement.class);
PowerMockito.when(connection.prepareStatement(Mockito.any())).thenReturn(preparedStatement);
PowerMockito.mockStatic(ParameterUtils.class);
PowerMockito.when(ParameterUtils.replaceScheduleTime(Mockito.any(), Mockito.any())).thenReturn("insert into tb_1 values('1','2')");
sqlTask.handle();
Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,sqlTask.getExitStatusCode());
}
}

26
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java

@ -38,6 +38,10 @@ public class AlertClientService {
private volatile boolean isRunning;
private String host;
private int port;
/**
* request time out
*/
@ -52,6 +56,17 @@ public class AlertClientService {
this.isRunning = true;
}
/**
* alert client
*/
public AlertClientService(String host, int port) {
this.clientConfig = new NettyClientConfig();
this.client = new NettyRemotingClient(clientConfig);
this.isRunning = true;
this.host = host;
this.port = port;
}
/**
* close
*/
@ -61,6 +76,17 @@ public class AlertClientService {
logger.info("alter client closed");
}
/**
* alert sync send data
* @param groupId
* @param title
* @param content
* @return
*/
public AlertSendResponseCommand sendAlert(int groupId, String title, String content) {
return this.sendAlert(this.host,this.port,groupId,title,content);
}
/**
* alert sync send data
* @param host host

64
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue

@ -35,12 +35,14 @@
:sql-type="sqlType">
</m-sql-type>
</div>
<!--
<div v-if="sqlType==0" style="display: inline-block;padding-left: 10px;margin-top: 2px;">
<x-checkbox-group v-model="showType">
<x-checkbox :label="'TABLE'" :disabled="isDetails">{{$t('TableMode')}}</x-checkbox>
<x-checkbox :label="'ATTACHMENT'" :disabled="isDetails">{{$t('Attachment')}}</x-checkbox>
</x-checkbox-group>
</div>
-->
</div>
</m-list-box>
<template v-if="sqlType==0">
@ -50,21 +52,21 @@
<x-input
type="input"
v-model="title"
:placeholder="$t('Please enter the title of email')"
:placeholder="$t('Please enter the title of alert')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text"><strong class='requiredIcon'>*</strong>{{$t('Recipient')}}</div>
<!-- TODO Wait for the alarm group/instance page to be developed and add specific content -->
<div slot="text"><strong class='requiredIcon'>*</strong>{{$t('AlertGroup')}}</div>
<div slot="content">
<m-email ref="refEmail" v-model="receivers" :disabled="isDetails" :repeat-data="receiversCc"></m-email>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('Cc')}}</div>
<div slot="content">
<m-email ref="refCc" v-model="receiversCc" :disabled="isDetails" :repeat-data="receivers"></m-email>
<x-input
type="input"
v-model="groupId"
:placeholder="$t('Please select the alert group')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
</template>
@ -174,6 +176,8 @@
sqlType: '0',
// Email title
title: '',
// Alert groupId
groupId: '',
// Form/attachment
showType: ['TABLE'],
// Sql parameter
@ -181,11 +185,7 @@
// Pre statements
preStatements: [],
// Post statements
postStatements: [],
// recipients
receivers: [],
// copy to
receiversCc: []
postStatements: []
}
},
mixins: [disabledState],
@ -275,24 +275,18 @@
if (!this.$refs.refDs._verifDatasource()) {
return false
}
/*
if (this.sqlType==0 && !this.showType.length) {
this.$message.warning(`${i18n.$t('One form or attachment must be selected')}`)
return false
}
*/
if (this.sqlType==0 && !this.title) {
this.$message.warning(`${i18n.$t('Mail subject required')}`)
return false
}
if (this.sqlType==0 && !this.receivers.length) {
this.$message.warning(`${i18n.$t('Recipient required')}`)
this.$message.warning(`${i18n.$t('Please enter the title of alert')}`)
return false
}
// receivers Subcomponent verification
if (this.sqlType==0 && !this.$refs.refEmail._manualEmail()) {
return false
}
// receiversCc Subcomponent verification
if (this.sqlType==0 && !this.$refs.refCc._manualEmail()) {
if (this.sqlType==0 && !this.groupId) {
this.$message.warning(`${i18n.$t('Please select the alert group')}`)
return false
}
// udfs Subcomponent verification Verification only if the data type is HIVE
@ -325,8 +319,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
receivers: this.receivers.join(','),
receiversCc: this.receiversCc.join(','),
groupId: this.groupId,
showType: (() => {
/**
* Special processing return order TABLE,ATTACHMENT
@ -387,10 +380,7 @@
} else {
param.processInstanceId = current.params.id
}
this.store.dispatch('dag/getReceiver', param).then(res => {
this.receivers = res.receivers && res.receivers.split(',') || []
this.receiversCc = res.receiversCc && res.receiversCc.split(',') || []
})
},
_cacheParams () {
this.$emit('on-cache-params', {
@ -400,8 +390,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
receivers: this.receivers.join(','),
receiversCc: this.receiversCc.join(','),
groupId: this.groupId,
showType: (() => {
let showType = this.showType
@ -433,8 +422,7 @@
}
if (val != 0) {
this.title = ''
this.receivers = []
this.receiversCc = []
this.groupId = ''
}
},
// Listening data source
@ -469,8 +457,7 @@
this.preStatements = o.params.preStatements || []
this.postStatements = o.params.postStatements || []
this.title = o.params.title || ''
this.receivers = o.params.receivers && o.params.receivers.split(',') || []
this.receiversCc = o.params.receiversCc && o.params.receiversCc.split(',') || []
this.groupId = o.params.groupId || ''
}
// read tasks from cache
if (!_.some(this.store.state.dag.cacheTasks, { id: this.createNodeId }) &&
@ -501,8 +488,7 @@
udfs: this.udfs,
sqlType: this.sqlType,
title: this.title,
receivers: this.receivers.join(','),
receiversCc: this.receiversCc.join(','),
groupId: this.groupId,
showType: (() => {
let showType = this.showType

3
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -122,6 +122,9 @@ export default {
'SQL Type': 'SQL Type',
Title: 'Title',
'Please enter the title of email': 'Please enter the title of email',
'Please enter the title of alert': 'Please enter the title of alert',
'AlertGroup': 'AlertGroup',
'Please select the alert group': 'Please select the alert group',
Table: 'Table',
TableMode: 'Table',
Attachment: 'Attachment',

3
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -122,6 +122,9 @@ export default {
'SQL Type': 'sql类型',
Title: '主题',
'Please enter the title of email': '请输入邮件主题',
'Please enter the title of alert': '请输入告警主题',
'AlertGroup': '告警组',
'Please select the alert group': '请选择告警组',
Table: '表名',
TableMode: '表格',
Attachment: '附件',

2
install.sh

@ -64,6 +64,8 @@ sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttl
sed -i ${txt} "s#mail.smtp.ssl.trust.*#mail.smtp.ssl.trust=${sslTrust}#g" conf/alert.properties
sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
sed -i ${txt} "s#alert.listen.host.*#alert.listen.host=${alertServer}#g" conf/worker.properties
# 2.create directory
echo "2.create directory"

2
pom.xml

@ -904,6 +904,7 @@
<include>**/server/worker/registry/WorkerRegistryTest.java</include>
<include>**/server/worker/shell/ShellCommandExecutorTest.java</include>
<include>**/server/worker/sql/SqlExecutorTest.java</include>
<include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
@ -920,7 +921,6 @@
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->
<include>**/dao/mapper/ProcessDefinitionMapperTest.java</include>

Loading…
Cancel
Save