From b08ba3461ea2e4611e12e9616bc1d087d72272f5 Mon Sep 17 00:00:00 2001 From: zixi0825 <649790970@qq.com> Date: Wed, 6 May 2020 16:33:45 +0800 Subject: [PATCH] process batch export and import #2501 (#2560) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Increase dataX environment variable, sslTrust default value (#2555) * add LoginTest license * Delete useless packages * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * e2e add project and workflow case * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group in init.sql * Update worker_group_id to worker_group * Increase dataX environment variable, sslTrust default value Co-authored-by: chenxingchun <438044805@qq.com> * no valid worker group,master can kill task directly (#2541) * dispatch task fail will set task status failed * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,no worker condition , master will while ture wait for worker startup 2,worker response task status sync wait for result * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * 1,task status statistics and process status statistics bug fix (#2357) 2,worker group bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * send mail error, #2466 bug fix * #2486 bug fix * host and workergroup compatible * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * EnterpriseWeChatUtils modify * #2499 bug fix * add comment * revert comment * revert comment * #2499 buf fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * #2499 bug fix * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly * no valid worker group,master can kill task directly Co-authored-by: qiaozhanwei * add batch import and export process definition * fix unit test bug * add unit test * add unit test 2 * fix duplicated code problem * fix duplicated code problem 2 * fix duplicated code problem 3 * fix duplicated code problem 4 * fix the vulnerability bug * fix the vulnerability bug * fix the code smells * fix the code smells 1 * fix the code smells 2 * fix the code smells 3 Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com> Co-authored-by: chenxingchun <438044805@qq.com> Co-authored-by: qiaozhanwei Co-authored-by: qiaozhanwei Co-authored-by: sunchaohe --- .../ProcessDefinitionController.java | 24 +- .../dolphinscheduler/api/enums/Status.java | 5 +- .../api/service/ProcessDefinitionService.java | 263 +++++++++++++----- .../main/resources/i18n/messages.properties | 2 + .../resources/i18n/messages_en_US.properties | 2 + .../resources/i18n/messages_zh_CN.properties | 3 + .../ProcessDefinitionControllerTest.java | 63 ++++- .../exceptions/ApiExceptionHandlerTest.java | 2 +- .../service/ProcessDefinitionServiceTest.java | 112 +++++--- .../src/main/resources/datasource.properties | 9 +- .../definition/pages/list/_source/list.vue | 34 ++- .../src/js/conf/home/store/dag/actions.js | 7 +- 12 files changed, 378 insertions(+), 148 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 4f3dafdf27..6b539d01b1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -449,30 +449,30 @@ public class ProcessDefinitionController extends BaseController { } /** - * export process definition by id + * batch export process definition by ids * * @param loginUser login user * @param projectName project name - * @param processDefinitionId process definition id + * @param processDefinitionIds process definition ids * @param response response */ - @ApiOperation(value = "exportProcessDefinitionById", notes= "EXPORT_PROCESS_DEFINITION_BY_ID_NOTES") + @ApiOperation(value = "batchExportProcessDefinitionByIds", notes= "BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + @ApiImplicitParam(name = "processDefinitionIds", value = "PROCESS_DEFINITION_ID", required = true, dataType = "String") }) @GetMapping(value = "/export") @ResponseBody - public void exportProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, - @RequestParam("processDefinitionId") Integer processDefinitionId, - HttpServletResponse response) { + public void batchExportProcessDefinitionByIds(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam("processDefinitionIds") String processDefinitionIds, + HttpServletResponse response) { try { - logger.info("export process definition by id, login user:{}, project name:{}, process definition id:{}", - loginUser.getUserName(), projectName, processDefinitionId); - processDefinitionService.exportProcessDefinitionById(loginUser, projectName, processDefinitionId, response); + logger.info("batch export process definition by ids, login user:{}, project name:{}, process definition ids:{}", + loginUser.getUserName(), projectName, processDefinitionIds); + processDefinitionService.batchExportProcessDefinitionByIds(loginUser, projectName, processDefinitionIds, response); } catch (Exception e) { - logger.error(Status.EXPORT_PROCESS_DEFINE_BY_ID_ERROR.getMsg(), e); + logger.error(Status.BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(), e); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 8c52dd4d50..b3d10da288 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -214,8 +214,8 @@ public enum Status { EXECUTE_PROCESS_INSTANCE_ERROR(50015,"execute process instance error", "操作工作流实例错误"), CHECK_PROCESS_DEFINITION_ERROR(50016,"check process definition error", "检查工作流实例错误"), QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR(50017,"query recipients and copyers by process definition error", "查询收件人和抄送人错误"), - DATA_IS_NOT_VALID(50017,"data %s not valid", "数据[%s]无效"), - DATA_IS_NULL(50018,"data %s is null", "数据[%s]不能为空"), + DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), + DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), @@ -226,6 +226,7 @@ public enum Status { BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027,"there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028,"export process definition by id error", "导出工作流定义错误"), + BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029,"import process definition error", "导入工作流定义错误"), HDFS_NOT_STARTUP(60001,"hdfs not startup", "hdfs未启用"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 14cadbf189..4fdafa46fa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -563,14 +563,18 @@ public class ProcessDefinitionService extends BaseDAGService { } /** - * export process definition by id - * - * @param loginUser login user - * @param projectName project name - * @param processDefinitionId process definition id - * @param response response + * batch export process definition by ids + * @param loginUser + * @param projectName + * @param processDefinitionIds + * @param response */ - public void exportProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId, HttpServletResponse response) { + public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response){ + + if(StringUtils.isEmpty(processDefinitionIds)){ + return; + } + //export project info Project project = projectMapper.queryByName(projectName); @@ -578,39 +582,68 @@ public class ProcessDefinitionService extends BaseDAGService { Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus == Status.SUCCESS) { + if(resultStatus != Status.SUCCESS){ + return; + } + + List processDefinitionList = + getProcessDefinitionList(processDefinitionIds); + + if(CollectionUtils.isNotEmpty(processDefinitionList)){ + downloadProcessDefinitionFile(response, processDefinitionList); + } + } + + /** + * get process definition list by ids + * @param processDefinitionIds + * @return + */ + private List getProcessDefinitionList(String processDefinitionIds){ + List processDefinitionList = new ArrayList<>(); + String[] processDefinitionIdArray = processDefinitionIds.split(","); + for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info + int processDefinitionId = Integer.parseInt(strProcessDefinitionId); ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); - if (null != processDefinition) { - String exportProcessJson = exportProcessMetaDataStr(processDefinitionId, processDefinition); - response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); - response.setHeader("Content-Disposition", "attachment;filename="+processDefinition.getName()+".json"); - BufferedOutputStream buff = null; - ServletOutputStream out = null; + processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); + } + } + + return processDefinitionList; + } + + /** + * download the process definition file + * @param response + * @param processDefinitionList + */ + private void downloadProcessDefinitionFile(HttpServletResponse response, List processDefinitionList) { + response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); + BufferedOutputStream buff = null; + ServletOutputStream out = null; + try { + out = response.getOutputStream(); + buff = new BufferedOutputStream(out); + buff.write(JSON.toJSONString(processDefinitionList).getBytes(StandardCharsets.UTF_8)); + buff.flush(); + buff.close(); + } catch (IOException e) { + logger.warn("export process fail", e); + }finally { + if (null != buff) { try { - out = response.getOutputStream(); - buff = new BufferedOutputStream(out); - buff.write(exportProcessJson.getBytes(StandardCharsets.UTF_8)); - buff.flush(); buff.close(); - } catch (IOException e) { - logger.warn("export process fail", e); - }finally { - if (null != buff) { - try { - buff.close(); - } catch (Exception e) { - logger.warn("export process buffer not close", e); - } - } - if (null != out) { - try { - out.close(); - } catch (Exception e) { - logger.warn("export process output stream not close", e); - } - } + } catch (Exception e) { + logger.warn("export process buffer not close", e); + } + } + if (null != out) { + try { + out.close(); + } catch (Exception e) { + logger.warn("export process output stream not close", e); } } } @@ -623,6 +656,17 @@ public class ProcessDefinitionService extends BaseDAGService { * @return export process metadata string */ public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { + //create workflow json file + return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId,processDefinition)); + } + + /** + * get export process metadata string + * @param processDefinitionId process definition id + * @param processDefinition process definition + * @return export process metadata string + */ + public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { //correct task param which has data source or dependent param String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); @@ -639,14 +683,6 @@ public class ProcessDefinitionService extends BaseDAGService { List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); if (!schedules.isEmpty()) { Schedule schedule = schedules.get(0); - /*WorkerGroup workerGroup = workerGroupMapper.selectById(schedule.getWorkerGroupId()); - - if (null == workerGroup && schedule.getWorkerGroupId() == -1) { - workerGroup = new WorkerGroup(); - workerGroup.setId(-1); - workerGroup.setName(""); - }*/ - exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); @@ -658,7 +694,7 @@ public class ProcessDefinitionService extends BaseDAGService { exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); } //create workflow json file - return JSONUtils.toJsonString(exportProcessMeta); + return exportProcessMeta; } /** @@ -705,24 +741,36 @@ public class ProcessDefinitionService extends BaseDAGService { public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { Map result = new HashMap<>(5); String processMetaJson = FileUtils.file2String(file); - ProcessMeta processMeta = JSONUtils.parseObject(processMetaJson, ProcessMeta.class); + List processMetaList = JSON.parseArray(processMetaJson,ProcessMeta.class); //check file content - if (null == processMeta) { + if (CollectionUtils.isEmpty(processMetaList)) { putMsg(result, Status.DATA_IS_NULL, "fileContent"); return result; } - if (StringUtils.isEmpty(processMeta.getProjectName())) { - putMsg(result, Status.DATA_IS_NULL, "projectName"); - return result; - } - if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); - return result; + + for(ProcessMeta processMeta:processMetaList){ + + if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)){ + return result; + } } - if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); - return result; + + return result; + } + + /** + * check and import process definition + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @return + */ + private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map result, ProcessMeta processMeta) { + + if(!checkImportanceParams(processMeta,result)){ + return false; } //deal with process name @@ -734,31 +782,84 @@ public class ProcessDefinitionService extends BaseDAGService { processDefinitionName, 1); } - //add special task param - String importProcessParam = addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject); + // get create process result + Map createProcessResult = + getCreateProcessResult(loginUser, + currentProjectName, + result, + processMeta, + processDefinitionName, + addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); + + if(createProcessResult == null){ + return false; + } + + //create process definition + Integer processDefinitionId = + Objects.isNull(createProcessResult.get("processDefinitionId"))? + null:Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); + + //scheduler param + return getImportProcessScheduleResult(loginUser, + currentProjectName, + result, + processMeta, + processDefinitionName, + processDefinitionId); + + } - Map createProcessResult; + /** + * get create process result + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @param processDefinitionName + * @param importProcessParam + * @return + */ + private Map getCreateProcessResult(User loginUser, + String currentProjectName, + Map result, + ProcessMeta processMeta, + String processDefinitionName, + String importProcessParam){ + Map createProcessResult = null; try { createProcessResult = createProcessDefinition(loginUser ,currentProjectName, - processDefinitionName, + processDefinitionName+"_import_"+System.currentTimeMillis(), importProcessParam, processMeta.getProcessDefinitionDescription(), processMeta.getProcessDefinitionLocations(), processMeta.getProcessDefinitionConnects()); + putMsg(result, Status.SUCCESS); } catch (JsonProcessingException e) { logger.error("import process meta json data: {}", e.getMessage(), e); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - return result; } - putMsg(result, Status.SUCCESS); - //create process definition - Integer processDefinitionId = null; - if (null != createProcessResult && Objects.nonNull(createProcessResult.get("processDefinitionId"))) { - processDefinitionId = Integer.parseInt(createProcessResult.get("processDefinitionId").toString()); - } - //scheduler param + return createProcessResult; + } + + /** + * get import process schedule result + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @param processDefinitionName + * @param processDefinitionId + * @return + */ + private boolean getImportProcessScheduleResult(User loginUser, + String currentProjectName, + Map result, + ProcessMeta processMeta, + String processDefinitionName, + Integer processDefinitionId) { if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) { int scheduleInsert = importProcessSchedule(loginUser, currentProjectName, @@ -768,11 +869,33 @@ public class ProcessDefinitionService extends BaseDAGService { if (0 == scheduleInsert) { putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - return result; + return false; } } + return true; + } - return result; + /** + * check importance params + * @param processMeta + * @param result + * @return + */ + private boolean checkImportanceParams(ProcessMeta processMeta,Map result){ + if (StringUtils.isEmpty(processMeta.getProjectName())) { + putMsg(result, Status.DATA_IS_NULL, "projectName"); + return false; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); + return false; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); + return false; + } + + return true; } /** diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index 369e5e3c72..c8e48ad865 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging +EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id +BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 92df742613..0669e8d8cf 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -252,3 +252,5 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source DELETE_SCHEDULER_BY_ID_NOTES=delete scheduler by id QUERY_ALERT_GROUP_LIST_PAGING_NOTES=query alert group list paging +EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=export process definition by id +BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES= batch export process definition by ids diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 3b427912b5..9053b0924c 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -250,3 +250,6 @@ UNAUTHORIZED_DATA_SOURCE_NOTES=未授权的数据源 AUTHORIZED_DATA_SOURCE_NOTES=授权的数据源 DELETE_SCHEDULER_BY_ID_NOTES=根据定时id删除定时数据 QUERY_ALERT_GROUP_LIST_PAGING_NOTES=分页查询告警组列表 +EXPORT_PROCESS_DEFINITION_BY_ID_NOTES=通过工作流ID导出工作流定义 +BATCH_EXPORT_PROCESS_DEFINITION_BY_IDS_NOTES=批量导出工作流定义 + diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index a69df9744e..8c0d04c6c6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -18,12 +18,14 @@ package org.apache.dolphinscheduler.api.controller; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.User; import org.junit.*; import org.junit.runner.RunWith; @@ -33,6 +35,8 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.mock.web.MockHttpServletResponse; +import javax.servlet.http.HttpServletResponse; import java.text.MessageFormat; import java.util.ArrayList; import java.util.HashMap; @@ -111,7 +115,7 @@ public class ProcessDefinitionControllerTest{ } @Test - public void UpdateProcessDefinition() throws Exception { + public void updateProcessDefinition() throws Exception { String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; @@ -287,4 +291,61 @@ public class ProcessDefinitionControllerTest{ Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); } + + @Test + public void testQueryProcessDefinitionAllByProjectId() throws Exception{ + int projectId = 1; + Map result = new HashMap<>(); + putMsg(result,Status.SUCCESS); + + Mockito.when(processDefinitionService.queryProcessDefinitionAllByProjectId(projectId)).thenReturn(result); + Result response = processDefinitionController.queryProcessDefinitionAllByProjectId(user,projectId); + + Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); + } + + @Test + public void testViewTree() throws Exception{ + String projectName = "test"; + int processId = 1; + int limit = 2; + Map result = new HashMap<>(); + putMsg(result,Status.SUCCESS); + + Mockito.when(processDefinitionService.viewTree(processId,limit)).thenReturn(result); + Result response = processDefinitionController.viewTree(user,projectName,processId,limit); + + Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); + } + + @Test + public void testQueryProcessDefinitionListPaging() throws Exception{ + String projectName = "test"; + int pageNo = 1; + int pageSize = 10; + String searchVal = ""; + int userId = 1; + + Map result = new HashMap<>(); + putMsg(result,Status.SUCCESS); + result.put(Constants.DATA_LIST,new PageInfo(1,10)); + + Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user,projectName, searchVal, pageNo, pageSize, userId)).thenReturn(result); + Result response = processDefinitionController.queryProcessDefinitionListPaging(user,projectName,pageNo,searchVal,userId,pageSize); + + Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); + } + + @Test + public void testBatchExportProcessDefinitionByIds() throws Exception{ + + String processDefinitionIds = "1,2"; + String projectName = "test"; + HttpServletResponse response = new MockHttpServletResponse(); + ProcessDefinitionService service = new ProcessDefinitionService(); + ProcessDefinitionService spy = Mockito.spy(service); + Mockito.doNothing().when(spy).batchExportProcessDefinitionByIds(user, projectName, processDefinitionIds, response); + processDefinitionController.batchExportProcessDefinitionByIds(user, projectName, processDefinitionIds, response); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandlerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandlerTest.java index 95cd96d08e..c00b1b0613 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandlerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/exceptions/ApiExceptionHandlerTest.java @@ -46,7 +46,7 @@ public class ApiExceptionHandlerTest { public void exceptionHandlerRuntime() throws NoSuchMethodException { ApiExceptionHandler handler = new ApiExceptionHandler(); ProcessDefinitionController controller = new ProcessDefinitionController(); - Method method = controller.getClass().getMethod("exportProcessDefinitionById", User.class, String.class, Integer.class, HttpServletResponse.class); + Method method = controller.getClass().getMethod("batchExportProcessDefinitionByIds", User.class, String.class, String.class, HttpServletResponse.class); HandlerMethod hm = new HandlerMethod(controller, method); Result result = handler.exceptionHandler(new RuntimeException("test exception"), hm); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(),result.getCode().intValue()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 5a03cdb268..edf4ef7b97 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -667,32 +667,60 @@ public class ProcessDefinitionServiceTest { } + @Test + public void testCreateProcess() throws IOException{ + + String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; + String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; + + String projectName = "test"; + String name = "dag_test"; + String description = "desc test"; + String connects = "[]"; + Map result = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + result.put("processDefinitionId",1); + + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); + User loginUser = new User(); + loginUser.setId(1); + loginUser.setUserType(UserType.ADMIN_USER); + Project project = getProject(projectName); + + //project not found + Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); + Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1); + Map result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects); + + Assert.assertEquals(Status.SUCCESS,result1.get(Constants.STATUS)); + } + @Test public void testImportProcessDefinitionById() throws IOException { - String processJson = "{\"projectName\":\"testProject\",\"processDefinitionName\":\"shell-4\"," + - "\"processDefinitionJson\":\"{\\\"tenantId\\\":1,\\\"globalParams\\\":[]," + - "\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"description\\\":\\\"\\\",\\\"runFlag\\\":\\\"NORMAL\\\"," + - "\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"#!/bin/bash\\\\necho \\\\\\\"shell-4\\\\\\\"\\\"," + - "\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}," + - "\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-4\\\"," + - "\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-84090\\\"}," + - "{\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"shell-5\\\",\\\"workerGroupId\\\":-1," + - "\\\"description\\\":\\\"\\\",\\\"dependence\\\":{},\\\"preTasks\\\":[\\\"shell-4\\\"],\\\"id\\\":\\\"tasks-87364\\\"," + - "\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SUB_PROCESS\\\",\\\"params\\\":{\\\"processDefinitionId\\\":46}," + - "\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}}],\\\"timeout\\\":0}\"," + - "\"processDefinitionDescription\":\"\",\"processDefinitionLocations\":\"{\\\"tasks-84090\\\":{\\\"name\\\":\\\"shell-4\\\"," + - "\\\"targetarr\\\":\\\"\\\",\\\"x\\\":128,\\\"y\\\":114},\\\"tasks-87364\\\":{\\\"name\\\":\\\"shell-5\\\"," + - "\\\"targetarr\\\":\\\"tasks-84090\\\",\\\"x\\\":266,\\\"y\\\":115}}\"," + - "\"processDefinitionConnects\":\"[{\\\"endPointSourceId\\\":\\\"tasks-84090\\\"," + - "\\\"endPointTargetId\\\":\\\"tasks-87364\\\"}]\"}"; - - String subProcessJson = "{\"globalParams\":[]," + - "\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," + - "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," + - "\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + - "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + - "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; + String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; + String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; + + String projectName = "test"; + String name = "dag_test"; + String description = "desc test"; + String connects = "[]"; + Map result = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + result.put("processDefinitionId",1); + + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); + User loginUser = new User(); + loginUser.setId(1); + loginUser.setUserType(UserType.ADMIN_USER); + Project project = getProject(projectName); + + //project not found + Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); + Mockito.when(processDefineMapper.insert(getProcessDefinition())).thenReturn(1); + Map result1 = processDefinitionService.createProcessDefinition(loginUser,projectName,name,json,description,locations,connects); + + String processJson = "[{\"processDefinitionConnects\":\"[]\",\"processDefinitionJson\":\"{\\\"tenantId\\\":-1,\\\"globalParams\\\":[],\\\"tasks\\\":[{\\\"workerGroupId\\\":-1,\\\"runFlag\\\":\\\"NORMAL\\\",\\\"type\\\":\\\"SHELL\\\",\\\"params\\\":{\\\"rawScript\\\":\\\"aa=\\\\\\\"1234\\\\\\\"\\\\necho ${aa}\\\",\\\"localParams\\\":[],\\\"resourceList\\\":[]},\\\"timeout\\\":{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"},\\\"maxRetryTimes\\\":\\\"0\\\",\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"name\\\":\\\"ssh_test1\\\",\\\"dependence\\\":{},\\\"retryInterval\\\":\\\"1\\\",\\\"preTasks\\\":[],\\\"id\\\":\\\"tasks-36196\\\",\\\"desc\\\":\\\"\\\"}],\\\"timeout\\\":0}\",\"processDefinitionLocations\":\"{\\\"tasks-36196\\\":{\\\"name\\\":\\\"ssh_test1\\\",\\\"targetarr\\\":\\\"\\\",\\\"x\\\":141,\\\"y\\\":70}}\",\"processDefinitionName\":\"dag_test\",\"projectName\":\"test\"}]"; FileUtils.writeStringToFile(new File("/tmp/task.json"),processJson); @@ -703,23 +731,16 @@ public class ProcessDefinitionServiceTest { MultipartFile multipartFile = new MockMultipartFile(file.getName(), file.getName(), ContentType.APPLICATION_OCTET_STREAM.toString(), fileInputStream); - User loginUser = new User(); - loginUser.setId(1); - loginUser.setUserType(UserType.ADMIN_USER); - - String currentProjectName = "testProject"; - Map result = new HashMap<>(5); - putMsg(result, Status.SUCCESS, currentProjectName); + String currentProjectName = "test"; ProcessDefinition shellDefinition2 = new ProcessDefinition(); - shellDefinition2.setId(46); - shellDefinition2.setName("shell-5"); - shellDefinition2.setProjectId(2); - shellDefinition2.setProcessDefinitionJson(subProcessJson); + shellDefinition2.setId(25); + shellDefinition2.setName("B"); + shellDefinition2.setProjectId(1); Mockito.when(projectMapper.queryByName(currentProjectName)).thenReturn(getProject(currentProjectName)); Mockito.when(projectService.checkProjectAndAuth(loginUser, getProject(currentProjectName), currentProjectName)).thenReturn(result); - Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2); + Mockito.when(processDefineMapper.queryByDefineId(25)).thenReturn(shellDefinition2); //import process Map importProcessResult = processDefinitionService.importProcessDefinition(loginUser, multipartFile, currentProjectName); @@ -730,18 +751,17 @@ public class ProcessDefinitionServiceTest { Assert.assertTrue(delete); - String processMetaJson = ""; - improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); - - processMetaJson = "{\"scheduleWorkerGroupId\":-1}"; - improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); - - processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}"; - improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + String processMetaJson = "[]"; + importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); +// + processMetaJson = "[{\"scheduleWorkerGroupId\":-1}]"; + importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); - processMetaJson = "{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}"; - improssProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\"}]"; + importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); + processMetaJson = "[{\"scheduleWorkerGroupId\":-1,\"projectName\":\"test\",\"processDefinitionName\":\"test_definition\"}]"; + importProcessCheckData(file, loginUser, currentProjectName, processMetaJson); } @@ -753,7 +773,7 @@ public class ProcessDefinitionServiceTest { * @param processMetaJson process meta json * @throws IOException IO exception */ - private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { + private void importProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { //check null FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); diff --git a/dolphinscheduler-dao/src/main/resources/datasource.properties b/dolphinscheduler-dao/src/main/resources/datasource.properties index 2f28ca2b0b..984aa17d57 100644 --- a/dolphinscheduler-dao/src/main/resources/datasource.properties +++ b/dolphinscheduler-dao/src/main/resources/datasource.properties @@ -14,17 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. # - - -# postgre -#spring.datasource.driver-class-name=org.postgresql.Driver -#spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler -# mysql + +# postgresql spring.datasource.driver-class-name=org.postgresql.Driver spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler spring.datasource.username=test spring.datasource.password=test +# mysql # connection configuration #spring.datasource.initialSize=5 # min connection number diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index 1a090dc5e4..f57b688cf0 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -123,7 +123,7 @@ @@ -137,6 +137,10 @@ + +