diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java index 37d54c8ef9..a8ead79be9 100644 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.alert; import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager; +import org.apache.dolphinscheduler.alert.plugin.DolphinPluginLoader; import org.apache.dolphinscheduler.alert.plugin.DolphinPluginManagerConfig; import org.apache.dolphinscheduler.alert.runner.AlertSender; import org.apache.dolphinscheduler.alert.utils.Constants; @@ -71,6 +72,9 @@ public class AlertServerTest { AlertSender alertSender = PowerMockito.mock(AlertSender.class); PowerMockito.whenNew(AlertSender.class).withAnyArguments().thenReturn(alertSender); + DolphinPluginLoader dolphinPluginLoader = PowerMockito.mock(DolphinPluginLoader.class); + PowerMockito.whenNew(DolphinPluginLoader.class).withAnyArguments().thenReturn(dolphinPluginLoader); + AlertServer alertServer = AlertServer.getInstance(); Assert.assertNotNull(alertServer); diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java index 6558419160..a6bd51c7f6 100644 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPluginTest.java @@ -49,6 +49,7 @@ import java.util.LinkedHashMap; import java.util.List; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.ImmutableList; @@ -62,6 +63,7 @@ public class EmailAlertPluginTest { PluginDao pluginDao = DaoFactory.getDaoInstance(PluginDao.class); @Test + @Ignore public void testRunSend() throws Exception { //create alert group diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java index 4604234e8f..3b5d39f274 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sql/SqlParameters.java @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.task.sql; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import java.util.ArrayList; import java.util.List; @@ -75,19 +76,13 @@ public class SqlParameters extends AbstractParameters { private List postStatements; /** - * title - */ - private String title; - - /** - * receivers + * groupId */ - private String receivers; - + private int groupId; /** - * receivers cc + * title */ - private String receiversCc; + private String title; public String getType() { return type; @@ -153,21 +148,6 @@ public class SqlParameters extends AbstractParameters { this.title = title; } - public String getReceivers() { - return receivers; - } - - public void setReceivers(String receivers) { - this.receivers = receivers; - } - - public String getReceiversCc() { - return receiversCc; - } - - public void setReceiversCc(String receiversCc) { - this.receiversCc = receiversCc; - } public List getPreStatements() { return preStatements; } @@ -184,6 +164,14 @@ public class SqlParameters extends AbstractParameters { this.postStatements = postStatements; } + public int getGroupId() { + return groupId; + } + + public void setGroupId(int groupId) { + this.groupId = groupId; + } + @Override public boolean checkParameters() { return datasource != 0 && StringUtils.isNotEmpty(type) && StringUtils.isNotEmpty(sql); @@ -196,19 +184,18 @@ public class SqlParameters extends AbstractParameters { @Override public String toString() { - return "SqlParameters{" + - "type='" + type + '\'' + - ", datasource=" + datasource + - ", sql='" + sql + '\'' + - ", sqlType=" + sqlType + - ", udfs='" + udfs + '\'' + - ", showType='" + showType + '\'' + - ", connParams='" + connParams + '\'' + - ", title='" + title + '\'' + - ", receivers='" + receivers + '\'' + - ", receiversCc='" + receiversCc + '\'' + - ", preStatements=" + preStatements + - ", postStatements=" + postStatements + - '}'; + return "SqlParameters{" + + "type='" + type + '\'' + + ", datasource=" + datasource + + ", sql='" + sql + '\'' + + ", sqlType=" + sqlType + + ", udfs='" + udfs + '\'' + + ", showType='" + showType + '\'' + + ", connParams='" + connParams + '\'' + + ", groupId='" + groupId + '\'' + + ", title='" + title + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index f0833cb7e0..072e76aaeb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; @@ -25,7 +26,11 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -33,8 +38,6 @@ import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; -import javax.annotation.PostConstruct; - /** * worker server */ @@ -70,6 +73,11 @@ public class WorkerServer { @Autowired private SpringApplicationContext springApplicationContext; + /** + * alert model netty remote server + */ + private AlertClientService alertClientService; + /** * worker server startup * @@ -86,7 +94,7 @@ public class WorkerServer { * worker server run */ @PostConstruct - public void run(){ + public void run() { logger.info("start worker server..."); //init remoting server @@ -100,6 +108,9 @@ public class WorkerServer { // worker registry this.workerRegistry.registry(); + //alert-server client registry + alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT); + /** * register hooks, which are called before the process exits */ @@ -115,7 +126,7 @@ public class WorkerServer { try { //execute only once - if(Stopper.isStopped()){ + if (Stopper.isStopped()) { return; } @@ -127,13 +138,15 @@ public class WorkerServer { try { //thread sleep 3 seconds for thread quitely stop Thread.sleep(3000L); - }catch (Exception e){ + } catch (Exception e) { logger.warn("thread sleep exception", e); } this.nettyRemotingServer.close(); this.workerRegistry.unRegistry(); + this.alertClientService.close(); + } catch (Exception e) { logger.error("worker server stop exception ", e); System.exit(-1); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index fa97403527..a32d4c8ff3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,11 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.config; +import org.apache.dolphinscheduler.common.Constants; + import java.util.Set; -import org.apache.dolphinscheduler.common.Constants; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; @@ -52,6 +53,9 @@ public class WorkerConfig { @Value("${worker.weight:100}") private int weight; + @Value("${alert.listen.host:localhost}") + private String alertListenHost; + public int getListenPort() { return listenPort; } @@ -101,7 +105,7 @@ public class WorkerConfig { } public int getWorkerMaxCpuloadAvg() { - if (workerMaxCpuloadAvg == -1){ + if (workerMaxCpuloadAvg == -1) { return Constants.DEFAULT_WORKER_CPU_LOAD; } return workerMaxCpuloadAvg; @@ -111,7 +115,6 @@ public class WorkerConfig { this.workerMaxCpuloadAvg = workerMaxCpuloadAvg; } - public int getWeight() { return weight; } @@ -119,4 +122,12 @@ public class WorkerConfig { public void setWeight(int weight) { this.weight = weight; } + + public String getAlertListenHost() { + return alertListenHost; + } + + public void setAlertListenHost(String alertListenHost) { + this.alertListenHost = alertListenHost; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 3717ce37ae..adef703e1d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; @@ -73,12 +74,24 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { */ private final TaskCallbackService taskCallbackService; - public TaskExecuteProcessor(){ + /** + * alert client service + */ + private AlertClientService alertClientService; + + public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); } + public TaskExecuteProcessor(AlertClientService alertClientService) { + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); + this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); + this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); + this.alertClientService = alertClientService; + } + @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), @@ -89,7 +102,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { logger.info("received command : {}", taskRequestCommand); - if(taskRequestCommand == null){ + if (taskRequestCommand == null) { logger.error("task execute request command is null"); return; } @@ -97,7 +110,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); - if(taskExecutionContext == null){ + if (taskExecutionContext == null) { logger.error("task execution context is null"); return; } @@ -144,7 +157,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { return Boolean.TRUE; }); // submit task - workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger)); + workerExecService.submit(new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService)); } catch (ExecutionException | RetryException e) { logger.error(e.getMessage(), e); } @@ -162,9 +175,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(taskExecutionContext.getStartTime()); - if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ + if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) { ackCommand.setExecutePath(null); - }else{ + } else { ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } taskExecutionContext.setLogPath(ackCommand.getLogPath()); @@ -176,7 +189,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { * @param taskExecutionContext taskExecutionContext * @return execute local path */ - private String getExecLocalPath(TaskExecutionContext taskExecutionContext){ + private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 58f743303c..39046e96eb 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.common.Constants; @@ -37,6 +38,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.commons.collections.MapUtils; @@ -55,7 +57,6 @@ import org.slf4j.LoggerFactory; import com.github.rholder.retry.RetryException; - /** * task scheduler thread */ @@ -91,6 +92,11 @@ public class TaskExecuteThread implements Runnable { */ private Logger taskLogger; + /** + * alert client server + */ + private AlertClientService alertClientService; + /** * constructor * @param taskExecutionContext taskExecutionContext @@ -98,11 +104,12 @@ public class TaskExecuteThread implements Runnable { */ public TaskExecuteThread(TaskExecutionContext taskExecutionContext , TaskCallbackService taskCallbackService - , Logger taskLogger) { + , Logger taskLogger, AlertClientService alertClientService) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); this.taskLogger = taskLogger; + this.alertClientService = alertClientService; } @Override @@ -140,7 +147,7 @@ public class TaskExecuteThread implements Runnable { taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); - task = TaskManager.newTask(taskExecutionContext, taskLogger); + task = TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService); // task init task.init(); @@ -201,10 +208,10 @@ public class TaskExecuteThread implements Runnable { // the default timeout is the maximum value of the integer taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE); TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); - if (taskTimeoutParameter.getEnable()){ + if (taskTimeoutParameter.getEnable()) { // get timeout strategy taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode()); - switch (taskTimeoutParameter.getStrategy()){ + switch (taskTimeoutParameter.getStrategy()) { case WARN: break; case FAILED: @@ -225,21 +232,19 @@ public class TaskExecuteThread implements Runnable { } } - /** * kill task */ - public void kill(){ - if (task != null){ + public void kill() { + if (task != null) { try { task.cancelApplication(true); - }catch (Exception e){ + } catch (Exception e) { logger.error(e.getMessage(),e); } } } - /** * download resource file * @@ -250,7 +255,7 @@ public class TaskExecuteThread implements Runnable { private void downloadResource(String execLocalPath, Map projectRes, Logger logger) throws Exception { - if (MapUtils.isEmpty(projectRes)){ + if (MapUtils.isEmpty(projectRes)) { return; } @@ -267,7 +272,7 @@ public class TaskExecuteThread implements Runnable { logger.info("get resource file from hdfs :{}", resHdfsPath); HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true); - }catch (Exception e){ + } catch (Exception e) { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index a9463336b4..b89c8d4ca6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask; import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask; import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask; +import org.apache.dolphinscheduler.service.alert.AlertClientService; + import org.slf4j.Logger; /** @@ -44,7 +46,7 @@ public class TaskManager { * @return AbstractTask * @throws IllegalArgumentException illegal argument exception */ - public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger) throws IllegalArgumentException { + public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException { TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType()); if (anEnum == null) { logger.error("not support task type: {}", taskExecutionContext.getTaskType()); @@ -57,7 +59,7 @@ public class TaskManager { case PROCEDURE: return new ProcedureTask(taskExecutionContext, logger); case SQL: - return new SqlTask(taskExecutionContext, logger); + return new SqlTask(taskExecutionContext, logger, alertClientService); case MR: return new MapReduceTask(taskExecutionContext, logger); case SPARK: diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index cba7a488c1..8686e9a660 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql; -import static org.apache.dolphinscheduler.common.Constants.COMMA; import static org.apache.dolphinscheduler.common.Constants.HIVE_CONF; import static org.apache.dolphinscheduler.common.Constants.PASSWORD; import static org.apache.dolphinscheduler.common.Constants.SEMICOLON; @@ -38,16 +37,15 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.UDFUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import java.sql.Connection; import java.sql.DriverManager; @@ -80,10 +78,7 @@ public class SqlTask extends AbstractTask { * sql parameters */ private SqlParameters sqlParameters; - /** - * alert dao - */ - private AlertDao alertDao; + /** * base datasource */ @@ -99,7 +94,10 @@ public class SqlTask extends AbstractTask { */ private static final int LIMIT = 10000; - public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger) { + + private AlertClientService alertClientService; + + public SqlTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) { super(taskExecutionContext, logger); this.taskExecutionContext = taskExecutionContext; @@ -111,7 +109,7 @@ public class SqlTask extends AbstractTask { throw new RuntimeException("sql task params is not valid"); } - this.alertDao = SpringApplicationContext.getBean(AlertDao.class); + this.alertClientService = alertClientService; } @Override @@ -291,9 +289,7 @@ public class SqlTask extends AbstractTask { String result = JSONUtils.toJsonString(resultJSONArray); logger.debug("execute sql : {}", result); - sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) - ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() - + " query result sets", + sendAttachment(sqlParameters.getGroupId(), StringUtils.isNotEmpty(sqlParameters.getTitle()) ? sqlParameters.getTitle() : taskExecutionContext.getTaskName() + " query result sets", JSONUtils.toJsonString(resultJSONArray)); } @@ -444,48 +440,11 @@ public class SqlTask extends AbstractTask { * @param title title * @param content content */ - public void sendAttachment(String title, String content) { - - List users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId()); - - // receiving group list - List receiversList = new ArrayList<>(); - for (User user : users) { - receiversList.add(user.getEmail().trim()); + public void sendAttachment(int groupId, String title, String content) { + AlertSendResponseCommand alertSendResponseCommand = alertClientService.sendAlert(groupId, title, content); + if (!alertSendResponseCommand.getResStatus()) { + throw new RuntimeException("send mail failed!"); } - // custom receiver - String receivers = sqlParameters.getReceivers(); - if (StringUtils.isNotEmpty(receivers)) { - String[] splits = receivers.split(COMMA); - for (String receiver : splits) { - receiversList.add(receiver.trim()); - } - } - - // copy list - List receiversCcList = new ArrayList<>(); - // Custom Copier - String receiversCc = sqlParameters.getReceiversCc(); - if (StringUtils.isNotEmpty(receiversCc)) { - String[] splits = receiversCc.split(COMMA); - for (String receiverCc : splits) { - receiversCcList.add(receiverCc.trim()); - } - } - - String showTypeName = sqlParameters.getShowType().replace(COMMA, "").trim(); - /* - if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ - Map mailResult = MailUtils.sendMails(receviersList, - receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp()); - if(!(boolean) mailResult.get(STATUS)){ - throw new RuntimeException("send mail failed!"); - } - //TODO AlertServer should provide a grpc interface, which is called when other services need to send alerts - }else{ - logger.error("showType: {} is not valid " ,showTypeName); - throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName)); - }*/ } /** diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index 9fba30c147..cea1b4ea6c 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -35,3 +35,6 @@ # default worker weight #work.weight=100 + +# alert server listener host +alert.listen.host=localhost diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java index 2e7e531f30..27c10db13d 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.TaskManager; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; @@ -64,6 +65,8 @@ public class TaskExecuteThreadTest { private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + private AlertClientService alertClientService; + @Before public void before() { // init task execution context, logger @@ -100,8 +103,10 @@ public class TaskExecuteThreadTest { PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) .thenReturn(taskExecutionContextCacheManager); + alertClientService = PowerMockito.mock(AlertClientService.class); + PowerMockito.mockStatic(TaskManager.class); - PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger)) + PowerMockito.when(TaskManager.newTask(taskExecutionContext, taskLogger, alertClientService)) .thenReturn(new SimpleTask(taskExecutionContext, taskLogger)); PowerMockito.mockStatic(JSONUtils.class); @@ -117,7 +122,7 @@ public class TaskExecuteThreadTest { taskExecutionContext.setTaskType("SQL"); taskExecutionContext.setStartTime(new Date()); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); taskExecuteThread.run(); Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); @@ -129,7 +134,7 @@ public class TaskExecuteThreadTest { taskExecutionContext.setStartTime(null); taskExecutionContext.setDelayTime(1); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION); - TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger); + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, taskLogger, alertClientService); taskExecuteThread.run(); Assert.assertEquals(ExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java index eb0383979c..6acfd180c2 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/TaskManagerTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; +import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; @@ -46,6 +47,8 @@ public class TaskManagerTest { private TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager; + private AlertClientService alertClientService; + @Before public void before() { // init task execution context, logger @@ -74,41 +77,43 @@ public class TaskManagerTest { PowerMockito.mockStatic(SpringApplicationContext.class); PowerMockito.when(SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)) .thenReturn(taskExecutionContextCacheManager); + + alertClientService = PowerMockito.mock(AlertClientService.class); } @Test public void testNewTask() { taskExecutionContext.setTaskType("SHELL"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("WATERDROP"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("HTTP"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("MR"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("SPARK"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("FLINK"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("PYTHON"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("DATAX"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); taskExecutionContext.setTaskType("SQOOP"); - Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger)); + Assert.assertNotNull(TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService)); } @Test(expected = IllegalArgumentException.class) public void testNewTaskIsNull() { taskExecutionContext.setTaskType(null); - TaskManager.newTask(taskExecutionContext,taskLogger); + TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); } @Test(expected = IllegalArgumentException.class) public void testNewTaskIsNotExists() { taskExecutionContext.setTaskType("XXX"); - TaskManager.newTask(taskExecutionContext,taskLogger); + TaskManager.newTask(taskExecutionContext,taskLogger,alertClientService); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java new file mode 100644 index 0000000000..64db568916 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTaskTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.task.sql; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.service.alert.AlertClientService; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.Date; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * sql task test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(value = {SqlTask.class, DriverManager.class}) +public class SqlTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(SqlTaskTest.class); + + private static final String CONNECTION_PARAMS = "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\"," + + "\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"; + + private SqlTask sqlTask; + + private TaskExecutionContext taskExecutionContext; + + private AlertClientService alertClientService; + @Before + public void before() throws Exception { + taskExecutionContext = new TaskExecutionContext(); + + TaskProps props = new TaskProps(); + props.setExecutePath("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstanceId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"localParams\":[],\"type\":\"POSTGRESQL\",\"datasource\":1,\"sql\":\"insert into tb_1 values('1','2')\",\"sqlType\":1}"); + + taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class); + PowerMockito.when(taskExecutionContext.getTaskParams()).thenReturn(props.getTaskParams()); + PowerMockito.when(taskExecutionContext.getExecutePath()).thenReturn("/tmp"); + PowerMockito.when(taskExecutionContext.getTaskAppId()).thenReturn("1"); + PowerMockito.when(taskExecutionContext.getTenantCode()).thenReturn("root"); + PowerMockito.when(taskExecutionContext.getStartTime()).thenReturn(new Date()); + PowerMockito.when(taskExecutionContext.getTaskTimeout()).thenReturn(10000); + PowerMockito.when(taskExecutionContext.getLogPath()).thenReturn("/tmp/dx"); + + SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); + sqlTaskExecutionContext.setConnectionParams(CONNECTION_PARAMS); + PowerMockito.when(taskExecutionContext.getSqlTaskExecutionContext()).thenReturn(sqlTaskExecutionContext); + + alertClientService = PowerMockito.mock(AlertClientService.class); + sqlTask = new SqlTask(taskExecutionContext, logger, alertClientService); + sqlTask.init(); + } + + @Test + public void testGetParameters() { + Assert.assertNotNull(sqlTask.getParameters()); + } + + @Test(expected = Exception.class) + public void testHandle() throws Exception { + Connection connection = PowerMockito.mock(Connection.class); + PowerMockito.mockStatic(DriverManager.class); + PowerMockito.when(DriverManager.getConnection(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(connection); + PreparedStatement preparedStatement = PowerMockito.mock(PreparedStatement.class); + PowerMockito.when(connection.prepareStatement(Mockito.any())).thenReturn(preparedStatement); + PowerMockito.mockStatic(ParameterUtils.class); + PowerMockito.when(ParameterUtils.replaceScheduleTime(Mockito.any(), Mockito.any())).thenReturn("insert into tb_1 values('1','2')"); + + sqlTask.handle(); + Assert.assertEquals(Constants.EXIT_CODE_SUCCESS,sqlTask.getExitStatusCode()); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java index 7839b4a460..49977fa969 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/AlertClientService.java @@ -38,6 +38,10 @@ public class AlertClientService { private volatile boolean isRunning; + private String host; + + private int port; + /** * request time out */ @@ -52,6 +56,17 @@ public class AlertClientService { this.isRunning = true; } + /** + * alert client + */ + public AlertClientService(String host, int port) { + this.clientConfig = new NettyClientConfig(); + this.client = new NettyRemotingClient(clientConfig); + this.isRunning = true; + this.host = host; + this.port = port; + } + /** * close */ @@ -61,6 +76,17 @@ public class AlertClientService { logger.info("alter client closed"); } + /** + * alert sync send data + * @param groupId + * @param title + * @param content + * @return + */ + public AlertSendResponseCommand sendAlert(int groupId, String title, String content) { + return this.sendAlert(this.host,this.port,groupId,title,content); + } + /** * alert sync send data * @param host host diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue index 94ddaf8c5d..8e892ab247 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue @@ -35,12 +35,14 @@ :sql-type="sqlType"> + @@ -174,6 +176,8 @@ sqlType: '0', // Email title title: '', + // Alert groupId + groupId: '', // Form/attachment showType: ['TABLE'], // Sql parameter @@ -181,11 +185,7 @@ // Pre statements preStatements: [], // Post statements - postStatements: [], - // recipients - receivers: [], - // copy to - receiversCc: [] + postStatements: [] } }, mixins: [disabledState], @@ -275,24 +275,18 @@ if (!this.$refs.refDs._verifDatasource()) { return false } + /* if (this.sqlType==0 && !this.showType.length) { this.$message.warning(`${i18n.$t('One form or attachment must be selected')}`) return false } + */ if (this.sqlType==0 && !this.title) { - this.$message.warning(`${i18n.$t('Mail subject required')}`) - return false - } - if (this.sqlType==0 && !this.receivers.length) { - this.$message.warning(`${i18n.$t('Recipient required')}`) + this.$message.warning(`${i18n.$t('Please enter the title of alert')}`) return false } - // receivers Subcomponent verification - if (this.sqlType==0 && !this.$refs.refEmail._manualEmail()) { - return false - } - // receiversCc Subcomponent verification - if (this.sqlType==0 && !this.$refs.refCc._manualEmail()) { + if (this.sqlType==0 && !this.groupId) { + this.$message.warning(`${i18n.$t('Please select the alert group')}`) return false } // udfs Subcomponent verification Verification only if the data type is HIVE @@ -325,8 +319,7 @@ udfs: this.udfs, sqlType: this.sqlType, title: this.title, - receivers: this.receivers.join(','), - receiversCc: this.receiversCc.join(','), + groupId: this.groupId, showType: (() => { /** * Special processing return order TABLE,ATTACHMENT @@ -387,10 +380,7 @@ } else { param.processInstanceId = current.params.id } - this.store.dispatch('dag/getReceiver', param).then(res => { - this.receivers = res.receivers && res.receivers.split(',') || [] - this.receiversCc = res.receiversCc && res.receiversCc.split(',') || [] - }) + }, _cacheParams () { this.$emit('on-cache-params', { @@ -400,8 +390,7 @@ udfs: this.udfs, sqlType: this.sqlType, title: this.title, - receivers: this.receivers.join(','), - receiversCc: this.receiversCc.join(','), + groupId: this.groupId, showType: (() => { let showType = this.showType @@ -433,8 +422,7 @@ } if (val != 0) { this.title = '' - this.receivers = [] - this.receiversCc = [] + this.groupId = '' } }, // Listening data source @@ -469,8 +457,7 @@ this.preStatements = o.params.preStatements || [] this.postStatements = o.params.postStatements || [] this.title = o.params.title || '' - this.receivers = o.params.receivers && o.params.receivers.split(',') || [] - this.receiversCc = o.params.receiversCc && o.params.receiversCc.split(',') || [] + this.groupId = o.params.groupId || '' } // read tasks from cache if (!_.some(this.store.state.dag.cacheTasks, { id: this.createNodeId }) && @@ -501,8 +488,7 @@ udfs: this.udfs, sqlType: this.sqlType, title: this.title, - receivers: this.receivers.join(','), - receiversCc: this.receiversCc.join(','), + groupId: this.groupId, showType: (() => { let showType = this.showType diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index f1295da677..00d6757d62 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -122,6 +122,9 @@ export default { 'SQL Type': 'SQL Type', Title: 'Title', 'Please enter the title of email': 'Please enter the title of email', + 'Please enter the title of alert': 'Please enter the title of alert', + 'AlertGroup': 'AlertGroup', + 'Please select the alert group': 'Please select the alert group', Table: 'Table', TableMode: 'Table', Attachment: 'Attachment', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index 148bc33f0a..e936400d25 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -122,6 +122,9 @@ export default { 'SQL Type': 'sql类型', Title: '主题', 'Please enter the title of email': '请输入邮件主题', + 'Please enter the title of alert': '请输入告警主题', + 'AlertGroup': '告警组', + 'Please select the alert group': '请选择告警组', Table: '表名', TableMode: '表格', Attachment: '附件', diff --git a/install.sh b/install.sh index da21085d14..7991275a01 100755 --- a/install.sh +++ b/install.sh @@ -64,6 +64,8 @@ sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttl sed -i ${txt} "s#mail.smtp.ssl.trust.*#mail.smtp.ssl.trust=${sslTrust}#g" conf/alert.properties sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties +sed -i ${txt} "s#alert.listen.host.*#alert.listen.host=${alertServer}#g" conf/worker.properties + # 2.create directory echo "2.create directory" diff --git a/pom.xml b/pom.xml index 83681556ce..5f498241ae 100644 --- a/pom.xml +++ b/pom.xml @@ -904,6 +904,7 @@ **/server/worker/registry/WorkerRegistryTest.java **/server/worker/shell/ShellCommandExecutorTest.java **/server/worker/sql/SqlExecutorTest.java + **/server/worker/task/sql/SqlTaskTest.java **/server/worker/task/spark/SparkTaskTest.java **/server/worker/task/EnvFileTest.java **/server/worker/task/spark/SparkTaskTest.java @@ -920,7 +921,6 @@ **/service/zk/CuratorZookeeperClientTest.java **/service/queue/TaskUpdateQueueTest.java **/service/alert/AlertClientServiceTest.java - **/dao/mapper/DataSourceUserMapperTest.java **/dao/mapper/ProcessDefinitionMapperTest.java