Browse Source

Merge remote-tracking branch 'origin/dev' into dev

pull/3/MERGE
BoYiZhang 4 years ago
parent
commit
3ccb355704
  1. 15
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EmailManager.java
  2. 12
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPlugin.java
  3. 10
      dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java
  6. 16
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
  7. 22
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
  8. 5
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java
  9. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  10. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  11. 9
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  12. 2
      e2e/src/test/java/org/apache/dolphinscheduler/locator/project/CreateWorkflowLocator.java

15
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/manager/EmailManager.java

@ -27,28 +27,29 @@ import java.util.Map;
public class EmailManager { public class EmailManager {
/** /**
* email send * email send
* @param receviersList the receiver list * @param receiversList the receiver list
* @param receviersCcList the cc List * @param receiversCcList the cc List
* @param title the title * @param title the title
* @param content the content * @param content the content
* @param showType the showType * @param showType the showType
* @return the send result * @return the send result
*/ */
public Map<String,Object> send(List<String> receviersList,List<String> receviersCcList,String title,String content,String showType){ public Map<String,Object> send(List<String> receiversList,List<String> receiversCcList,String title,String content,String showType){
return MailUtils.sendMails(receviersList, receviersCcList, title, content, showType); return MailUtils.sendMails(receiversList, receiversCcList, title, content, showType);
} }
/** /**
* msg send * msg send
* @param receviersList the receiver list * @param receiversList the receiver list
* @param title the title * @param title the title
* @param content the content * @param content the content
* @param showType the showType * @param showType the showType
* @return the send result * @return the send result
*/ */
public Map<String,Object> send(List<String> receviersList,String title,String content,String showType){ public Map<String,Object> send(List<String> receiversList,String title,String content,String showType){
return MailUtils.sendMails(receviersList,title, content, showType); return MailUtils.sendMails(receiversList,title, content, showType);
} }
} }

12
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/plugin/EmailAlertPlugin.java

@ -71,32 +71,32 @@ public class EmailAlertPlugin implements AlertPlugin {
AlertData alert = info.getAlertData(); AlertData alert = info.getAlertData();
List<String> receviersList = (List<String>) info.getProp(Constants.PLUGIN_DEFAULT_EMAIL_RECEIVERS); List<String> receiversList = (List<String>) info.getProp(Constants.PLUGIN_DEFAULT_EMAIL_RECEIVERS);
// receiving group list // receiving group list
// custom receiver // custom receiver
String receivers = alert.getReceivers(); String receivers = alert.getReceivers();
if (StringUtils.isNotEmpty(receivers)) { if (StringUtils.isNotEmpty(receivers)) {
String[] splits = receivers.split(","); String[] splits = receivers.split(",");
receviersList.addAll(Arrays.asList(splits)); receiversList.addAll(Arrays.asList(splits));
} }
List<String> receviersCcList = new ArrayList<>(); List<String> receiversCcList = new ArrayList<>();
// Custom Copier // Custom Copier
String receiversCc = alert.getReceiversCc(); String receiversCc = alert.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)) { if (StringUtils.isNotEmpty(receiversCc)) {
String[] splits = receiversCc.split(","); String[] splits = receiversCc.split(",");
receviersCcList.addAll(Arrays.asList(splits)); receiversCcList.addAll(Arrays.asList(splits));
} }
if (CollectionUtils.isEmpty(receviersList) && CollectionUtils.isEmpty(receviersCcList)) { if (CollectionUtils.isEmpty(receiversList) && CollectionUtils.isEmpty(receiversCcList)) {
logger.warn("alert send error : At least one receiver address required"); logger.warn("alert send error : At least one receiver address required");
retMaps.put(Constants.STATUS, "false"); retMaps.put(Constants.STATUS, "false");
retMaps.put(Constants.MESSAGE, "execution failure,At least one receiver address required."); retMaps.put(Constants.MESSAGE, "execution failure,At least one receiver address required.");
return retMaps; return retMaps;
} }
retMaps = emailManager.send(receviersList, receviersCcList, alert.getTitle(), alert.getContent(), retMaps = emailManager.send(receiversList, receiversCcList, alert.getTitle(), alert.getContent(),
alert.getShowType()); alert.getShowType());
//send flag //send flag

