diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java index bc015c28aa..571b2ea469 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java @@ -256,9 +256,9 @@ public class ProjectController extends BaseController { /** * import process definition - * * @param loginUser login user * @param file resource file + * @param projectName project name * @return import result code */ @ApiOperation(value = "importProcessDefinition", notes= "EXPORT_PROCCESS_DEFINITION_NOTES") diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java new file mode 100644 index 0000000000..7c4a5cf23e --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/ProcessMeta.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.dto; + +/** + * ProcessMeta + */ +public class ProcessMeta { + + /** + * project name + */ + private String projectName; + + /** + * process definition name + */ + private String processDefinitionName; + + /** + * processs definition json + */ + private String processDefinitionJson; + + /** + * process definition desc + */ + private String processDefinitionDescription; + + /** + * process definition locations + */ + private String processDefinitionLocations; + + /** + * process definition connects + */ + private String processDefinitionConnects; + + /** + * warning type + */ + private String scheduleWarningType; + + /** + * warning group id + */ + private int scheduleWarningGroupId; + + /** + * warning group name + */ + private String scheduleWarningGroupName; + + /** + * start time + */ + private String scheduleStartTime; + + /** + * end time + */ + private String scheduleEndTime; + + /** + * crontab + */ + private String scheduleCrontab; + + /** + * failure strategy + */ + private String scheduleFailureStrategy; + + /** + * release state + */ + private String scheduleReleaseState; + + /** + * process instance priority + */ + private String scheduleProcessInstancePriority; + + /** + * worker group id + */ + private int scheduleWorkerGroupId; + + /** + * worker group name + */ + private String scheduleWorkerGroupName; + + public ProcessMeta() { + } + + public String getProjectName() { + return projectName; + } + + public void setProjectName(String projectName) { + this.projectName = projectName; + } + + public String getProcessDefinitionName() { + return processDefinitionName; + } + + public void setProcessDefinitionName(String processDefinitionName) { + this.processDefinitionName = processDefinitionName; + } + + public String getProcessDefinitionJson() { + return processDefinitionJson; + } + + public void setProcessDefinitionJson(String processDefinitionJson) { + this.processDefinitionJson = processDefinitionJson; + } + + public String getProcessDefinitionDescription() { + return processDefinitionDescription; + } + + public void setProcessDefinitionDescription(String processDefinitionDescription) { + this.processDefinitionDescription = processDefinitionDescription; + } + + public String getProcessDefinitionLocations() { + return processDefinitionLocations; + } + + public void setProcessDefinitionLocations(String processDefinitionLocations) { + this.processDefinitionLocations = processDefinitionLocations; + } + + public String getProcessDefinitionConnects() { + return processDefinitionConnects; + } + + public void setProcessDefinitionConnects(String processDefinitionConnects) { + this.processDefinitionConnects = processDefinitionConnects; + } + + public String getScheduleWarningType() { + return scheduleWarningType; + } + + public void setScheduleWarningType(String scheduleWarningType) { + this.scheduleWarningType = scheduleWarningType; + } + + public int getScheduleWarningGroupId() { + return scheduleWarningGroupId; + } + + public void setScheduleWarningGroupId(int scheduleWarningGroupId) { + this.scheduleWarningGroupId = scheduleWarningGroupId; + } + + public String getScheduleWarningGroupName() { + return scheduleWarningGroupName; + } + + public void setScheduleWarningGroupName(String scheduleWarningGroupName) { + this.scheduleWarningGroupName = scheduleWarningGroupName; + } + + public String getScheduleStartTime() { + return scheduleStartTime; + } + + public void setScheduleStartTime(String scheduleStartTime) { + this.scheduleStartTime = scheduleStartTime; + } + + public String getScheduleEndTime() { + return scheduleEndTime; + } + + public void setScheduleEndTime(String scheduleEndTime) { + this.scheduleEndTime = scheduleEndTime; + } + + public String getScheduleCrontab() { + return scheduleCrontab; + } + + public void setScheduleCrontab(String scheduleCrontab) { + this.scheduleCrontab = scheduleCrontab; + } + + public String getScheduleFailureStrategy() { + return scheduleFailureStrategy; + } + + public void setScheduleFailureStrategy(String scheduleFailureStrategy) { + this.scheduleFailureStrategy = scheduleFailureStrategy; + } + + public String getScheduleReleaseState() { + return scheduleReleaseState; + } + + public void setScheduleReleaseState(String scheduleReleaseState) { + this.scheduleReleaseState = scheduleReleaseState; + } + + public String getScheduleProcessInstancePriority() { + return scheduleProcessInstancePriority; + } + + public void setScheduleProcessInstancePriority(String scheduleProcessInstancePriority) { + this.scheduleProcessInstancePriority = scheduleProcessInstancePriority; + } + + public int getScheduleWorkerGroupId() { + return scheduleWorkerGroupId; + } + + public void setScheduleWorkerGroupId(int scheduleWorkerGroupId) { + this.scheduleWorkerGroupId = scheduleWorkerGroupId; + } + + public String getScheduleWorkerGroupName() { + return scheduleWorkerGroupName; + } + + public void setScheduleWorkerGroupName(String scheduleWorkerGroupName) { + this.scheduleWorkerGroupName = scheduleWorkerGroupName; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index ea997af21c..80967acac2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -22,11 +22,14 @@ import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; +import org.apache.dolphinscheduler.api.utils.exportprocess.exportProcessAddTaskParam; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.graph.DAG; @@ -496,43 +499,7 @@ public class ProcessDefinitionService extends BaseDAGService { ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); if (null != processDefinition) { - //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); - processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); - - Map row = new LinkedHashMap<>(); - row.put("projectName", processDefinition.getProjectName()); - row.put("processDefinitionName", processDefinition.getName()); - row.put("processDefinitionJson", processDefinition.getProcessDefinitionJson()); - row.put("processDefinitionDescription", processDefinition.getDescription()); - row.put("processDefinitionLocations", processDefinition.getLocations()); - row.put("processDefinitionConnects", processDefinition.getConnects()); - - //schedule info - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); - if (!schedules.isEmpty()) { - Schedule schedule = schedules.get(0); - row.put("scheduleWarningType", schedule.getWarningType()); - row.put("scheduleWarningGroupId", schedule.getWarningGroupId()); - row.put("scheduleStartTime", DateUtils.dateToString(schedule.getStartTime())); - row.put("scheduleEndTime", DateUtils.dateToString(schedule.getEndTime())); - row.put("scheduleCrontab", schedule.getCrontab()); - row.put("scheduleFailureStrategy", schedule.getFailureStrategy()); - row.put("scheduleReleaseState", ReleaseState.OFFLINE); - row.put("scheduleProcessInstancePriority", schedule.getProcessInstancePriority()); - if(schedule.getId() == -1){ - row.put("scheduleWorkerGroupId", -1); - }else{ - WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); - if(workerGroup != null){ - row.put("scheduleWorkerGroupName", workerGroup.getName()); - } - } - - } - - //create workflow json file - String rowsJson = JSONUtils.toJsonString(row); + String exportProcessJson = exportProcessMetaDataStr(processDefinitionId, processDefinition); response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json"); BufferedOutputStream buff = null; @@ -540,7 +507,7 @@ public class ProcessDefinitionService extends BaseDAGService { try { out = response.getOutputStream(); buff = new BufferedOutputStream(out); - buff.write(rowsJson.getBytes(StandardCharsets.UTF_8)); + buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8)); buff.flush(); buff.close(); } catch (IOException e) { @@ -560,12 +527,60 @@ public class ProcessDefinitionService extends BaseDAGService { logger.warn("export process output stream not close", e); } } - } } } } + /** + * get export process metadata string + * @param processDefinitionId process definition id + * @param processDefinition process definition + * @return export process metadata string + */ + public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { + //correct task param which has data source or dependent param + String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); + processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); + + //export process metadata + ProcessMeta exportProcessMeta = new ProcessMeta(); + exportProcessMeta.setProjectName(processDefinition.getProjectName()); + exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); + exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); + exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); + exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); + + //schedule info + List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + if (!schedules.isEmpty()) { + Schedule schedule = schedules.get(0); + WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); + + if (null == workerGroup && schedule.getWorkerGroupId() == -1) { + workerGroup = new WorkerGroup(); + workerGroup.setId(-1); + workerGroup.setName(""); + } + + exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); + exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); + exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); + exportProcessMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime())); + exportProcessMeta.setScheduleCrontab(schedule.getCrontab()); + exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); + exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE)); + exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); + + if (null != workerGroup) { + exportProcessMeta.setScheduleWorkerGroupId(workerGroup.getId()); + exportProcessMeta.setScheduleWorkerGroupName(workerGroup.getName()); + } + } + //create workflow json file + return JSONUtils.toJsonString(exportProcessMeta); + } + /** * correct task param which has datasource or dependent * @param processDefinitionJson processDefinitionJson @@ -580,35 +595,9 @@ public class ProcessDefinitionService extends BaseDAGService { if (StringUtils.isNotEmpty(taskNode.getString("type"))) { String taskType = taskNode.getString("type"); - if(checkTaskHasDataSource(taskType)){ - // add sqlParameters - JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); - DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); - if (null != dataSource) { - sqlParameters.put("datasourceName", dataSource.getName()); - } - taskNode.put("params", sqlParameters); - }else if(checkTaskHasDependent(taskType)){ - // add dependent param - JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); - - if(null != dependentParameters){ - JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); - for (int j = 0; j < dependTaskList.size(); j++) { - JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); - JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); - for (int k = 0; k < dependItemList.size(); k++) { - JSONObject dependentItem = dependItemList.getJSONObject(k); - int definitionId = dependentItem.getInteger("definitionId"); - ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); - if(null != definition){ - dependentItem.put("projectName",definition.getProjectName()); - dependentItem.put("definitionName",definition.getName()); - } - } - } - taskNode.put("dependence", dependentParameters); - } + exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + if (null != addTaskParam) { + addTaskParam.addSpecialParam(taskNode); } } } @@ -648,7 +637,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param loginUser login user * @param file process metadata json file * @param currentProjectName current project name - * @return + * @return import process */ @Transactional(rollbackFor = Exception.class) public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { @@ -860,6 +849,8 @@ public class ProcessDefinitionService extends BaseDAGService { * recursion create sub process * @param loginUser login user * @param targetProject target project + * @param jsonArray process task array + * @param subProcessIdMap correct sub process id map */ public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map subProcessIdMap) { for (int i = 0; i < jsonArray.size(); i++) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java new file mode 100644 index 0000000000..c013aac92b --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParam.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * task node add datasource param strategy + */ +@Service +public class DataSourceParam implements exportProcessAddTaskParam, InitializingBean { + + @Autowired + private DataSourceMapper dataSourceMapper; + + /** + * add datasource params + * @param taskNode task node json object + * @return task node json object + */ + @Override + public JSONObject addSpecialParam(JSONObject taskNode) { + // add sqlParameters + JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params")); + DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource")); + if (null != dataSource) { + sqlParameters.put("datasourceName", dataSource.getName()); + } + taskNode.put("params", sqlParameters); + + return taskNode; + } + + + /** + * put datasource strategy + */ + @Override + public void afterPropertiesSet() { + TaskNodeParamFactory.register(TaskType.SQL.name(), this); + TaskNodeParamFactory.register(TaskType.PROCEDURE.name(), this); + } +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java new file mode 100644 index 0000000000..bdf202c5fa --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParam.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * task node add dependent param strategy + */ +@Service +public class DependentParam implements exportProcessAddTaskParam, InitializingBean { + + + @Autowired + ProcessDefinitionMapper processDefineMapper; + + /** + * add dependent param + * @param taskNode task node json object + * @return task node json object + */ + @Override + public JSONObject addSpecialParam(JSONObject taskNode) { + // add dependent param + JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence")); + + if (null != dependentParameters) { + JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList"); + for (int j = 0; j < dependTaskList.size(); j++) { + JSONObject dependentTaskModel = dependTaskList.getJSONObject(j); + JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList"); + for (int k = 0; k < dependItemList.size(); k++) { + JSONObject dependentItem = dependItemList.getJSONObject(k); + int definitionId = dependentItem.getInteger("definitionId"); + ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId); + if (null != definition) { + dependentItem.put("projectName", definition.getProjectName()); + dependentItem.put("definitionName", definition.getName()); + } + } + } + taskNode.put("dependence", dependentParameters); + } + + return taskNode; + } + + /** + * put dependent strategy + */ + @Override + public void afterPropertiesSet() { + TaskNodeParamFactory.register(TaskType.DEPENDENT.name(), this); + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java new file mode 100644 index 0000000000..f4faa152a6 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/TaskNodeParamFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * task node param factory + */ +public class TaskNodeParamFactory { + + private static Map taskServices = new ConcurrentHashMap<>(); + + public static exportProcessAddTaskParam getByTaskType(String taskType){ + return taskServices.get(taskType); + } + + static void register(String taskType, exportProcessAddTaskParam addSpecialTaskParam){ + if (null != taskType) { + taskServices.put(taskType, addSpecialTaskParam); + } + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java new file mode 100644 index 0000000000..5ae1667cfb --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/exportprocess/exportProcessAddTaskParam.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import com.alibaba.fastjson.JSONObject; + +/** + * exportProcessAddTaskParam + */ +public interface exportProcessAddTaskParam { + + /** + * add task special param: sql task dependent task + * @param taskNode task node json object + * @return task node json object + */ + JSONObject addSpecialParam(JSONObject taskNode); +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index b8207972f6..82ba43d520 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -22,16 +22,11 @@ import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.DataSource; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.*; import org.apache.http.entity.ContentType; import org.json.JSONException; import org.junit.Assert; @@ -52,8 +47,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; +import java.util.*; @RunWith(MockitoJUnitRunner.Silent.class) @SpringBootTest(classes = ApiApplicationServer.class) @@ -75,6 +69,29 @@ public class ProcessDefinitionServiceTest { @Mock private ProjectService projectService; + @Mock + private ScheduleMapper scheduleMapper; + + @Mock + private WorkerGroupMapper workerGroupMapper; + + private String sqlDependentJson = "{\"globalParams\":[]," + + "\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" + + ",\"localParams\":[],\"connParams\":\"\"," + + "\"preStatements\":[],\"postStatements\":[]}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," + + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + + "\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," + + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + + "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + + "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; + @Test public void queryProccessDefinitionList() throws Exception { String projectName = "project_test1"; @@ -147,29 +164,22 @@ public class ProcessDefinitionServiceTest { Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource()); Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition()); + String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson); - String sqlDependentJson = "{\"globalParams\":[]," + - "\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + - "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + - "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" + - ",\"localParams\":[],\"connParams\":\"\"," + - "\"preStatements\":[],\"postStatements\":[]}," + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + - "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," + - "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + - "\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + - "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + - "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," + - "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + - "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + - "\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + - "\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; + JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false); - String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson); + } + @Test + public void testExportProcessMetaDataStr() { + Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList()); + Mockito.when(workerGroupMapper.selectById(-1)).thenReturn(null); - JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false); + ProcessDefinition processDefinition = getProcessDefinition(); + processDefinition.setProcessDefinitionJson(sqlDependentJson); + String exportProcessMetaDataStr = processDefinitionService.exportProcessMetaDataStr(46, processDefinition); + Assert.assertNotEquals(sqlDependentJson,exportProcessMetaDataStr); } /** @@ -350,6 +360,34 @@ public class ProcessDefinitionServiceTest { return project; } + /** + * get mock schedule + * @return schedule + */ + private Schedule getSchedule() { + Date date = new Date(); + Schedule schedule = new Schedule(); + schedule.setId(46); + schedule.setProcessDefinitionId(1); + schedule.setStartTime(date); + schedule.setEndTime(date); + schedule.setCrontab("0 0 5 * * ? *"); + schedule.setFailureStrategy(FailureStrategy.END); + schedule.setUserId(1); + schedule.setReleaseState(ReleaseState.OFFLINE); + schedule.setProcessInstancePriority(Priority.MEDIUM); + schedule.setWarningType(WarningType.NONE); + schedule.setWarningGroupId(1); + schedule.setWorkerGroupId(-1); + return schedule; + } + + private List getSchedulerList() { + List scheduleList = new ArrayList<>(); + scheduleList.add(getSchedule()); + return scheduleList; + } + private void putMsg(Map result, Status status, Object... statusParams) { result.put(Constants.STATUS, status); if (statusParams != null && statusParams.length > 0) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java new file mode 100644 index 0000000000..0a271d984f --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DataSourceParamTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.api.ApiApplicationServer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.json.JSONException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.skyscreamer.jsonassert.JSONAssert; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * DataSourceParamTest + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = ApiApplicationServer.class) +public class DataSourceParamTest { + + @Test + public void testAddDependentSpecialParam() throws JSONException { + + String dependentJson = "{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," + + "\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," + + "\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," + + "\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," + + "\"cycle\":\"day\",\"dateValue\":\"today\"}]}]}}"; + + + JSONObject taskNode = JSONUtils.parseObject(dependentJson); + if (StringUtils.isNotEmpty(taskNode.getString("type"))) { + String taskType = taskNode.getString("type"); + + exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + + JSONObject dependent = addTaskParam.addSpecialParam(taskNode); + + JSONAssert.assertEquals(taskNode.toString(),dependent.toString(),false); + } + + } +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java new file mode 100644 index 0000000000..db81138f43 --- /dev/null +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/exportprocess/DependentParamTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.api.utils.exportprocess; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.api.ApiApplicationServer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.json.JSONException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.skyscreamer.jsonassert.JSONAssert; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +/** + * DependentParamTest + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = ApiApplicationServer.class) +public class DependentParamTest { + + @Test + public void testAddDependentSpecialParam() throws JSONException { + + String sqlJson = "{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," + + "\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," + + "\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" + + ",\"localParams\":[],\"connParams\":\"\"," + + "\"preStatements\":[],\"postStatements\":[]}," + + "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + + "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," + + "\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + + "\"preTasks\":[\"dependent\"]}"; + + + JSONObject taskNode = JSONUtils.parseObject(sqlJson); + if (StringUtils.isNotEmpty(taskNode.getString("type"))) { + String taskType = taskNode.getString("type"); + + exportProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + + JSONObject sql = addTaskParam.addSpecialParam(taskNode); + + JSONAssert.assertEquals(taskNode.toString(),sql.toString(),false); + } + + } +} diff --git a/pom.xml b/pom.xml index e0d02b460b..09156b7b08 100644 --- a/pom.xml +++ b/pom.xml @@ -343,7 +343,6 @@ mysql mysql-connector-java ${mysql.connector.version} - test org.slf4j @@ -680,6 +679,8 @@ **/common/queue/*.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java + **/api/utils/exportprocess/DataSourceParamTest.java + **/api/utils/exportprocess/DependentParamTest.java **/api/enums/*.java **/api/service/AccessTokenServiceTest.java **/api/service/QueueServiceTest.java