Browse Source

#1544 fix bug: workflow import (#1676)

* modify FileUtils.readFile2Str

* #1300 Add right alignment function in sql email content

* cancel formatted for alert_mail_template.ftl

* #747 sql task password Log desensitization

* cancel mail_temple

* edit ExcelUtils

* modify test method name

* #747 sql task password Log desensitization

* #1544 workflow import

* Constants add DATASOURCE_PASSWORD_REGEX

* #747 sql task password Log desensitization

* deal with import project have sub process

* modify export process addTaskNodeParam method name

* add testAddTaskNodeSpecialParam UT

* add ProcessDefinitionServiceTest-ut to pom

* add testImportSubProcess in ProcessDefinitionServiceTest

* add testImportSubProcess in ProcessDefinitionServiceTest

* add testImportProcessDefinition
pull/2/head
Yelli 5 years ago committed by qiaozhanwei
parent
commit
26ed786c4d
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
  3. 314
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  4. 294
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  5. 1
      dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue
  6. 1
      pom.xml

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -460,7 +460,7 @@ public class ProcessDefinitionController extends BaseController{
}
}
if(deleteFailedIdList.size() > 0){
if(!deleteFailedIdList.isEmpty()){
putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
}else{
putMsg(result, Status.SUCCESS);

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java

@ -267,11 +267,12 @@ public class ProjectController extends BaseController {
})
@PostMapping(value="/import-definition")
public Result importProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("file") MultipartFile file){
@RequestParam("file") MultipartFile file,
@RequestParam("projectName") String projectName){
try{
logger.info("import process definition by id, login user:{}",
loginUser.getUserName());
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser,file);
logger.info("import process definition by id, login user:{}, project: {}",
loginUser.getUserName(), projectName);
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser, file, projectName);
return returnDataList(result);
}catch (Exception e){
logger.error(IMPORT_PROCESS_DEFINE_ERROR.getMsg(),e);

314
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -16,9 +16,18 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
@ -32,14 +41,6 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.slf4j.Logger;
@ -56,8 +57,10 @@ import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID;
@ -482,50 +485,21 @@ public class ProcessDefinitionService extends BaseDAGService {
* @param response response
*/
public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) {
//export project info
Project project = projectMapper.queryByName(projectName);
//check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus == Status.SUCCESS) {
//get workflow info
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
if (processDefinition != null) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinition.getProcessDefinitionJson());
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
if (taskNode.get("type") != null && taskNode.get("type") != "") {
String taskType = taskNode.getString("type");
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
if (dataSource != null) {
sqlParameters.put("datasourceName", dataSource.getName());
}
taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
int definitionId = dependentItem.getInteger("definitionId");
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if(definition != null){
dependentItem.put("projectName",definition.getProjectName());
dependentItem.put("definitionName",definition.getName());
}
}
}
taskNode.put("dependence", dependentParameters);
}
}
}
}
jsonObject.put("tasks", jsonArray);
processDefinition.setProcessDefinitionJson(jsonObject.toString());
if (null != processDefinition) {
//correct task param which has data source or dependent param
String correctProcessDefinitionJson = addTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson());
processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson);
Map<String, Object> row = new LinkedHashMap<>();
row.put("projectName", processDefinition.getProjectName());
@ -535,8 +509,9 @@ public class ProcessDefinitionService extends BaseDAGService {
row.put("processDefinitionLocations", processDefinition.getLocations());
row.put("processDefinitionConnects", processDefinition.getConnects());
//schedule info
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (schedules.size() > 0) {
if (!schedules.isEmpty()) {
Schedule schedule = schedules.get(0);
row.put("scheduleWarningType", schedule.getWarningType());
row.put("scheduleWarningGroupId", schedule.getWarningGroupId());
@ -556,6 +531,8 @@ public class ProcessDefinitionService extends BaseDAGService {
}
}
//create workflow json file
String rowsJson = JSONUtils.toJsonString(row);
response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);
response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json");
@ -564,38 +541,136 @@ public class ProcessDefinitionService extends BaseDAGService {
try {
out = response.getOutputStream();
buff = new BufferedOutputStream(out);
buff.write(rowsJson.getBytes("UTF-8"));
buff.write(rowsJson.getBytes(StandardCharsets.UTF_8));
buff.flush();
buff.close();
} catch (IOException e) {
e.printStackTrace();
logger.warn("export process fail", e);
}finally {
if (null != buff) {
try {
buff.close();
} catch (Exception e) {
logger.warn("export process buffer not close", e);
}
}
if (null != out) {
try {
out.close();
} catch (Exception e) {
e.printStackTrace();
logger.warn("export process output stream not close", e);
}
}
}
}
}
}
/**
* correct task param which has datasource or dependent
* @param processDefinitionJson processDefinitionJson
* @return correct processDefinitionJson
*/
public String addTaskNodeSpecialParam(String processDefinitionJson) {
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
if (StringUtils.isNotEmpty(taskNode.getString("type"))) {
String taskType = taskNode.getString("type");
if(checkTaskHasDataSource(taskType)){
// add sqlParameters
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
DataSource dataSource = dataSourceMapper.selectById((Integer) sqlParameters.get("datasource"));
if (null != dataSource) {
sqlParameters.put("datasourceName", dataSource.getName());
}
taskNode.put("params", sqlParameters);
}else if(checkTaskHasDependent(taskType)){
// add dependent param
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(null != dependentParameters){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
for (int j = 0; j < dependTaskList.size(); j++) {
JSONObject dependentTaskModel = dependTaskList.getJSONObject(j);
JSONArray dependItemList = (JSONArray) dependentTaskModel.get("dependItemList");
for (int k = 0; k < dependItemList.size(); k++) {
JSONObject dependentItem = dependItemList.getJSONObject(k);
int definitionId = dependentItem.getInteger("definitionId");
ProcessDefinition definition = processDefineMapper.queryByDefineId(definitionId);
if(null != definition){
dependentItem.put("projectName",definition.getProjectName());
dependentItem.put("definitionName",definition.getName());
}
}
}
taskNode.put("dependence", dependentParameters);
}
}
}
}
jsonObject.put("tasks", jsonArray);
return jsonObject.toString();
}
/**
* check task if has dependent
* @param taskType task type
* @return if task has dependent return true else false
*/
private boolean checkTaskHasDependent(String taskType) {
return taskType.equals(TaskType.DEPENDENT.name());
}
/**
* check task if has data source info
* @param taskType task type
* @return if task has data source return true else false
*/
private boolean checkTaskHasDataSource(String taskType) {
return taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name());
}
/**
* check task if has sub process
* @param taskType task type
* @return if task has sub process return true else false
*/
private boolean checkTaskHasSubProcess(String taskType) {
return taskType.equals(TaskType.SUB_PROCESS.name());
}
/**
* import process definition
* @param loginUser login user
* @param file process metadata json file
* @param currentProjectName current project name
* @return
*/
@Transactional(rollbackFor = Exception.class)
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file) {
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5);
JSONObject json = null;
try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), "UTF-8" )) {
JSONObject json;
//read workflow json
try(InputStreamReader inputStreamReader = new InputStreamReader( file.getInputStream(), StandardCharsets.UTF_8)) {
BufferedReader streamReader = new BufferedReader(inputStreamReader);
StringBuilder respomseStrBuilder = new StringBuilder();
String inputStr = "";
String inputStr;
while ((inputStr = streamReader.readLine())!= null){
respomseStrBuilder.append( inputStr );
}
json = JSONObject.parseObject( respomseStrBuilder.toString() );
if(json != null){
String projectName = null;
if(null != json){
String originProjectName = null;
String processDefinitionName = null;
String processDefinitionJson = null;
String processDefinitionDesc = null;
@ -614,7 +689,7 @@ public class ProcessDefinitionService extends BaseDAGService {
String scheduleWorkerGroupName = null;
if (ObjectUtils.allNotNull(json.get("projectName"))) {
projectName = json.get("projectName").toString();
originProjectName = json.get("projectName").toString();
} else {
putMsg(result, Status.DATA_IS_NULL, "processDefinitionName");
return result;
@ -641,25 +716,33 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefinitionConnects = json.get("processDefinitionConnects").toString();
}
Project project = projectMapper.queryByName(projectName);
if(project != null){
processDefinitionName = recursionProcessDefinitionName(project.getId(), processDefinitionName, 1);
//check user access for org project
Project originProject = projectMapper.queryByName(originProjectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, originProject, originProjectName);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus == Status.SUCCESS) {
//use currentProjectName to query
Project targetProject = projectMapper.queryByName(currentProjectName);
if(null != targetProject){
processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), processDefinitionName, 1);
}
JSONObject jsonObject = JSONUtils.parseObject(processDefinitionJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
for (int j = 0; j < jsonArray.size(); j++) {
JSONObject taskNode = jsonArray.getJSONObject(j);
String taskType = taskNode.getString("type");
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())) {
if(checkTaskHasDataSource(taskType)) {
JSONObject sqlParameters = JSONUtils.parseObject(taskNode.getString("params"));
List<DataSource> dataSources = dataSourceMapper.queryDataSourceByName(sqlParameters.getString("datasourceName"));
if (dataSources.size() > 0) {
if (!dataSources.isEmpty()) {
DataSource dataSource = dataSources.get(0);
sqlParameters.put("datasource", dataSource.getId());
}
taskNode.put("params", sqlParameters);
}else if(taskType.equals(TaskType.DEPENDENT.name())){
}else if(checkTaskHasDependent(taskType)){
JSONObject dependentParameters = JSONUtils.parseObject(taskNode.getString("dependence"));
if(dependentParameters != null){
JSONArray dependTaskList = (JSONArray) dependentParameters.get("dependTaskList");
@ -682,9 +765,21 @@ public class ProcessDefinitionService extends BaseDAGService {
}
}
}
//recursive sub-process parameter correction map key for old process id value for new process id
Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
List<Object> subProcessList = jsonArray.stream()
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).getString("type")))
.collect(Collectors.toList());
if (!subProcessList.isEmpty()) {
importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap);
}
jsonObject.put("tasks", jsonArray);
Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,projectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
Map<String, Object> createProcessDefinitionResult = createProcessDefinition(loginUser,currentProjectName,processDefinitionName,jsonObject.toString(),processDefinitionDesc,processDefinitionLocations,processDefinitionConnects);
Integer processDefinitionId = null;
if (ObjectUtils.allNotNull(createProcessDefinitionResult.get("processDefinitionId"))) {
processDefinitionId = Integer.parseInt(createProcessDefinitionResult.get("processDefinitionId").toString());
@ -692,7 +787,7 @@ public class ProcessDefinitionService extends BaseDAGService {
if (ObjectUtils.allNotNull(json.get("scheduleCrontab")) && processDefinitionId != null) {
Date now = new Date();
Schedule scheduleObj = new Schedule();
scheduleObj.setProjectName(projectName);
scheduleObj.setProjectName(currentProjectName);
scheduleObj.setProcessDefinitionId(processDefinitionId);
scheduleObj.setProcessDefinitionName(processDefinitionName);
scheduleObj.setCreateTime(now);
@ -739,7 +834,7 @@ public class ProcessDefinitionService extends BaseDAGService {
if (ObjectUtils.allNotNull(json.get("scheduleWorkerGroupName"))) {
scheduleWorkerGroupName = json.get("scheduleWorkerGroupName").toString();
List<WorkerGroup> workerGroups = workerGroupMapper.queryWorkerGroupByName(scheduleWorkerGroupName);
if(workerGroups.size() > 0){
if(!workerGroups.isEmpty()){
scheduleObj.setWorkerGroupId(workerGroups.get(0).getId());
}
}
@ -747,17 +842,100 @@ public class ProcessDefinitionService extends BaseDAGService {
}
scheduleMapper.insert(scheduleObj);
}
putMsg(result, Status.SUCCESS);
return result;
}
}else{
putMsg(result, Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR);
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
return result;
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* check import process has sub process
* recursion create sub process
* @param loginUser login user
* @param targetProject target project
*/
public void importSubProcess(User loginUser, Project targetProject, JSONArray jsonArray, Map<Integer, Integer> subProcessIdMap) {
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject taskNode = jsonArray.getJSONObject(i);
String taskType = taskNode.getString("type");
if (checkTaskHasSubProcess(taskType)) {
//get sub process info
JSONObject subParams = JSONUtils.parseObject(taskNode.getString("params"));
Integer subProcessId = subParams.getInteger("processDefinitionId");
ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId);
String subProcessJson = subProcess.getProcessDefinitionJson();
//check current project has sub process
ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName());
if (null == currentProjectSubProcess) {
JSONArray subJsonArray = (JSONArray) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get("tasks");
List<Object> subProcessList = subJsonArray.stream()
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).getString("type")))
.collect(Collectors.toList());
if (!subProcessList.isEmpty()) {
importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
//sub process processId correct
if (!subProcessIdMap.isEmpty()) {
for (Map.Entry<Integer, Integer> entry : subProcessIdMap.entrySet()) {
String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey();
String newSubProcessId = "\"processDefinitionId\":" + entry.getValue();
subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
}
subProcessIdMap.clear();
}
}
//if sub-process recursion
Date now = new Date();
//create sub process in target project
ProcessDefinition processDefine = new ProcessDefinition();
processDefine.setName(subProcess.getName());
processDefine.setVersion(subProcess.getVersion());
processDefine.setReleaseState(subProcess.getReleaseState());
processDefine.setProjectId(targetProject.getId());
processDefine.setUserId(loginUser.getId());
processDefine.setProcessDefinitionJson(subProcessJson);
processDefine.setDescription(subProcess.getDescription());
processDefine.setLocations(subProcess.getLocations());
processDefine.setConnects(subProcess.getConnects());
processDefine.setTimeout(subProcess.getTimeout());
processDefine.setTenantId(subProcess.getTenantId());
processDefine.setGlobalParams(subProcess.getGlobalParams());
processDefine.setCreateTime(now);
processDefine.setUpdateTime(now);
processDefine.setFlag(subProcess.getFlag());
processDefine.setReceivers(subProcess.getReceivers());
processDefine.setReceiversCc(subProcess.getReceiversCc());
processDefineMapper.insert(processDefine);
logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());
//modify task node
ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(),processDefine.getName());
if (null != newSubProcessDefine) {
subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
subParams.put("processDefinitionId", newSubProcessDefine.getId());
taskNode.put("params", subParams);
}
}
}
}
}
/**

294
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -16,38 +16,80 @@
*/
package org.apache.dolphinscheduler.api.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.User;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.http.entity.ContentType;
import org.json.JSONException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skyscreamer.jsonassert.JSONAssert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
@RunWith(SpringRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
@SpringBootTest(classes = ApiApplicationServer.class)
public class ProcessDefinitionServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceTest.class);
@Autowired
@InjectMocks
ProcessDefinitionService processDefinitionService;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
@Test
public void queryProccessDefinitionList() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProccessDefinitionList(loginUser,"project_test1");
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
@ -55,10 +97,20 @@ public class ProcessDefinitionServiceTest {
@Test
public void queryProcessDefinitionListPagingTest() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionListPaging(loginUser, "project_test1", "",1, 5,0);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
@ -67,13 +119,243 @@ public class ProcessDefinitionServiceTest {
@Test
public void deleteProcessDefinitionByIdTest() throws Exception {
String projectName = "project_test1";
Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName));
Project project = getProject(projectName);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionById(loginUser, "li_sql_test", 6);
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.PROJECT_NOT_FOUNT, projectName);
Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionById(loginUser, "project_test1", 6);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
/**
* add datasource param and dependent when export process
* @throws JSONException
*/
@Test
public void testAddTaskNodeSpecialParam() throws JSONException {
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource());
Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition());
String sqlDependentJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SQL\",\"id\":\"tasks-27297\",\"name\":\"sql\"," +
"\"params\":{\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from test\"," +
"\"udfs\":\"\",\"sqlType\":\"1\",\"title\":\"\",\"receivers\":\"\",\"receiversCc\":\"\",\"showType\":\"TABLE\"" +
",\"localParams\":[],\"connParams\":\"\"," +
"\"preStatements\":[],\"postStatements\":[]}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\"," +
"\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[\"dependent\"]},{\"type\":\"DEPENDENT\",\"id\":\"tasks-33787\"," +
"\"name\":\"dependent\",\"params\":{},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
"\"dependence\":{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\"," +
"\"dependItemList\":[{\"projectId\":2,\"definitionId\":46,\"depTasks\":\"ALL\"," +
"\"cycle\":\"day\",\"dateValue\":\"today\"}]}]},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
String corSqlDependentJson = processDefinitionService.addTaskNodeSpecialParam(sqlDependentJson);
JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false);
}
/**
* import sub process test
*/
@Test
public void testImportSubProcess() {
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
Project testProject = getProject("test");
//Recursive subprocess sub2 process in sub1 process and sub1process in top process
String topProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-38634\",\"name\":\"shell1\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}," +
"{\"type\":\"SUB_PROCESS\",\"id\":\"tasks-44207\",\"name\":\"shell-4\"," +
"\"params\":{\"processDefinitionId\":39},\"description\":\"\",\"runFlag\":\"NORMAL\"," +
"\"dependence\":{},\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[\"shell1\"]}],\"tenantId\":1,\"timeout\":0}";
String sub1ProcessJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-84090\"," +
"\"name\":\"shell-4\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-4\\\"\"}," +
"\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," +
"\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," +
"\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SUB_PROCESS\"," +
"\"id\":\"tasks-87364\",\"name\":\"shell-5\"," +
"\"params\":{\"processDefinitionId\":46},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{}," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," +
"\"workerGroupId\":-1,\"preTasks\":[\"shell-4\"]}],\"tenantId\":1,\"timeout\":0}";
String sub2ProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
JSONObject jsonObject = JSONUtils.parseObject(topProcessJson);
JSONArray jsonArray = (JSONArray) jsonObject.get("tasks");
String originSubJson = jsonArray.toString();
Map<Integer, Integer> subProcessIdMap = new HashMap<>(20);
ProcessDefinition shellDefinition1 = new ProcessDefinition();
shellDefinition1.setId(39);
shellDefinition1.setName("shell-4");
shellDefinition1.setProjectId(2);
shellDefinition1.setProcessDefinitionJson(sub1ProcessJson);
ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(46);
shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(sub2ProcessJson);
Mockito.when(processDefineMapper.queryByDefineId(39)).thenReturn(shellDefinition1);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-5")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null);
Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2);
processDefinitionService.importSubProcess(loginUser,testProject,jsonArray,subProcessIdMap);
String correctSubJson = jsonArray.toString();
Assert.assertEquals(originSubJson, correctSubJson);
}
@Test
public void testImportProcessDefinitionById() throws IOException {
String processJson = "{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," +
"\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," +
"\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," +
"\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," +
"\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," +
"\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," +
"\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," +
"{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":-1," +
"\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," +
"\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," +
"\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," +
"\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," +
"\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," +
"\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," +
"\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," +
"\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}";
String subProcessJson = "{\"globalParams\":[]," +
"\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," +
"\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," +
"\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," +
"\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," +
"\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}";
FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson);
File file = new File("/tmp/task.json");
FileInputStream fileInputStream = new FileInputStream("/tmp/task.json");
MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(),
ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream);
User loginUser = new User();
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
String currentProjectName = "testProject";
Map<String, Object> result = new HashMap<>(5);
putMsg(result, Status.SUCCESS, currentProjectName);
ProcessDefinition shellDefinition2 = new ProcessDefinition();
shellDefinition2.setId(46);
shellDefinition2.setName("shell-5");
shellDefinition2.setProjectId(2);
shellDefinition2.setProcessDefinitionJson(subProcessJson);
Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName));
Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2);
//import process
Map<String, Object> importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName);
Assert.assertEquals(Status.SUCCESS, importProcessResult.get(Constants.STATUS));
boolean delete = file.delete();
Assert.assertTrue(delete);
}
/**
* get mock datasource
* @return DataSource
*/
private DataSource getDataSource(){
DataSource dataSource = new DataSource();
dataSource.setId(2);
dataSource.setName("test");
return dataSource;
}
/**
* get mock processDefinition
* @return ProcessDefinition
*/
private ProcessDefinition getProcessDefinition(){
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(46);
processDefinition.setName("testProject");
processDefinition.setProjectId(2);
return processDefinition;
}
/**
* get mock Project
* @param projectName projectName
* @return Project
*/
private Project getProject(String projectName){
Project project = new Project();
project.setId(1);
project.setName(projectName);
project.setUserId(1);
return project;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
}

1
dolphinscheduler-ui/src/js/module/components/fileUpdate/definitionUpdate.vue

@ -141,6 +141,7 @@
let self = this
let formData = new FormData()
formData.append('file', this.file)
formData.append('projectName',this.store.state.dag.projectName)
io.post(`projects/import-definition`, res => {
this.$message.success(res.msg)
resolve()

1
pom.xml

@ -675,6 +675,7 @@
<include>**/api/service/WorkerGroupServiceTest.java</include>
<include>**/api/service/AlertGroupServiceTest.java</include>
<include>**/api/service/ProjectServiceTest.java</include>
<include>**/api/service/ProcessDefinitionServiceTest.java</include>
<include>**/api/service/UdfFuncServiceTest.java</include>
<include>**/alert/utils/ExcelUtilsTest.java</include>
<include>**/alert/utils/FuncUtilsTest.java</include>

Loading…
Cancel
Save