10
dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/runner/AlertSender.java

@ -60,9 +60,9 @@ public class AlertSender {
users = alertDao.listUserByAlertgroupId(alert.getAlertGroupId()); users = alertDao.listUserByAlertgroupId(alert.getAlertGroupId());
// receiving group list // receiving group list
List<String> receviersList = new ArrayList<>(); List<String> receiversList = new ArrayList<>();
for (User user : users) { for (User user : users) {
receviersList.add(user.getEmail()); receiversList.add(user.getEmail());
} }
AlertData alertData = new AlertData(); AlertData alertData = new AlertData();
@ -78,17 +78,17 @@ public class AlertSender {
AlertInfo alertInfo = new AlertInfo(); AlertInfo alertInfo = new AlertInfo();
alertInfo.setAlertData(alertData); alertInfo.setAlertData(alertData);
alertInfo.addProp("receivers", receviersList); alertInfo.addProp("receivers", receiversList);
AlertPlugin emailPlugin = pluginManager.findOne(Constants.PLUGIN_DEFAULT_EMAIL_ID); AlertPlugin emailPlugin = pluginManager.findOne(Constants.PLUGIN_DEFAULT_EMAIL_ID);
retMaps = emailPlugin.process(alertInfo); retMaps = emailPlugin.process(alertInfo);
if (retMaps == null) { if (retMaps == null) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, "alert send error", alert.getId());
logger.info("alert send error : return value is null"); logger.error("alert send error : return value is null");
} else if (!Boolean.parseBoolean(String.valueOf(retMaps.get(Constants.STATUS)))) { } else if (!Boolean.parseBoolean(String.valueOf(retMaps.get(Constants.STATUS)))) {
alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(retMaps.get(Constants.MESSAGE)), alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_FAILURE, String.valueOf(retMaps.get(Constants.MESSAGE)), alert.getId());
logger.info("alert send error : {}", retMaps.get(Constants.MESSAGE)); logger.error("alert send error : {}", retMaps.get(Constants.MESSAGE));
} else { } else {
alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, (String) retMaps.get(Constants.MESSAGE), alert.getId()); alertDao.updateAlert(AlertStatus.EXECUTION_SUCCESS, (String) retMaps.get(Constants.MESSAGE), alert.getId());
logger.info("alert send success"); logger.info("alert send success");

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -35,7 +35,7 @@ public enum Status {
USER_NAME_NULL(10004,"user name is null", "用户名不能为空"), USER_NAME_NULL(10004,"user name is null", "用户名不能为空"),
HDFS_OPERATION_ERROR(10006, "hdfs operation error", "hdfs操作错误"), HDFS_OPERATION_ERROR(10006, "hdfs operation error", "hdfs操作错误"),
TASK_INSTANCE_NOT_FOUND(10008, "task instance not found", "任务实例不存在"), TASK_INSTANCE_NOT_FOUND(10008, "task instance not found", "任务实例不存在"),
TENANT_NAME_EXIST(10009, "tenant code already exists", "租户编码不能为空"), TENANT_NAME_EXIST(10009, "tenant code {0} already exists", "租户编码[{0}]已存在"),
USER_NOT_EXIST(10010, "user {0} not exists", "用户[{0}]不存在"), USER_NOT_EXIST(10010, "user {0} not exists", "用户[{0}]不存在"),
ALERT_GROUP_NOT_EXIST(10011, "alarm group not found", "告警组不存在"), ALERT_GROUP_NOT_EXIST(10011, "alarm group not found", "告警组不存在"),
ALERT_GROUP_EXIST(10012, "alarm group already exists", "告警组名称已存在"), ALERT_GROUP_EXIST(10012, "alarm group already exists", "告警组名称已存在"),

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java

@ -330,7 +330,7 @@ public class TenantService extends BaseService{
Result result = new Result(); Result result = new Result();
if (checkTenantExists(tenantCode)) { if (checkTenantExists(tenantCode)) {
logger.error("tenant {} has exist, can't create again.", tenantCode); logger.error("tenant {} has exist, can't create again.", tenantCode);
putMsg(result, Status.TENANT_NAME_EXIST); putMsg(result, Status.TENANT_NAME_EXIST, tenantCode);
} else { } else {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} }

16
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java

@ -120,7 +120,23 @@ public class TenantControllerTest extends AbstractControllerTest{
} }
@Test
public void testVerifyTenantCodeExists() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("tenantCode", "tenantCode");
MvcResult mvcResult = mockMvc.perform(get("/tenant/verify-tenant-code")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.TENANT_NAME_EXIST.getCode(), result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
@Test @Test
public void testQueryTenantlist() throws Exception { public void testQueryTenantlist() throws Exception {

22
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java

@ -16,8 +16,11 @@
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage; import java.util.ArrayList;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
@ -41,10 +44,10 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.i18n.LocaleContextHolder;
import java.util.ArrayList; import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class TenantServiceTest { public class TenantServiceTest {
@ -85,6 +88,7 @@ public class TenantServiceTest {
result = tenantService.createTenant(loginUser, "test", "test", 1, "TenantServiceTest"); result = tenantService.createTenant(loginUser, "test", "test", 1, "TenantServiceTest");
logger.info(result.toString()); logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
} catch (Exception e) { } catch (Exception e) {
logger.error("create tenant error",e); logger.error("create tenant error",e);
Assert.assertTrue(false); Assert.assertTrue(false);
@ -195,8 +199,14 @@ public class TenantServiceTest {
Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg()); Assert.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
// tenantCode exist // tenantCode exist
result = tenantService.verifyTenantCode(getTenant().getTenantCode()); result = tenantService.verifyTenantCode(getTenant().getTenantCode());
String resultString;
if (Locale.SIMPLIFIED_CHINESE.getLanguage().equals(LocaleContextHolder.getLocale().getLanguage())) {
resultString = "租户编码[TenantServiceTest]已存在";
} else {
resultString = "tenant code TenantServiceTest already exists";
}
logger.info(result.toString()); logger.info(result.toString());
Assert.assertEquals(Status.TENANT_NAME_EXIST.getMsg(),result.getMsg()); Assert.assertEquals(resultString, result.getMsg());
} }

5
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/flink/FlinkParameters.java

@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.common.task.flink;
import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* spark parameters * flink parameters
*/ */
public class FlinkParameters extends AbstractParameters { public class FlinkParameters extends AbstractParameters {
@ -226,6 +225,4 @@ public class FlinkParameters extends AbstractParameters {
} }
return resourceList; return resourceList;
} }
} }

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -131,16 +131,12 @@ public class ZookeeperNodeManager implements InitializingBean {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("worker group node : {} added.", path); logger.info("worker group node : {} added.", path);
String group = parseGroup(path); String group = parseGroup(path);
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
logger.info("currentNodes : {}", currentNodes); logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(group, currentNodes); syncWorkerGroupNodes(group, currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("worker group node : {} down.", path); logger.info("worker group node : {} down.", path);
String group = parseGroup(path); String group = parseGroup(path);
Set<String> workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>());
Set<String> previousNodes = new HashSet<>(workerNodes);
Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); Set<String> currentNodes = registryCenter.getWorkerGroupNodesDirectly(group);
syncWorkerGroupNodes(group, currentNodes); syncWorkerGroupNodes(group, currentNodes);
alertDao.sendServerStopedAlert(1, path, "WORKER"); alertDao.sendServerStopedAlert(1, path, "WORKER");
@ -175,12 +171,10 @@ public class ZookeeperNodeManager implements InitializingBean {
try { try {
if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) {
logger.info("master node : {} added.", path); logger.info("master node : {} added.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly(); Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes); syncMasterNodes(currentNodes);
} else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) {
logger.info("master node : {} down.", path); logger.info("master node : {} down.", path);
Set<String> previousNodes = new HashSet<>(masterNodes);
Set<String> currentNodes = registryCenter.getMasterNodesDirectly(); Set<String> currentNodes = registryCenter.getMasterNodesDirectly();
syncMasterNodes(currentNodes); syncMasterNodes(currentNodes);
alertDao.sendServerStopedAlert(1, path, "MASTER"); alertDao.sendServerStopedAlert(1, path, "MASTER");

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -176,6 +176,7 @@ public class SqlTask extends AbstractTask {
logger.info("SQL title : {}",title); logger.info("SQL title : {}",title);
sqlParameters.setTitle(title); sqlParameters.setTitle(title);
} }
//new //new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job //replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime()); sql = ParameterUtils.replaceScheduleTime(sql, taskExecutionContext.getScheduleTime());
@ -324,6 +325,7 @@ public class SqlTask extends AbstractTask {
} }
} }
} }
/** /**
* create connection * create connection
* *
@ -423,34 +425,34 @@ public class SqlTask extends AbstractTask {
List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId()); List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list // receiving group list
List<String> receviersList = new ArrayList<>(); List<String> receiversList = new ArrayList<>();
for(User user:users){ for(User user:users){
receviersList.add(user.getEmail().trim()); receiversList.add(user.getEmail().trim());
} }
// custom receiver // custom receiver
String receivers = sqlParameters.getReceivers(); String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){ if (StringUtils.isNotEmpty(receivers)){
String[] splits = receivers.split(COMMA); String[] splits = receivers.split(COMMA);
for (String receiver : splits){ for (String receiver : splits){
receviersList.add(receiver.trim()); receiversList.add(receiver.trim());
} }
} }
// copy list // copy list
List<String> receviersCcList = new ArrayList<>(); List<String> receiversCcList = new ArrayList<>();
// Custom Copier // Custom Copier
String receiversCc = sqlParameters.getReceiversCc(); String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){ if (StringUtils.isNotEmpty(receiversCc)){
String[] splits = receiversCc.split(COMMA); String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits){ for (String receiverCc : splits){
receviersCcList.add(receiverCc.trim()); receiversCcList.add(receiverCc.trim());
} }
} }
String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim(); String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, Map<String, Object> mailResult = MailUtils.sendMails(receiversList,
receviersCcList, title, content, ShowType.valueOf(showTypeName).getDescp()); receiversCcList, title, content, ShowType.valueOf(showTypeName).getDescp());
if(!(boolean) mailResult.get(STATUS)){ if(!(boolean) mailResult.get(STATUS)){
throw new RuntimeException("send mail failed!"); throw new RuntimeException("send mail failed!");
} }

9
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -260,7 +260,7 @@
</div> </div>
<div class="bottom-box"> <div class="bottom-box">
<div class="submit" style="background: #fff;"> <div class="submit" style="background: #fff;">
<x-button type="text" @click="close()"> {{$t('Cancel')}} </x-button> <x-button type="text" id="cancelBtn"> {{$t('Cancel')}} </x-button>
<x-button type="primary" shape="circle" :loading="spinnerLoading" @click="ok()" :disabled="isDetails">{{spinnerLoading ? 'Loading...' : $t('Confirm add')}} </x-button> <x-button type="primary" shape="circle" :loading="spinnerLoading" @click="ok()" :disabled="isDetails">{{spinnerLoading ? 'Loading...' : $t('Confirm add')}} </x-button>
</div> </div>
</div> </div>
@ -580,6 +580,7 @@
} }
this.isContentBox = false this.isContentBox = false
// flag Whether to delete a node this.$destroy() // flag Whether to delete a node this.$destroy()
this.$emit('close', { this.$emit('close', {
item: { item: {
type: this.cacheBackfillItem.type, type: this.cacheBackfillItem.type,
@ -675,7 +676,11 @@
this.isContentBox = true this.isContentBox = true
}, },
mounted () { mounted () {
let self = this
$("#cancelBtn").mousedown(function(event){
event.preventDefault();
self.close()
});
}, },
updated () { updated () {
}, },

2
e2e/src/test/java/org/apache/dolphinscheduler/locator/project/CreateWorkflowLocator.java

@ -126,7 +126,7 @@ public class CreateWorkflowLocator {
* save workflow * save workflow
*/ */
//click save workflow button //click save workflow button
public static final By CLICK_SAVE_WORKFLOW_BUTTON = By.xpath("//button/span"); public static final By CLICK_SAVE_WORKFLOW_BUTTON = By.xpath("//button[3]/span");
//input workflow name //input workflow name
public static final By INPUT_WORKFLOW_NAME = By.xpath("//input"); public static final By INPUT_WORKFLOW_NAME = By.xpath("//input");

Loading…
Cancel
Save