Browse Source

processdefine import export bug fix (#1023)

* add ConnectionFactoryTest and ConnectionFactory read datasource from appliction.yml

* .escheduler_env.sh to dolphinscheduler_env.sh

* dao yml assembly to conf directory

* table name modify

* entity title table  name modify

* logback log name modify

* running through the big process

* running through the big process error modify

* logback log name modify

* data_source.properties rename

* logback log name modify

* install.sh optimization

* install.sh optimization

* command count modify

* command state update

* countCommandState sql update

* countCommandState sql update

* remove application.yml file

* master.properties modify

* install.sh modify

* install.sh modify

* api server startup modify

* the current user quits and the session is completely emptied. bug fix

* remove pom package resources

* checkQueueNameExist method update

* checkQueueExist

* install.sh error output update

* signOut error update

* ProcessDao is null bug fix

* install.sh add mail.user

* request url variables replace

* process define import bug fix

* process define import export bug fix

* processdefine import export bug fix
pull/2/head
qiaozhanwei 5 years ago committed by lgcareer
parent
commit
b8ae782f2b
  1. 6
      dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java
  2. 89
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  4. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

6
dolphinscheduler-alert/src/test/java/org/apache/dolphinscheduler/alert/utils/MailUtilsTest.java

@ -48,8 +48,8 @@ public class MailUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(MailUtilsTest.class); private static final Logger logger = LoggerFactory.getLogger(MailUtilsTest.class);
@Test @Test
public void testSendMails() { public void testSendMails() {
String[] receivers = new String[]{"xx@xx.com"}; String[] receivers = new String[]{"825193156@qq.com"};
String[] receiversCc = new String[]{"xxx@xxx.com"}; String[] receiversCc = new String[]{"825193156@qq.com"};
String content ="[\"id:69\"," + String content ="[\"id:69\"," +
"\"name:UserBehavior-0--1193959466\"," + "\"name:UserBehavior-0--1193959466\"," +
@ -114,7 +114,7 @@ public class MailUtilsTest {
@Test @Test
public void testSendTableMail(){ public void testSendTableMail(){
String[] mails = new String[]{"xx@xx.com"}; String[] mails = new String[]{"825193156@qq.com"};
Alert alert = new Alert(); Alert alert = new Alert();
alert.setTitle("Mysql Exception"); alert.setTitle("Mysql Exception");
alert.setShowType(ShowType.TABLE); alert.setShowType(ShowType.TABLE);

89
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -536,7 +536,7 @@ public class ProcessDefinitionService extends BaseDAGService {
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS); Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus == Status.SUCCESS) { if (resultStatus == Status.SUCCESS) {
ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
if (processDefinition != null) { if (processDefinition != null) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson()); JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
@ -551,6 +551,25 @@ public class ProcessDefinitionService extends BaseDAGService {
sqlParameters.put("datasourceName", dataSource.getName()); sqlParameters.put("datasourceName", dataSource.getName());
} }
taskNode.put("params", sqlParameters); taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
int definitionId = dependentItem.getInteger("definitionId");
ProcessDefinition definition = processDefineMapper.selectById(definitionId);
if(definition != null){
dependentItem.put("projectName",definition.getProjectName());
dependentItem.put("definitionName",definition.getName());
}
}
}
taskNode.put("dependence", dependentParameters);
}
} }
} }
} }
@ -561,7 +580,7 @@ public class ProcessDefinitionService extends BaseDAGService {
row.put("projectName", processDefinition.getProjectName()); row.put("projectName", processDefinition.getProjectName());
row.put("processDefinitionName", processDefinition.getName()); row.put("processDefinitionName", processDefinition.getName());
row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson()); row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson());
row.put("processDefinitionDesc", processDefinition.getDescription()); row.put("processDefinitionDescription", processDefinition.getDescription());
row.put("processDefinitionLocations", processDefinition.getLocations()); row.put("processDefinitionLocations", processDefinition.getLocations());
row.put("processDefinitionConnects", processDefinition.getConnects()); row.put("processDefinitionConnects", processDefinition.getConnects());
@ -574,7 +593,7 @@ public class ProcessDefinitionService extends BaseDAGService {
row.put("scheduleEndTime", schedule.getEndTime()); row.put("scheduleEndTime", schedule.getEndTime());
row.put("scheduleCrontab", schedule.getCrontab()); row.put("scheduleCrontab", schedule.getCrontab());
row.put("scheduleFailureStrategy", schedule.getFailureStrategy()); row.put("scheduleFailureStrategy", schedule.getFailureStrategy());
row.put("scheduleReleaseState", schedule.getReleaseState()); row.put("scheduleReleaseState", ReleaseState.OFFLINE);
row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority()); row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority());
if(schedule.getId() == -1){ if(schedule.getId() == -1){
row.put("scheduleWorkerGroupId", -1); row.put("scheduleWorkerGroupId", -1);
@ -647,19 +666,22 @@ public class ProcessDefinitionService extends BaseDAGService {
projectName = json.get("projectName").toString(); projectName = json.get("projectName").toString();
} else { } else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
return result;
} }
if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) { if (ObjectUtils.allNotNull(json.get("processDefinitionName"))) {
processDefinitionName = json.get("processDefinitionName").toString(); processDefinitionName = json.get("processDefinitionName").toString();
} else { } else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
return result;
} }
if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) { if (ObjectUtils.allNotNull(json.get("processDefinitionJson"))) {
processDefinitionJson = json.get("processDefinitionJson").toString(); processDefinitionJson = json.get("processDefinitionJson").toString();
} else { } else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson");
return result;
} }
if (ObjectUtils.allNotNull(json.get("processDefinitionDesc"))) { if (ObjectUtils.allNotNull(json.get("processDefinitionDescription"))) {
processDefinitionDesc = json.get("processDefinitionDesc").toString(); processDefinitionDesc = json.get("processDefinitionDescription").toString();
} }
if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) { if (ObjectUtils.allNotNull(json.get("processDefinitionLocations"))) {
processDefinitionLocations = json.get("processDefinitionLocations").toString(); processDefinitionLocations = json.get("processDefinitionLocations").toString();
@ -668,17 +690,46 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefinitionConnects = json.get("processDefinitionConnects").toString(); processDefinitionConnects = json.get("processDefinitionConnects").toString();
} }
Project project = projectMapper.queryByName(projectName);
if(project != null){
processDefinitionName = recursionProcessDefinitionName(project.getId(), processDefinitionName, 1);
}
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson); JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks"); JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int j = 0; j < jsonArray.size(); j++) { for (int j = 0; j < jsonArray.size(); j++) {
JSONObject taskNode = jsonArray.getJSONObject(j); JSONObject taskNode = jsonArray.getJSONObject(j);
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); String taskType = taskNode.getString("type");
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName")); if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())) {
if (dataSources.size() > 0) { JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSources.get(0); List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
sqlParameters.put("datasource", dataSource.getId()); if (dataSources.size() > 0) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int h = 0; h < dependTaskList.size(); h++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(h);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
Project dependentItemProject = projectMapper.queryByName(dependentItem.getString("projectName"));
if(dependentItemProject != null){
ProcessDefinition definition = processDefineMapper.queryByDefineName(dependentItemProject.getId(),dependentItem.getString("definitionName"));
if(definition != null){
dependentItem.put("projectId",dependentItemProject.getId());
dependentItem.put("definitionId",definition.getId());
}
}
}
}
taskNode.put("dependence", dependentParameters);
}
} }
taskNode.put("params", sqlParameters);
} }
jsonObject.put("tasks", jsonArray); jsonObject.put("tasks", jsonArray);
@ -756,6 +807,7 @@ public class ProcessDefinitionService extends BaseDAGService {
} }
/** /**
* check the process definition node meets the specifications * check the process definition node meets the specifications
* *
@ -1119,5 +1171,20 @@ public class ProcessDefinitionService extends BaseDAGService {
return graph.hasCycle(); return graph.hasCycle();
} }
private String recursionProcessDefinitionName(Integer projectId,String processDefinitionName,int num){
ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName);
if (processDefinition != null) {
if(num>1){
String str = processDefinitionName.substring(0,processDefinitionName.length() - 3);
processDefinitionName = str + "("+num+")";
}else{
processDefinitionName = processDefinition.getName() + "("+num+")";
}
}else{
return processDefinitionName;
}
return recursionProcessDefinitionName(projectId,processDefinitionName,num + 1);
}
} }

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -30,6 +30,8 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
ProcessDefinition queryByDefineName(@Param("projectId") int projectId, ProcessDefinition queryByDefineName(@Param("projectId") int projectId,
@Param("processDefinitionName") String name); @Param("processDefinitionName") String name);
ProcessDefinition queryByDefineId(@Param("processDefineId") int processDefineId);
IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page, IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page,
@Param("searchVal") String searchVal, @Param("searchVal") String searchVal,
@Param("userId") int userId, @Param("userId") int userId,

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -52,5 +52,18 @@
group by td.user_id,tu.user_name group by td.user_id,tu.user_name
</select> </select>
<select id="queryByDefineId" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT
pd.*, u.user_name,
p.`name` AS project_name
FROM
t_ds_process_definition pd,
t_ds_user u,
t_ds_project p
WHERE
pd.user_id = u.id AND pd.project_id = p.id
AND pd.id = #{processDefineId}
</select>
</mapper> </mapper>
Loading…
Cancel
Save