diff --git a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java index 7525256f9b..b806496e63 100644 --- a/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java +++ b/dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java @@ -48,8 +48,8 @@ public class MailUtilsTest { private static final Logger logger = LoggerFactory.getLogger(MailUtilsTest.class); @Test public void testSendMails() { - String[] receivers = new String[]{"xx@xx.com"}; - String[] receiversCc = new String[]{"xxx@xxx.com"}; + String[] receivers = new String[]{"825193156@qq.com"}; + String[] receiversCc = new String[]{"825193156@qq.com"}; String content ="[\"id:69\"," + "\"name:UserBehavior-0--1193959466\"," + @@ -114,7 +114,7 @@ public class MailUtilsTest { @Test public void testSendTableMail(){ - String[] mails = new String[]{"xx@xx.com"}; + String[] mails = new String[]{"825193156@qq.com"}; Alert alert = new Alert(); alert.setTitle("Mysql Exception"); alert.setShowType(ShowType.TABLE); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 6e31b44c15..91cb385614 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -536,7 +536,7 @@ public class ProcessDefinitionService extends BaseDAGService { Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus == Status.SUCCESS) { - ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); if (processDefinition != null) { JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson()); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); @@ -551,6 +551,25 @@ public class ProcessDefinitionService extends BaseDAGService { sqlParameters.put("datasourceName", dataSource.getName()); } taskNode.put("params", sqlParameters); + }else if(taskType.equals(TaskType.DEPENDENT.name())){ + JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + if(dependentParameters != null){ + JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + for (int j = 0; j < dependTaskList.size(); j++) { + JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); + JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + for (int k = 0; k < dependItemList.size(); k++) { + JSONObject dependentItem = dependItemList.getJSONObject(k); + int definitionId = dependentItem.getInteger("definitionId"); + ProcessDefinition definition = processDefineMapper.selectById(definitionId); + if(definition != null){ + dependentItem.put("projectName",definition.getProjectName()); + dependentItem.put("definitionName",definition.getName()); + } + } + } + taskNode.put("dependence", dependentParameters); + } } } } @@ -561,7 +580,7 @@ public class ProcessDefinitionService extends BaseDAGService { row.put("projectName", processDefinition.getProjectName()); row.put("processDefinitionName", processDefinition.getName()); row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson()); - row.put("processDefinitionDesc", processDefinition.getDescription()); + row.put("processDefinitionDescription", processDefinition.getDescription()); row.put("processDefinitionLocations", processDefinition.getLocations()); row.put("processDefinitionConnects", processDefinition.getConnects()); @@ -574,7 +593,7 @@ public class ProcessDefinitionService extends BaseDAGService { row.put("scheduleEndTime", schedule.getEndTime()); row.put("scheduleCrontab", schedule.getCrontab()); row.put("scheduleFailureStrategy", schedule.getFailureStrategy()); - row.put("scheduleReleaseState", schedule.getReleaseState()); + row.put("scheduleReleaseState", ReleaseState.OFFLINE); row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority()); if(schedule.getId() == -1){ row.put("scheduleWorkerGroupId", -1); @@ -647,19 +666,22 @@ public class ProcessDefinitionService extends BaseDAGService { projectName = json.get("projectName").toString(); } else { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); + return result; } if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) { processDefinitionName = json.get("processDefinitionName").toString(); } else { putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); + return result; } if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) { processDefinitionJson = json.get("processDefinitionJson").toString(); } else { putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); + return result; } - if (ObjectUtils.allNotNull(json.get("processDefinitionDesc"))) { - processDefinitionDesc = json.get("processDefinitionDesc").toString(); + if (ObjectUtils.allNotNull(json.get("processDefinitionDescription"))) { + processDefinitionDesc = json.get("processDefinitionDescription").toString(); } if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) { processDefinitionLocations = json.get("processDefinitionLocations").toString(); @@ -668,17 +690,46 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionConnects = json.get("processDefinitionConnects").toString(); } + Project project = projectMapper.queryByName(projectName); + if(project != null){ + processDefinitionName = recursionProcessDefinitionName(project.getId(), processDefinitionName, 1); + } + JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); for (int j = 0; j < jsonArray.size(); j++) { JSONObject taskNode = jsonArray.getJSONObject(j); - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); - if (dataSources.size() > 0) { - DataSource dataSource = dataSources.get(0); - sqlParameters.put("datasource", dataSource.getId()); + String taskType = taskNode.getString("type"); + if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())) { + JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); + List dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); + if (dataSources.size() > 0) { + DataSource dataSource = dataSources.get(0); + sqlParameters.put("datasource", dataSource.getId()); + } + taskNode.put("params", sqlParameters); + }else if(taskType.equals(TaskType.DEPENDENT.name())){ + JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + if(dependentParameters != null){ + JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + for (int h = 0; h < dependTaskList.size(); h++) { + JSONObject dependentTaskModel = dependTaskList.getJSONObject(h); + JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + for (int k = 0; k < dependItemList.size(); k++) { + JSONObject dependentItem = dependItemList.getJSONObject(k); + Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName")); + if(dependentItemProject != null){ + ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName")); + if(definition != null){ + dependentItem.put("projectId",dependentItemProject.getId()); + dependentItem.put("definitionId",definition.getId()); + } + } + } + } + taskNode.put("dependence", dependentParameters); + } } - taskNode.put("params", sqlParameters); } jsonObject.put("tasks", jsonArray); @@ -756,6 +807,7 @@ public class ProcessDefinitionService extends BaseDAGService { } + /** * check the process definition node meets the specifications * @@ -1119,5 +1171,20 @@ public class ProcessDefinitionService extends BaseDAGService { return graph.hasCycle(); } + private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){ + ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); + if (processDefinition != null) { + if(num>1){ + String str = processDefinitionName.substring(0,processDefinitionName.length() - 3); + processDefinitionName = str + "("+num+")"; + }else{ + processDefinitionName = processDefinition.getName() + "("+num+")"; + } + }else{ + return processDefinitionName; + } + return recursionProcessDefinitionName(projectId,processDefinitionName,num + 1); + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java index 1f4a49731d..1eab334a0d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java @@ -30,6 +30,8 @@ public interface ProcessDefinitionMapper extends BaseMapper { ProcessDefinition queryByDefineName(@Param("projectId") int projectId, @Param("processDefinitionName") String name); + ProcessDefinition queryByDefineId(@Param("processDefineId") int processDefineId); + IPage queryDefineListPaging(IPage page, @Param("searchVal") String searchVal, @Param("userId") int userId, diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml index ed53f81540..286be0afc9 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml @@ -52,5 +52,18 @@ group by td.user_id,tu.user_name + + \ No newline at end of file