From effbad4e206fc79f5aa66b65443b2fed1927879a Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Sat, 21 Aug 2021 12:02:45 +0800 Subject: [PATCH] [Feature][JsonSplit-api] api of ProcessDefinition/TaskDefinition (#6010) * fix api run error * fix ut * api of ProcessDefinition/TaskDefinition Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../ProcessDefinitionController.java | 160 +++++++++--------- .../controller/TaskDefinitionController.java | 100 +++++++---- .../dolphinscheduler/api/enums/Status.java | 6 +- .../api/service/ProcessDefinitionService.java | 83 +++++---- .../api/service/TaskDefinitionService.java | 30 +++- .../impl/ProcessDefinitionServiceImpl.java | 160 +++++++++--------- .../impl/TaskDefinitionServiceImpl.java | 144 ++++++++++------ .../ProcessDefinitionControllerTest.java | 60 ++++--- .../service/ProcessDefinitionServiceTest.java | 41 ++--- .../TaskDefinitionServiceImplTest.java | 27 +-- .../dao/entity/TaskDefinition.java | 81 ++++++--- .../dao/entity/TaskDefinitionLog.java | 10 ++ 12 files changed, 513 insertions(+), 389 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index a4a31fd61c..82608e206a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -18,10 +18,10 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.BATCH_COPY_PROCESS_DEFINITION_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.BATCH_MOVE_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_DEFINITION; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINITION_VERSION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR; @@ -69,8 +69,6 @@ import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.multipart.MultipartFile; -import com.fasterxml.jackson.core.JsonProcessingException; - import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -103,6 +101,7 @@ public class ProcessDefinitionController extends BaseController { * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return create result code */ @ApiOperation(value = "save", notes = "CREATE_PROCESS_DEFINITION_NOTES") @@ -123,10 +122,10 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "locations", required = false) String locations, @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, @RequestParam(value = "tenantCode", required = true) String tenantCode, - @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson) throws JsonProcessingException { - + @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, + @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) { Map result = processDefinitionService.createProcessDefinition(loginUser, projectCode, name, description, globalParams, - locations, timeout, tenantCode, taskRelationJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); return returnDataList(result); } @@ -135,13 +134,13 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCodes process definition codes + * @param codes process definition codes * @param targetProjectCode target project code * @return copy result code */ @ApiOperation(value = "copy", notes = "COPY_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"), + @ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"), @ApiImplicitParam(name = "targetProjectCode", value = "TARGET_PROJECT_CODE", required = true, dataType = "Long", example = "123") }) @PostMapping(value = "/copy") @@ -150,9 +149,9 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result copyProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionCodes", required = true) String processDefinitionCodes, + @RequestParam(value = "codes", required = true) String codes, @RequestParam(value = "targetProjectCode", required = true) long targetProjectCode) { - return returnDataList(processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, processDefinitionCodes, targetProjectCode)); + return returnDataList(processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, codes, targetProjectCode)); } /** @@ -160,13 +159,13 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCodes process definition codes + * @param codes process definition codes * @param targetProjectCode target project code * @return move result code */ @ApiOperation(value = "moveProcessDefinition", notes = "MOVE_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"), + @ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"), @ApiImplicitParam(name = "targetProjectCode", value = "TARGET_PROJECT_CODE", required = true, dataType = "Long", example = "123") }) @PostMapping(value = "/move") @@ -175,9 +174,9 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result moveProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionCodes", required = true) String processDefinitionCodes, + @RequestParam(value = "codes", required = true) String codes, @RequestParam(value = "targetProjectCode", required = true) long targetProjectCode) { - return returnDataList(processDefinitionService.batchMoveProcessDefinition(loginUser, projectCode, processDefinitionCodes, targetProjectCode)); + return returnDataList(processDefinitionService.batchMoveProcessDefinition(loginUser, projectCode, codes, targetProjectCode)); } /** @@ -216,9 +215,9 @@ public class ProcessDefinitionController extends BaseController { * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return update result code */ - @ApiOperation(value = "update", notes = "UPDATE_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"), @@ -241,10 +240,11 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, @RequestParam(value = "tenantCode", required = true) String tenantCode, @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, + @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson, @RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) { Map result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, taskRelationJson); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); // If the update fails, the result will be returned directly if (result.get(Constants.STATUS) != Status.SUCCESS) { return returnDataList(result); @@ -264,14 +264,14 @@ public class ProcessDefinitionController extends BaseController { * @param projectCode project code * @param pageNo the process definition version list current page number * @param pageSize the process definition version list page size - * @param processDefinitionCode the process definition code + * @param code the process definition code * @return the process definition version list */ @ApiOperation(value = "queryVersions", notes = "QUERY_PROCESS_DEFINITION_VERSIONS_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"), - @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1") + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1") }) @GetMapping(value = "/versions") @ResponseStatus(HttpStatus.OK) @@ -281,13 +281,13 @@ public class ProcessDefinitionController extends BaseController { @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam(value = "pageNo") int pageNo, @RequestParam(value = "pageSize") int pageSize, - @RequestParam(value = "processDefinitionCode") long processDefinitionCode) { + @RequestParam(value = "code") long code) { Result result = checkPageParams(pageNo, pageSize); if (!result.checkResult()) { return result; } - result = processDefinitionService.queryProcessDefinitionVersions(loginUser, projectCode, pageNo, pageSize, processDefinitionCode); + result = processDefinitionService.queryProcessDefinitionVersions(loginUser, projectCode, pageNo, pageSize, code); return result; } @@ -297,14 +297,14 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user info * @param projectCode project code - * @param processDefinitionId the process definition id + * @param code the process definition code * @param version the version user want to switch * @return switch version result code */ @ApiOperation(value = "switchVersion", notes = "SWITCH_PROCESS_DEFINITION_VERSION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Long", example = "100") + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), + @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100") }) @GetMapping(value = "/version/switch") @ResponseStatus(HttpStatus.OK) @@ -312,25 +312,25 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result switchProcessDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionId") int processDefinitionId, + @RequestParam(value = "code") long code, @RequestParam(value = "version") int version) { - Map result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectCode, processDefinitionId, version); + Map result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectCode, code, version); return returnDataList(result); } /** - * delete the certain process definition version by version and process definition id + * delete the certain process definition version by version and process definition code * * @param loginUser login user info * @param projectCode project code - * @param processDefinitionId process definition id + * @param code the process definition code * @param version the process definition version user want to delete * @return delete version result code */ @ApiOperation(value = "deleteVersion", notes = "DELETE_PROCESS_DEFINITION_VERSION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Long", example = "100") + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), + @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100") }) @GetMapping(value = "/version/delete") @ResponseStatus(HttpStatus.OK) @@ -338,9 +338,9 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result deleteProcessDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "processDefinitionId") int processDefinitionId, + @RequestParam(value = "code") long code, @RequestParam(value = "version") int version) { - Map result = processDefinitionService.deleteByProcessDefinitionIdAndVersion(loginUser, projectCode, processDefinitionId, version); + Map result = processDefinitionService.deleteProcessDefinitionVersion(loginUser, projectCode, code, version); return returnDataList(result); } @@ -399,12 +399,12 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionName process definition name + * @param name process definition name * @return process definition detail */ @ApiOperation(value = "queryProcessDefinitionByName", notes = "QUERY_PROCESS_DEFINITION_BY_NAME_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionName", value = "PROCESS_DEFINITION_NAME", required = true, dataType = "String") + @ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, dataType = "String") }) @GetMapping(value = "/select-by-name") @ResponseStatus(HttpStatus.OK) @@ -412,8 +412,8 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionName") String processDefinitionName) { - Map result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, processDefinitionName); + @RequestParam("name") String name) { + Map result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, name); return returnDataList(result); } @@ -448,9 +448,9 @@ public class ProcessDefinitionController extends BaseController { */ @ApiOperation(value = "queryListPaging", notes = "QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"), @ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10") }) @GetMapping(value = "/list-paging") @@ -459,9 +459,9 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryProcessDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("pageNo") Integer pageNo, @RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId, + @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { Result result = checkPageParams(pageNo, pageSize); if (!result.checkResult()) { @@ -469,7 +469,7 @@ public class ProcessDefinitionController extends BaseController { } searchVal = ParameterUtils.handleEscapes(searchVal); - return processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId); + return processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, searchVal, userId, pageNo, pageSize); } /** @@ -493,7 +493,7 @@ public class ProcessDefinitionController extends BaseController { public Result viewTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam("code") long code, - @RequestParam("limit") Integer limit) throws Exception { + @RequestParam("limit") Integer limit) { Map result = processDefinitionService.viewTree(code, limit); return returnDataList(result); } @@ -503,23 +503,20 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCode process definition code + * @param code process definition code * @return task list */ @ApiOperation(value = "getNodeListByDefinitionCode", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100") + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100") }) @GetMapping(value = "gen-task-list") @ResponseStatus(HttpStatus.OK) @ApiException(GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR) - public Result getNodeListByDefinitionCode( - @ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionCode") long processDefinitionCode) { - logger.info("query task node name list by definitionCode, login user:{}, project name:{}, code : {}", - loginUser.getUserName(), projectCode, processDefinitionCode); - Map result = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, processDefinitionCode); + public Result getNodeListByDefinitionCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam("code") long code) { + Map result = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, code); return returnDataList(result); } @@ -528,7 +525,7 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCodes process definition codes + * @param codes process definition codes * @return node list data */ @ApiOperation(value = "getNodeListByDefinitionCodes", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES") @@ -540,31 +537,31 @@ public class ProcessDefinitionController extends BaseController { @ApiException(GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR) public Result getNodeListMapByDefinitionCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionCodes") String processDefinitionCodes) { - Map result = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, processDefinitionCodes); + @RequestParam("codes") String codes) { + Map result = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, codes); return returnDataList(result); } /** - * delete process definition by id + * delete process definition by code * * @param loginUser login user * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @return delete result code */ @ApiOperation(value = "deleteByCode", notes = "DELETE_PROCESS_DEFINITION_BY_ID_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", dataType = "Int", example = "100") + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", dataType = "Int", example = "100") }) @GetMapping(value = "/delete") @ResponseStatus(HttpStatus.OK) - @ApiException(DELETE_PROCESS_DEFINE_BY_ID_ERROR) + @ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result deleteProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionId") Integer processDefinitionId) { - Map result = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, processDefinitionId); + public Result deleteProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam("code") long code) { + Map result = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code); return returnDataList(result); } @@ -573,45 +570,43 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionIds process definition id list + * @param codes process definition code list * @return delete result code */ @ApiOperation(value = "batchDeleteByCodes", notes = "BATCH_DELETE_PROCESS_DEFINITION_BY_IDS_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionIds", value = "PROCESS_DEFINITION_IDS", type = "String") + @ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String") }) @GetMapping(value = "/batch-delete") @ResponseStatus(HttpStatus.OK) - @ApiException(BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR) + @ApiException(BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result batchDeleteProcessDefinitionByIds(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionIds") String processDefinitionIds - ) { + public Result batchDeleteProcessDefinitionByCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam("codes") String codes) { Map result = new HashMap<>(); - List deleteFailedIdList = new ArrayList<>(); - if (StringUtils.isNotEmpty(processDefinitionIds)) { - String[] processDefinitionIdArray = processDefinitionIds.split(","); - for (String strProcessDefinitionId : processDefinitionIdArray) { - int processDefinitionId = Integer.parseInt(strProcessDefinitionId); + List deleteFailedCodeList = new ArrayList<>(); + if (StringUtils.isNotEmpty(codes)) { + String[] processDefinitionCodeArray = codes.split(","); + for (String strProcessDefinitionCode : processDefinitionCodeArray) { + long code = Long.parseLong(strProcessDefinitionCode); try { - Map deleteResult = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, processDefinitionId); + Map deleteResult = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code); if (!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))) { - deleteFailedIdList.add(strProcessDefinitionId); + deleteFailedCodeList.add(strProcessDefinitionCode); logger.error((String) deleteResult.get(Constants.MSG)); } } catch (Exception e) { - deleteFailedIdList.add(strProcessDefinitionId); + deleteFailedCodeList.add(strProcessDefinitionCode); } } } - if (!deleteFailedIdList.isEmpty()) { - putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR, String.join(",", deleteFailedIdList)); + if (!deleteFailedCodeList.isEmpty()) { + putMsg(result, BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR, String.join(",", deleteFailedCodeList)); } else { putMsg(result, Status.SUCCESS); } - return returnDataList(result); } @@ -620,22 +615,22 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCodes process definition codes + * @param codes process definition codes * @param response response */ @ApiOperation(value = "batchExportByCodes", notes = "BATCH_EXPORT_PROCESS_DEFINITION_BY_CODES_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String") + @ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String") }) @GetMapping(value = "/export") @ResponseBody @AccessLogAnnotation(ignoreRequestArgs = {"loginUser", "response"}) public void batchExportProcessDefinitionByCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("processDefinitionCodes") String processDefinitionCodes, + @RequestParam("codes") String codes, HttpServletResponse response) { try { - processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, processDefinitionCodes, response); + processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, codes, response); } catch (Exception e) { logger.error(Status.BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(), e); } @@ -680,5 +675,4 @@ public class ProcessDefinitionController extends BaseController { Map result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file); return returnDataList(result); } - } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index 5bc4f2a53f..385fad6471 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -95,15 +95,15 @@ public class TaskDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param taskDefinitionCode task definition code - * @param taskDefinitionJson task definition json + * @param code task definition code + * @param taskDefinitionJsonObj task definition json object * @return update result code */ @ApiOperation(value = "update", notes = "UPDATE_TASK_DEFINITION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), - @ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", required = true, type = "String") + @ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String") }) @PostMapping(value = "/update") @ResponseStatus(HttpStatus.OK) @@ -111,9 +111,9 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result updateTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode, - @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) { - Map result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskDefinitionCode, taskDefinitionJson); + @RequestParam(value = "code") long code, + @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj) { + Map result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, code, taskDefinitionJsonObj); return returnDataList(result); } @@ -124,14 +124,14 @@ public class TaskDefinitionController extends BaseController { * @param projectCode project code * @param pageNo the task definition version list current page number * @param pageSize the task definition version list page size - * @param taskDefinitionCode the task definition code + * @param code the task definition code * @return the task definition version list */ @ApiOperation(value = "queryVersions", notes = "QUERY_TASK_DEFINITION_VERSIONS_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"), - @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") }) @GetMapping(value = "/versions") @ResponseStatus(HttpStatus.OK) @@ -141,8 +141,8 @@ public class TaskDefinitionController extends BaseController { @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @RequestParam(value = "pageNo") int pageNo, @RequestParam(value = "pageSize") int pageSize, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) { - Map result = taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, pageNo, pageSize, taskDefinitionCode); + @RequestParam(value = "code") long code) { + Map result = taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, pageNo, pageSize, code); return returnDataList(result); } @@ -151,13 +151,13 @@ public class TaskDefinitionController extends BaseController { * * @param loginUser login user info * @param projectCode project code - * @param taskDefinitionCode the task definition code + * @param code the task definition code * @param version the version user want to switch * @return switch version result code */ @ApiOperation(value = "switchVersion", notes = "SWITCH_TASK_DEFINITION_VERSION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100") }) @GetMapping(value = "/version/switch") @@ -166,9 +166,9 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result switchTaskDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode, + @RequestParam(value = "code") long code, @RequestParam(value = "version") int version) { - Map result = taskDefinitionService.switchVersion(loginUser, projectCode, taskDefinitionCode, version); + Map result = taskDefinitionService.switchVersion(loginUser, projectCode, code, version); return returnDataList(result); } @@ -177,13 +177,13 @@ public class TaskDefinitionController extends BaseController { * * @param loginUser login user info * @param projectCode project code - * @param taskDefinitionCode the task definition code + * @param code the task definition code * @param version the task definition version user want to delete * @return delete version result code */ @ApiOperation(value = "deleteVersion", notes = "DELETE_TASK_DEFINITION_VERSION_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"), @ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100") }) @GetMapping(value = "/version/delete") @@ -192,9 +192,9 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result deleteTaskDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode, + @RequestParam(value = "code") long code, @RequestParam(value = "version") int version) { - Map result = taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, taskDefinitionCode, version); + Map result = taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, code, version); return returnDataList(result); } @@ -203,12 +203,12 @@ public class TaskDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param taskDefinitionCode the task definition code + * @param code the task definition code * @return delete result code */ @ApiOperation(value = "deleteTaskDefinition", notes = "DELETE_TASK_DEFINITION_BY_CODE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") }) @GetMapping(value = "/delete") @ResponseStatus(HttpStatus.OK) @@ -216,8 +216,8 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result deleteTaskDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) { - Map result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, taskDefinitionCode); + @RequestParam(value = "code") long code) { + Map result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, code); return returnDataList(result); } @@ -226,12 +226,12 @@ public class TaskDefinitionController extends BaseController { * * @param loginUser login user * @param projectCode project code - * @param taskDefinitionCode the task definition code + * @param code the task definition code * @return task definition detail */ @ApiOperation(value = "queryTaskDefinitionDetail", notes = "QUERY_TASK_DEFINITION_DETAIL_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") + @ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") }) @GetMapping(value = "/select-by-code") @ResponseStatus(HttpStatus.OK) @@ -239,8 +239,8 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryTaskDefinitionDetail(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) { - Map result = taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, taskDefinitionCode); + @RequestParam(value = "code") long code) { + Map result = taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, code); return returnDataList(result); } @@ -250,16 +250,16 @@ public class TaskDefinitionController extends BaseController { * @param loginUser login user * @param projectCode project code * @param searchVal search value + * @param userId user id * @param pageNo page number * @param pageSize page size - * @param userId user id * @return task definition page */ @ApiOperation(value = "queryTaskDefinitionListPaging", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"), @ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10") }) @GetMapping(value = "/list-paging") @@ -268,17 +268,55 @@ public class TaskDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryTaskDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam("pageNo") Integer pageNo, @RequestParam(value = "searchVal", required = false) String searchVal, @RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId, + @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { + Result result = checkPageParams(pageNo, pageSize); + if (!result.checkResult()) { + return result; + } + searchVal = ParameterUtils.handleEscapes(searchVal); + return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, userId, pageNo, pageSize); + } + /** + * query task definition list paging by taskType + * + * @param loginUser login user + * @param projectCode project code + * @param searchVal search value + * @param taskType taskType + * @param userId user id + * @param pageNo page number + * @param pageSize page size + * @return task definition page + */ + @ApiOperation(value = "queryTaskDefinitionByTaskType", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, type = "String"), + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"), + @ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10") + }) + @GetMapping(value = "/task-type-list-paging") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryTaskDefinitionByTaskType(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "taskType", required = true) String taskType, + @RequestParam(value = "searchVal", required = false) String searchVal, + @RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId, + @RequestParam("pageNo") Integer pageNo, + @RequestParam("pageSize") Integer pageSize) { Result result = checkPageParams(pageNo, pageSize); if (!result.checkResult()) { return result; } searchVal = ParameterUtils.handleEscapes(searchVal); - return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId); + return taskDefinitionService.queryTaskDefinitionByTaskType(loginUser, projectCode, taskType, searchVal, userId, pageNo, pageSize); } /** @@ -297,7 +335,7 @@ public class TaskDefinitionController extends BaseController { @ApiException(LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result genTaskCodeList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam("pageNo") Integer genNum) { + @RequestParam("genNum") Integer genNum) { Map result = taskDefinitionService.genTaskCodeList(loginUser, genNum); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 50585f49e0..ac3293c341 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -192,7 +192,7 @@ public enum Status { BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"), QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"), QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), - DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163, "delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), + DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition by code fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), @@ -253,11 +253,11 @@ public enum Status { PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"), PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), - DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022, "delete process definition by id error", "删除工作流定义错误"), + DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"), SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"), DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"), BATCH_DELETE_PROCESS_DEFINE_ERROR(50025, "batch delete process definition error", "批量删除工作流定义错误"), - BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), + BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes {0} error", "批量删除工作流定义[{0}]错误"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 42a8bb4c31..42fce02bad 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -27,8 +27,6 @@ import javax.servlet.http.HttpServletResponse; import org.springframework.web.multipart.MultipartFile; -import com.fasterxml.jackson.core.JsonProcessingException; - /** * process definition service */ @@ -46,8 +44,8 @@ public interface ProcessDefinitionService { * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return create result code - * @throws JsonProcessingException JsonProcessingException */ Map createProcessDefinition(User loginUser, long projectCode, @@ -57,7 +55,8 @@ public interface ProcessDefinitionService { String locations, int timeout, String tenantCode, - String taskRelationJson) throws JsonProcessingException; + String taskRelationJson, + String taskDefinitionJson); /** * query process definition list @@ -83,9 +82,9 @@ public interface ProcessDefinitionService { Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, + Integer userId, Integer pageNo, - Integer pageSize, - Integer userId); + Integer pageSize); /** * query detail of process definition @@ -101,29 +100,29 @@ public interface ProcessDefinitionService { long code); /** - * query datail of process definition + * query detail of process definition * * @param loginUser login user * @param projectCode project code - * @param processDefinitionName process definition name + * @param name process definition name * @return process definition detail */ Map queryProcessDefinitionByName(User loginUser, long projectCode, - String processDefinitionName); + String name); /** * batch copy process definition * * @param loginUser loginUser * @param projectCode projectCode - * @param processDefinitionCodes processDefinitionCodes + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ Map batchCopyProcessDefinition(User loginUser, long projectCode, - String processDefinitionCodes, + String codes, long targetProjectCode); /** @@ -131,12 +130,12 @@ public interface ProcessDefinitionService { * * @param loginUser loginUser * @param projectCode projectCode - * @param processDefinitionCodes processDefinitionCodes + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ Map batchMoveProcessDefinition(User loginUser, long projectCode, - String processDefinitionCodes, + String codes, long targetProjectCode); /** @@ -152,6 +151,7 @@ public interface ProcessDefinitionService { * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return update result code */ Map updateProcessDefinition(User loginUser, @@ -163,7 +163,8 @@ public interface ProcessDefinitionService { String locations, int timeout, String tenantCode, - String taskRelationJson); + String taskRelationJson, + String taskDefinitionJson); /** * verify process definition name unique @@ -178,16 +179,16 @@ public interface ProcessDefinitionService { String name); /** - * delete process definition by id + * delete process definition by code * * @param loginUser login user * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @return delete result code */ - Map deleteProcessDefinitionById(User loginUser, - long projectCode, - Integer processDefinitionId); + Map deleteProcessDefinitionByCode(User loginUser, + long projectCode, + long code); /** * release process definition: online / offline @@ -208,12 +209,12 @@ public interface ProcessDefinitionService { * * @param loginUser login user * @param projectCode project code - * @param processDefinitionCodes process definition codes + * @param codes process definition codes * @param response http servlet response */ void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, - String processDefinitionCodes, + String codes, HttpServletResponse response); /** @@ -241,24 +242,24 @@ public interface ProcessDefinitionService { * * @param loginUser loginUser * @param projectCode project code - * @param defineCode define code + * @param code processDefinition code * @return task node list */ Map getTaskNodeListByDefinitionCode(User loginUser, long projectCode, - long defineCode); + long code); /** * get task node details map based on process definition * * @param loginUser loginUser * @param projectCode project code - * @param defineCodeList define code list + * @param codes define code list * @return task node list */ Map getNodeListMapByDefinitionCodes(User loginUser, long projectCode, - String defineCodeList); + String codes); /** * query process definition all by project code @@ -274,23 +275,21 @@ public interface ProcessDefinitionService { * @param code process definition code * @param limit limit * @return tree view json data - * @throws Exception exception */ - Map viewTree(long code, - Integer limit) throws Exception; + Map viewTree(long code, Integer limit); /** - * switch the defined process definition verison + * switch the defined process definition version * * @param loginUser login user * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @param version the version user want to switch * @return switch process definition version result code */ Map switchProcessDefinitionVersion(User loginUser, long projectCode, - int processDefinitionId, + long code, int version); /** @@ -300,28 +299,28 @@ public interface ProcessDefinitionService { * @param projectCode project code * @param pageNo page number * @param pageSize page size - * @param processDefinitionCode process definition code + * @param code process definition code * @return the pagination process definition versions info of the certain process definition */ Result queryProcessDefinitionVersions(User loginUser, - long projectCode, - int pageNo, - int pageSize, - long processDefinitionCode); + long projectCode, + int pageNo, + int pageSize, + long code); /** - * delete one certain process definition by version number and process definition id + * delete one certain process definition by version number and process definition code * * @param loginUser login user info to check auth * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @param version version number * @return delele result code */ - Map deleteByProcessDefinitionIdAndVersion(User loginUser, - long projectCode, - int processDefinitionId, - int version); + Map deleteProcessDefinitionVersion(User loginUser, + long projectCode, + long code, + int version); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index 6d4cc2c269..dcf92e24aa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -66,12 +66,12 @@ public interface TaskDefinitionService { * @param loginUser login user * @param projectCode project code * @param taskCode task code - * @param taskDefinitionJson task definition json + * @param taskDefinitionJsonObj task definition json object */ Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, - String taskDefinitionJson); + String taskDefinitionJsonObj); /** * update task definition @@ -134,17 +134,37 @@ public interface TaskDefinitionService { * @param loginUser login user * @param projectCode project code * @param searchVal search value + * @param userId user id * @param pageNo page number * @param pageSize page size - * @param userId user id * @return task definition page */ Result queryTaskDefinitionListPaging(User loginUser, long projectCode, String searchVal, + Integer userId, + Integer pageNo, + Integer pageSize); + + /** + * query task definition list paging + * + * @param loginUser login user + * @param projectCode project code + * @param taskType taskType + * @param searchVal search value + * @param userId user id + * @param pageNo page number + * @param pageSize page size + * @return task definition page + */ + Result queryTaskDefinitionByTaskType(User loginUser, + long projectCode, + String taskType, + String searchVal, + Integer userId, Integer pageNo, - Integer pageSize, - Integer userId); + Integer pageSize); /** * gen task code list diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 2bbb968fbf..def99899d4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; +import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -111,18 +112,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class); - private static final String PROCESSDEFINITIONCODE = "processDefinitionCode"; - private static final String RELEASESTATE = "releaseState"; - private static final String TASKS = "tasks"; - @Autowired private ProjectMapper projectMapper; @Autowired private ProjectService projectService; + @Autowired + private TaskDefinitionService taskDefinitionService; + @Autowired private UserMapper userMapper; @@ -174,6 +174,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return create result code */ @Override @@ -186,7 +187,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro String locations, int timeout, String tenantCode, - String taskRelationJson) { + String taskRelationJson, + String taskDefinitionJson) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -213,6 +215,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } + taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson); + long processDefinitionCode; try { processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); @@ -221,7 +225,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, locations, timeout, loginUser.getId(), tenant.getId()); + globalParams, locations, timeout, loginUser.getId(), tenant.getId()); return createProcessDefine(loginUser, result, taskRelationList, processDefinition); } @@ -314,26 +318,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param loginUser login user * @param projectCode project code * @param searchVal search value + * @param userId user id * @param pageNo page number * @param pageSize page size - * @param userId user id * @return process definition page */ @Override - public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, Integer pageNo, Integer pageSize, Integer userId) { + public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, Integer userId, Integer pageNo, Integer pageSize) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); //check user access for project Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { - putMsg(result,resultStatus); + putMsg(result, resultStatus); return result; } Page page = new Page<>(pageNo, pageSize); IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( - page, searchVal, userId, project.getCode(), isAdmin(loginUser)); + page, searchVal, userId, project.getCode(), isAdmin(loginUser)); List records = processDefinitionIPage.getRecords(); for (ProcessDefinition pd : records) { @@ -380,17 +384,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } @Override - public Map queryProcessDefinitionByName(User loginUser, long projectCode, String processDefinitionName) { + public Map queryProcessDefinitionByName(User loginUser, long projectCode, String name) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name); if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, name); } else { DagData dagData = processService.genDagData(processDefinition); result.put(Constants.DATA_LIST, dagData); @@ -412,8 +416,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param timeout timeout * @param tenantCode tenantCode * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson * @return update result code */ + @Transactional(rollbackFor = Exception.class) @Override public Map updateProcessDefinition(User loginUser, long projectCode, @@ -424,7 +430,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro String locations, int timeout, String tenantCode, - String taskRelationJson) { + String taskRelationJson, + String taskDefinitionJson) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -463,7 +470,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } } - + taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson); processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenant.getId()); return updateProcessDefine(loginUser, result, taskRelationList, processDefinition); } @@ -476,7 +483,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); if (insertVersion > 0) { int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList); + processDefinition.getCode(), insertVersion, taskRelationList); if (insertResult > 0) { putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); @@ -515,30 +522,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } /** - * delete process definition by id + * delete process definition by code * * @param loginUser login user * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @return delete result code */ @Override @Transactional(rollbackFor = RuntimeException.class) - public Map deleteProcessDefinitionById(User loginUser, long projectCode, Integer processDefinitionId) { + public Map deleteProcessDefinitionByCode(User loginUser, long projectCode, long code) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - - ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId); - - // TODO: replace id to code - // ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode); - + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } @@ -550,21 +552,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro // check process definition is already online if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, code); return result; } // check process instances is already running List processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(processInstances)) { - putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size()); + putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL, processInstances.size()); return result; } // get the timing according to the process definition - List schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionId); + List schedules = scheduleMapper.queryByProcessDefinitionCode(code); if (!schedules.isEmpty() && schedules.size() > 1) { logger.warn("scheduler num is {},Greater than 1", schedules.size()); - putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); + putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); return result; } else if (schedules.size() == 1) { Schedule schedule = schedules.get(0); @@ -576,12 +578,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - int delete = processDefinitionMapper.deleteById(processDefinitionId); + int delete = processDefinitionMapper.deleteById(processDefinition.getId()); processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); if (delete > 0) { putMsg(result, Status.SUCCESS); } else { - putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); + putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } return result; } @@ -636,7 +638,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setReleaseState(releaseState); processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new long[]{processDefinition.getCode()} + new long[]{processDefinition.getCode()} ); for (Schedule schedule : scheduleList) { @@ -660,8 +662,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * batch export process definition by codes */ @Override - public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String processDefinitionCodes, HttpServletResponse response) { - if (StringUtils.isEmpty(processDefinitionCodes)) { + public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, HttpServletResponse response) { + if (StringUtils.isEmpty(codes)) { return; } Project project = projectMapper.queryByCode(projectCode); @@ -670,7 +672,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (result.get(Constants.STATUS) != Status.SUCCESS) { return; } - Set defineCodeSet = Lists.newArrayList(processDefinitionCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); + Set defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); List processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet); List dagDataSchedules = processDefinitionList.stream().map(this::exportProcessDagData).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dagDataSchedules)) { @@ -931,21 +933,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser loginUser * @param projectCode project code - * @param defineCode define code + * @param code process definition code * @return task node list */ @Override - public Map getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long defineCode) { + public Map getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long code) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(defineCode); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); if (processDefinition == null) { logger.info("process define not exists"); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCode); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } DagData dagData = processService.genDagData(processDefinition); @@ -960,11 +962,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser loginUser * @param projectCode project code - * @param defineCodes define codes + * @param codes define codes * @return task node list */ @Override - public Map getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String defineCodes) { + public Map getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String codes) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -972,11 +974,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - Set defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); + Set defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); List processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet); if (CollectionUtils.isEmpty(processDefinitionList)) { logger.info("process definition not exists"); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCodes); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; } Map> taskNodeMap = new HashMap<>(); @@ -1042,7 +1044,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processInstanceList.forEach(processInstance -> processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime()))); List taskDefinitionList = processService.queryTaskDefinitionListByProcess(code, processDefinition.getVersion()); Map taskDefinitionMap = taskDefinitionList.stream() - .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); + .collect(Collectors.toMap(TaskDefinitionLog::getCode, taskDefinitionLog -> taskDefinitionLog)); if (limit > processInstanceList.size()) { limit = processInstanceList.size(); @@ -1056,8 +1058,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessInstance processInstance = processInstanceList.get(i); Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", - processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), - DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + processInstance.getState().toString(), processInstance.getStartTime(), endTime, processInstance.getHost(), + DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); } List parentTreeViewDtoList = new ArrayList<>(); @@ -1095,11 +1097,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (taskInstance.isSubProcess()) { TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), - taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), - DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); + taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), + DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); } } for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { @@ -1162,20 +1164,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser loginUser * @param projectCode projectCode - * @param processDefinitionCodes processDefinitionCodes + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override public Map batchCopyProcessDefinition(User loginUser, long projectCode, - String processDefinitionCodes, + String codes, long targetProjectCode) { - Map result = checkParams(loginUser, projectCode, processDefinitionCodes, targetProjectCode); + Map result = checkParams(loginUser, projectCode, codes, targetProjectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } List failedProcessList = new ArrayList<>(); - doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, processDefinitionCodes, result, true); + doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true); checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true); return result; } @@ -1185,15 +1187,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser loginUser * @param projectCode projectCode - * @param processDefinitionCodes processDefinitionCodes + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override public Map batchMoveProcessDefinition(User loginUser, long projectCode, - String processDefinitionCodes, + String codes, long targetProjectCode) { - Map result = checkParams(loginUser, projectCode, processDefinitionCodes, targetProjectCode); + Map result = checkParams(loginUser, projectCode, codes, targetProjectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -1201,7 +1203,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } List failedProcessList = new ArrayList<>(); - doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, processDefinitionCodes, result, false); + doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, false); checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, false); return result; } @@ -1247,7 +1249,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro diffCode.forEach(code -> failedProcessList.add(code + "[null]")); for (ProcessDefinition processDefinition : processDefinitionList) { List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); if (isCopy) { @@ -1267,12 +1269,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser login user * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @param version the version user want to switch * @return switch process definition version result code */ @Override - public Map switchProcessDefinitionVersion(User loginUser, long projectCode, int processDefinitionId, int version) { + public Map switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -1280,22 +1282,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); if (Objects.isNull(processDefinition)) { - putMsg(result - , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR - , processDefinitionId); + putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code); return result; } ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper - .queryByDefinitionCodeAndVersion(processDefinition.getCode(), version); + .queryByDefinitionCodeAndVersion(code, version); if (Objects.isNull(processDefinitionLog)) { - putMsg(result - , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR - , processDefinition.getCode() - , version); + putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version); return result; } int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); @@ -1336,23 +1333,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param projectCode project code * @param pageNo page number * @param pageSize page size - * @param processDefinitionCode process definition code + * @param code process definition code * @return the pagination process definition versions info of the certain process definition */ @Override - public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long processDefinitionCode) { - + public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long code) { Result result = new Result(); Project project = projectMapper.queryByCode(projectCode); // check user access for project Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode); Status resultStatus = (Status) checkResult.get(Constants.STATUS); if (resultStatus != Status.SUCCESS) { - putMsg(result,resultStatus); + putMsg(result, resultStatus); return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); @@ -1362,34 +1358,34 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro pageInfo.setTotalList(processDefinitionLogs); pageInfo.setTotal((int) processDefinitionVersionsPaging.getTotal()); result.setData(pageInfo); - putMsg(result,Status.SUCCESS); + putMsg(result, Status.SUCCESS); return result; } /** - * delete one certain process definition by version number and process definition id + * delete one certain process definition by version number and process definition code * * @param loginUser login user info to check auth * @param projectCode project code - * @param processDefinitionId process definition id + * @param code process definition code * @param version version number * @return delele result code */ @Override - public Map deleteByProcessDefinitionIdAndVersion(User loginUser, long projectCode, int processDefinitionId, int version) { + public Map deleteProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); } else { - processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(processDefinition.getCode(), version); + processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version); putMsg(result, Status.SUCCESS); } return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 2e012283ca..ec5e20285b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -17,15 +17,12 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; - import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException; @@ -109,36 +106,78 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe int totalSuccessNumber = 0; List totalSuccessCode = new ArrayList<>(); Date now = new Date(); + List newTaskDefinitionLogs = new ArrayList<>(); + List updateTaskDefinitionLogs = new ArrayList<>(); for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - checkTaskDefinition(result, taskDefinitionLog); - if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID - || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { + logger.error("task definition {} parameter invalid", taskDefinitionLog.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); return result; } taskDefinitionLog.setProjectCode(projectCode); + taskDefinitionLog.setUpdateTime(now); + taskDefinitionLog.setOperateTime(now); + taskDefinitionLog.setOperator(loginUser.getId()); + if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) { + TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper + .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion()); + if (definitionCodeAndVersion != null) { + if (!taskDefinitionLog.equals(definitionCodeAndVersion)) { + taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId()); + Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode()); + if (version == null || version == 0) { + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionLog.getCode()); + return result; + } + taskDefinitionLog.setVersion(version + 1); + taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime()); + updateTaskDefinitionLogs.add(taskDefinitionLog); + totalSuccessCode.add(taskDefinitionLog.getCode()); + } + continue; + } + } taskDefinitionLog.setUserId(loginUser.getId()); taskDefinitionLog.setVersion(1); taskDefinitionLog.setCreateTime(now); - taskDefinitionLog.setUpdateTime(now); - long code = 0L; - try { - code = SnowFlakeUtils.getInstance().nextId(); - taskDefinitionLog.setCode(code); - } catch (SnowFlakeException e) { - logger.error("Task code get error, ", e); - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); - return result; + totalSuccessCode.add(taskDefinitionLog.getCode()); + newTaskDefinitionLogs.add(taskDefinitionLog); + if (taskDefinitionLog.getCode() == 0) { + long code; + try { + code = SnowFlakeUtils.getInstance().nextId(); + taskDefinitionLog.setVersion(1); + taskDefinitionLog.setCode(code); + } catch (SnowFlakeException e) { + logger.error("Task code get error, ", e); + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code"); + return result; + } } - taskDefinitionLog.setOperator(loginUser.getId()); - taskDefinitionLog.setOperateTime(now); - totalSuccessCode.add(code); + totalSuccessCode.add(taskDefinitionLog.getCode()); + newTaskDefinitionLogs.add(taskDefinitionLog); totalSuccessNumber++; } - int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogs); - int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs); - if ((logInsert & insert) == 0) { - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - return result; + for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) { + TaskDefinition task = taskDefinitionMapper.queryByDefinitionCode(taskDefinitionToUpdate.getCode()); + if (task == null) { + newTaskDefinitionLogs.add(taskDefinitionToUpdate); + } else { + int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate); + int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); + if ((update & insert) != 1) { + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + return result; + } + } + } + if (!newTaskDefinitionLogs.isEmpty()) { + int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); + int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); + if ((logInsert & insert) == 0) { + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + return result; + } } Map resData = new HashMap<>(); resData.put("total", totalSuccessNumber); @@ -214,11 +253,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param loginUser login user * @param projectCode project code * @param taskCode task code - * @param taskDefinitionJson task definition json + * @param taskDefinitionJsonObj task definition json object */ @Transactional(rollbackFor = RuntimeException.class) @Override - public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJson) { + public Map updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) { Project project = projectMapper.queryByCode(projectCode); //check user access for project Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); @@ -234,19 +273,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } - TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJson, TaskDefinitionLog.class); - checkTaskDefinition(result, taskDefinitionToUpdate); - if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID - || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { + TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class); + if (taskDefinitionToUpdate == null) { + logger.error("taskDefinitionJson is not valid json"); + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); + return result; + } + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) { + logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); return result; } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); + if (version == null || version == 0) { + putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); + return result; + } Date now = new Date(); - taskDefinitionToUpdate.setCode(taskDefinition.getCode()); + taskDefinitionToUpdate.setCode(taskCode); taskDefinitionToUpdate.setId(taskDefinition.getId()); taskDefinitionToUpdate.setProjectCode(projectCode); taskDefinitionToUpdate.setUserId(taskDefinition.getUserId()); - taskDefinitionToUpdate.setVersion(version == null || version == 0 ? 1 : version + 1); + taskDefinitionToUpdate.setVersion(version + 1); taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase()); taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate)); taskDefinitionToUpdate.setUpdateTime(now); @@ -264,25 +312,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } - public void checkTaskNode(Map result, TaskNode taskNode, String taskDefinitionJson) { - if (taskNode == null) { - logger.error("taskDefinitionJson is not valid json"); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); - return; - } - if (!CheckUtils.checkTaskNodeParameters(taskNode)) { - logger.error("task node {} parameter invalid", taskNode.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); - } - } - - private void checkTaskDefinition(Map result, TaskDefinition taskDefinition) { - if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) { - logger.error("task definition {} parameter invalid", taskDefinition.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName()); - } - } - /** * update task definition * @@ -336,9 +365,20 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe public Result queryTaskDefinitionListPaging(User loginUser, long projectCode, String searchVal, + Integer userId, + Integer pageNo, + Integer pageSize) { + return null; + } + + @Override + public Result queryTaskDefinitionByTaskType(User loginUser, + long projectCode, + String taskType, + String searchVal, + Integer userId, Integer pageNo, - Integer pageSize, - Integer userId) { + Integer pageSize) { return null; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index dc81651d6c..4737c2f8f8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -71,11 +71,17 @@ public class ProcessDefinitionControllerTest { } @Test - public void testCreateProcessDefinition() throws Exception { - String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," + public void testCreateProcessDefinition() { + String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," + "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"; - + String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; long projectCode = 1L; String name = "dag_test"; String description = "desc test"; @@ -88,10 +94,10 @@ public class ProcessDefinitionControllerTest { result.put(Constants.DATA_LIST, 1); Mockito.when(processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, json)).thenReturn(result); + locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result); Result response = processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, json); + locations, timeout, tenantCode, relationJson, taskDefinitionJson); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } @@ -128,9 +134,16 @@ public class ProcessDefinitionControllerTest { @Test public void updateProcessDefinition() { - String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," - + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," - + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]"; + String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," + + "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," + + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]"; + String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; long projectCode = 1L; String name = "dag_test"; @@ -144,10 +157,10 @@ public class ProcessDefinitionControllerTest { result.put("processDefinitionId", 1); Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, json)).thenReturn(result); + locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result); Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, tenantCode, json, ReleaseState.OFFLINE); + locations, timeout, tenantCode, relationJson, taskDefinitionJson, ReleaseState.OFFLINE); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } @@ -265,15 +278,15 @@ public class ProcessDefinitionControllerTest { } @Test - public void testDeleteProcessDefinitionById() { + public void testDeleteProcessDefinitionByCode() { long projectCode = 1L; - int id = 1; + long code = 1L; Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); - Mockito.when(processDefinitionService.deleteProcessDefinitionById(user, projectCode, id)).thenReturn(result); - Result response = processDefinitionController.deleteProcessDefinitionById(user, projectCode, id); + Mockito.when(processDefinitionService.deleteProcessDefinitionByCode(user, projectCode, code)).thenReturn(result); + Result response = processDefinitionController.deleteProcessDefinitionByCode(user, projectCode, code); Assert.assertTrue(response != null && response.isSuccess()); } @@ -333,7 +346,7 @@ public class ProcessDefinitionControllerTest { } @Test - public void testQueryProcessDefinitionListPaging() throws Exception { + public void testQueryProcessDefinitionListPaging() { long projectCode = 1L; int pageNo = 1; int pageSize = 10; @@ -344,8 +357,8 @@ public class ProcessDefinitionControllerTest { putMsg(result, Status.SUCCESS); result.setData(new PageInfo(1, 10)); - Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, pageNo, pageSize, userId)).thenReturn(result); - Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectCode, pageNo, searchVal, userId, pageSize); + Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, userId, pageNo, pageSize)).thenReturn(result); + Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectCode, searchVal, userId, pageNo, pageSize); Assert.assertTrue(response != null && response.isSuccess()); } @@ -399,17 +412,10 @@ public class ProcessDefinitionControllerTest { long projectCode = 1L; Map resultMap = new HashMap<>(); putMsg(resultMap, Status.SUCCESS); - Mockito.when(processDefinitionService.deleteByProcessDefinitionIdAndVersion( - user - , projectCode - , 1 - , 10)) - .thenReturn(resultMap); + Mockito.when(processDefinitionService.deleteProcessDefinitionVersion( + user, projectCode, 1, 10)).thenReturn(resultMap); Result result = processDefinitionController.deleteProcessDefinitionVersion( - user - , projectCode - , 1 - , 10); + user, projectCode, 1, 10); Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 719d6c70e5..f727b91af5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -143,7 +143,7 @@ public class ProcessDefinitionServiceTest { //project not found Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Result map = processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", 1, 5, 0); - Assert.assertEquals(Status.PROJECT_NOT_FOUNT.getCode(), (int)map.getCode()); + Assert.assertEquals(Status.PROJECT_NOT_FOUNT.getCode(), (int) map.getCode()); putMsg(result, Status.SUCCESS, projectCode); loginUser.setId(1); @@ -310,7 +310,7 @@ public class ProcessDefinitionServiceTest { } @Test - public void deleteProcessDefinitionByIdTest() { + public void deleteProcessDefinitionByCodeTest() { long projectCode = 1L; Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); @@ -323,14 +323,14 @@ public class ProcessDefinitionServiceTest { Map result = new HashMap<>(); putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Map map = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 6); + Map map = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 6L); Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS)); //project check auth success, instance not exist putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(processDefineMapper.selectById(1)).thenReturn(null); - Map instanceNotExitRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 1); + Mockito.when(processDefineMapper.queryByCode(1L)).thenReturn(null); + Map instanceNotExitRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS)); ProcessDefinition processDefinition = getProcessDefinition(); @@ -338,8 +338,8 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); //user no auth loginUser.setUserType(UserType.GENERAL_USER); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Map userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); + Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + Map userNoAuthRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS)); //process definition online @@ -347,21 +347,21 @@ public class ProcessDefinitionServiceTest { putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); processDefinition.setReleaseState(ReleaseState.ONLINE); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); - Map dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); + Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); + Map dfOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS)); //scheduler list elements > 1 processDefinition.setReleaseState(ReleaseState.OFFLINE); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition); + Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); List schedules = new ArrayList<>(); schedules.add(getSchedule()); schedules.add(getSchedule()); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); - Map schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); - Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS)); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); + Map schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); + Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS)); //scheduler online schedules.clear(); @@ -370,8 +370,8 @@ public class ProcessDefinitionServiceTest { schedules.add(schedule); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); - Map schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); + Map schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS)); //delete fail @@ -380,16 +380,17 @@ public class ProcessDefinitionServiceTest { schedules.add(schedule); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); - Map deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); - Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS)); + Map deleteFail = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); + Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, deleteFail.get(Constants.STATUS)); //delete success Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); + Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Map deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46); + Map deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS)); } @@ -594,7 +595,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1, - "", "", "", 0, "root", null); + "", "", "", 0, "root", null, null); Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 9dd475be8b..c3f1bcc00d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -151,23 +151,13 @@ public class TaskDefinitionServiceImplTest { @Test public void updateTaskDefinition () { - String updateTaskDefinitionJson = "{\n" - + "\"name\": \"test12111\",\n" - + "\"description\": \"test\",\n" - + "\"taskType\": \"SHELL\",\n" - + "\"flag\": 0,\n" - + "\"taskParams\": \"{\\\"resourceList\\\":[],\\\"localParams\\\":[],\\\"rawScript\\\":\\\"echo 11\\\",\\\"conditionResult\\\": " - + "{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},\\\"dependence\\\":{}}\",\n" - + "\"taskPriority\": 0,\n" - + "\"workerGroup\": \"default\",\n" - + "\"failRetryTimes\": 0,\n" - + "\"failRetryInterval\": 1,\n" - + "\"timeoutFlag\": 1,\n" - + "\"timeoutNotifyStrategy\": 0,\n" - + "\"timeout\": 0,\n" - + "\"delayTime\": 0,\n" - + "\"resourceIds\": \"\"\n" - + "}"; + String taskDefinitionJson = "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":" + + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\"," + + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":" + + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"]," + + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0," + + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}"; long projectCode = 1L; long taskCode = 1L; @@ -186,7 +176,8 @@ public class TaskDefinitionServiceImplTest { Mockito.when(taskDefinitionMapper.queryByDefinitionCode(taskCode)).thenReturn(new TaskDefinition()); Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1); - result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, updateTaskDefinitionJson); + Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1); + result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 28859d47a4..dd25d6e5f8 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; import com.baomidou.mybatisplus.annotation.IdType; @@ -400,34 +401,62 @@ public class TaskDefinition { return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE); } + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + TaskDefinition that = (TaskDefinition) o; + return failRetryTimes == that.failRetryTimes + && failRetryInterval == that.failRetryInterval + && timeout == that.timeout + && delayTime == that.delayTime + && Objects.equals(name, that.name) + && Objects.equals(description, that.description) + && Objects.equals(taskType, that.taskType) + && Objects.equals(taskParams, that.taskParams) + && flag == that.flag + && taskPriority == that.taskPriority + && Objects.equals(workerGroup, that.workerGroup) + && timeoutFlag == that.timeoutFlag + && timeoutNotifyStrategy == that.timeoutNotifyStrategy + && Objects.equals(resourceIds, that.resourceIds); + } + + @Override + public int hashCode() { + return Objects.hash(name, description, taskType, taskParams, flag, taskPriority, workerGroup, failRetryTimes, + failRetryInterval, timeoutFlag, timeoutNotifyStrategy, timeout, delayTime, resourceIds); + } + @Override public String toString() { return "TaskDefinition{" - + "id=" + id - + ", code=" + code - + ", name='" + name + '\'' - + ", version=" + version - + ", description='" + description + '\'' - + ", projectCode=" + projectCode - + ", userId=" + userId - + ", taskType=" + taskType - + ", taskParams='" + taskParams + '\'' - + ", taskParamList=" + taskParamList - + ", taskParamMap=" + taskParamMap - + ", flag=" + flag - + ", taskPriority=" + taskPriority - + ", userName='" + userName + '\'' - + ", projectName='" + projectName + '\'' - + ", workerGroup='" + workerGroup + '\'' - + ", failRetryTimes=" + failRetryTimes - + ", failRetryInterval=" + failRetryInterval - + ", timeoutFlag=" + timeoutFlag - + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy - + ", timeout=" + timeout - + ", delayTime=" + delayTime - + ", resourceIds='" + resourceIds + '\'' - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + '}'; + + "id=" + id + + ", code=" + code + + ", name='" + name + '\'' + + ", version=" + version + + ", description='" + description + '\'' + + ", projectCode=" + projectCode + + ", userId=" + userId + + ", taskType=" + taskType + + ", taskParams='" + taskParams + '\'' + + ", taskParamList=" + taskParamList + + ", taskParamMap=" + taskParamMap + + ", flag=" + flag + + ", taskPriority=" + taskPriority + + ", userName='" + userName + '\'' + + ", projectName='" + projectName + '\'' + + ", workerGroup='" + workerGroup + '\'' + + ", failRetryTimes=" + failRetryTimes + + ", failRetryInterval=" + failRetryInterval + + ", timeoutFlag=" + timeoutFlag + + ", timeoutNotifyStrategy=" + timeoutNotifyStrategy + + ", timeout=" + timeout + + ", delayTime=" + delayTime + + ", resourceIds='" + resourceIds + '\'' + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java index 96851cc7b8..b052c930a0 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java @@ -88,6 +88,16 @@ public class TaskDefinitionLog extends TaskDefinition { this.operateTime = operateTime; } + @Override + public boolean equals(Object o) { + return super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } + @Override public String toString() { return super.toString();