Browse Source

[Feature][JsonSplit-api] api of ProcessDefinition/TaskDefinition (#6010)

* fix api run error

* fix ut

* api of ProcessDefinition/TaskDefinition

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
effbad4e20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 154
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 100
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  3. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  4. 71
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  5. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  6. 138
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 124
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  8. 60
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  9. 41
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  10. 27
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  11. 29
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  12. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

154
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -18,10 +18,10 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_COPY_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_MOVE_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_PROCESS_DEFINITION;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINITION_VERSION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR;
@ -69,8 +69,6 @@ import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@ -103,6 +101,7 @@ public class ProcessDefinitionController extends BaseController {
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@ApiOperation(value = "save", notes = "CREATE_PROCESS_DEFINITION_NOTES")
@ -123,10 +122,10 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "locations", required = false) String locations,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson) throws JsonProcessingException {
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) {
Map<String, Object> result = processDefinitionService.createProcessDefinition(loginUser, projectCode, name, description, globalParams,
locations, timeout, tenantCode, taskRelationJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
return returnDataList(result);
}
@ -135,13 +134,13 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param codes process definition codes
* @param targetProjectCode target project code
* @return copy result code
*/
@ApiOperation(value = "copy", notes = "COPY_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"),
@ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"),
@ApiImplicitParam(name = "targetProjectCode", value = "TARGET_PROJECT_CODE", required = true, dataType = "Long", example = "123")
})
@PostMapping(value = "/copy")
@ -150,9 +149,9 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result copyProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCodes", required = true) String processDefinitionCodes,
@RequestParam(value = "codes", required = true) String codes,
@RequestParam(value = "targetProjectCode", required = true) long targetProjectCode) {
return returnDataList(processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, processDefinitionCodes, targetProjectCode));
return returnDataList(processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, codes, targetProjectCode));
}
/**
@ -160,13 +159,13 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param codes process definition codes
* @param targetProjectCode target project code
* @return move result code
*/
@ApiOperation(value = "moveProcessDefinition", notes = "MOVE_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"),
@ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "3,4"),
@ApiImplicitParam(name = "targetProjectCode", value = "TARGET_PROJECT_CODE", required = true, dataType = "Long", example = "123")
})
@PostMapping(value = "/move")
@ -175,9 +174,9 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result moveProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionCodes", required = true) String processDefinitionCodes,
@RequestParam(value = "codes", required = true) String codes,
@RequestParam(value = "targetProjectCode", required = true) long targetProjectCode) {
return returnDataList(processDefinitionService.batchMoveProcessDefinition(loginUser, projectCode, processDefinitionCodes, targetProjectCode));
return returnDataList(processDefinitionService.batchMoveProcessDefinition(loginUser, projectCode, codes, targetProjectCode));
}
/**
@ -216,9 +215,9 @@ public class ProcessDefinitionController extends BaseController {
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@ApiOperation(value = "update", notes = "UPDATE_PROCESS_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"),
@ -241,10 +240,11 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "tenantCode", required = true) String tenantCode,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, taskRelationJson);
locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);
@ -264,14 +264,14 @@ public class ProcessDefinitionController extends BaseController {
* @param projectCode project code
* @param pageNo the process definition version list current page number
* @param pageSize the process definition version list page size
* @param processDefinitionCode the process definition code
* @param code the process definition code
* @return the process definition version list
*/
@ApiOperation(value = "queryVersions", notes = "QUERY_PROCESS_DEFINITION_VERSIONS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
})
@GetMapping(value = "/versions")
@ResponseStatus(HttpStatus.OK)
@ -281,13 +281,13 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "pageNo") int pageNo,
@RequestParam(value = "pageSize") int pageSize,
@RequestParam(value = "processDefinitionCode") long processDefinitionCode) {
@RequestParam(value = "code") long code) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
result = processDefinitionService.queryProcessDefinitionVersions(loginUser, projectCode, pageNo, pageSize, processDefinitionCode);
result = processDefinitionService.queryProcessDefinitionVersions(loginUser, projectCode, pageNo, pageSize, code);
return result;
}
@ -297,14 +297,14 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user info
* @param projectCode project code
* @param processDefinitionId the process definition id
* @param code the process definition code
* @param version the version user want to switch
* @return switch version result code
*/
@ApiOperation(value = "switchVersion", notes = "SWITCH_PROCESS_DEFINITION_VERSION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Long", example = "100")
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/version/switch")
@ResponseStatus(HttpStatus.OK)
@ -312,25 +312,25 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result switchProcessDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "code") long code,
@RequestParam(value = "version") int version) {
Map<String, Object> result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectCode, processDefinitionId, version);
Map<String, Object> result = processDefinitionService.switchProcessDefinitionVersion(loginUser, projectCode, code, version);
return returnDataList(result);
}
/**
* delete the certain process definition version by version and process definition id
* delete the certain process definition version by version and process definition code
*
* @param loginUser login user info
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code the process definition code
* @param version the process definition version user want to delete
* @return delete version result code
*/
@ApiOperation(value = "deleteVersion", notes = "DELETE_PROCESS_DEFINITION_VERSION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Long", example = "100")
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/version/delete")
@ResponseStatus(HttpStatus.OK)
@ -338,9 +338,9 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteProcessDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "processDefinitionId") int processDefinitionId,
@RequestParam(value = "code") long code,
@RequestParam(value = "version") int version) {
Map<String, Object> result = processDefinitionService.deleteByProcessDefinitionIdAndVersion(loginUser, projectCode, processDefinitionId, version);
Map<String, Object> result = processDefinitionService.deleteProcessDefinitionVersion(loginUser, projectCode, code, version);
return returnDataList(result);
}
@ -399,12 +399,12 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionName process definition name
* @param name process definition name
* @return process definition detail
*/
@ApiOperation(value = "queryProcessDefinitionByName", notes = "QUERY_PROCESS_DEFINITION_BY_NAME_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionName", value = "PROCESS_DEFINITION_NAME", required = true, dataType = "String")
@ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, dataType = "String")
})
@GetMapping(value = "/select-by-name")
@ResponseStatus(HttpStatus.OK)
@ -412,8 +412,8 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<ProcessDefinition> queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionName") String processDefinitionName) {
Map<String, Object> result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, processDefinitionName);
@RequestParam("name") String name) {
Map<String, Object> result = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, name);
return returnDataList(result);
}
@ -448,9 +448,9 @@ public class ProcessDefinitionController extends BaseController {
*/
@ApiOperation(value = "queryListPaging", notes = "QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@GetMapping(value = "/list-paging")
@ -459,9 +459,9 @@ public class ProcessDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryProcessDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
@ -469,7 +469,7 @@ public class ProcessDefinitionController extends BaseController {
}
searchVal = ParameterUtils.handleEscapes(searchVal);
return processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId);
return processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, searchVal, userId, pageNo, pageSize);
}
/**
@ -493,7 +493,7 @@ public class ProcessDefinitionController extends BaseController {
public Result viewTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("code") long code,
@RequestParam("limit") Integer limit) throws Exception {
@RequestParam("limit") Integer limit) {
Map<String, Object> result = processDefinitionService.viewTree(code, limit);
return returnDataList(result);
}
@ -503,23 +503,20 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCode process definition code
* @param code process definition code
* @return task list
*/
@ApiOperation(value = "getNodeListByDefinitionCode", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100")
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100")
})
@GetMapping(value = "gen-task-list")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR)
public Result getNodeListByDefinitionCode(
@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result getNodeListByDefinitionCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionCode") long processDefinitionCode) {
logger.info("query task node name list by definitionCode, login user:{}, project name:{}, code : {}",
loginUser.getUserName(), projectCode, processDefinitionCode);
Map<String, Object> result = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, processDefinitionCode);
@RequestParam("code") long code) {
Map<String, Object> result = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, code);
return returnDataList(result);
}
@ -528,7 +525,7 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param codes process definition codes
* @return node list data
*/
@ApiOperation(value = "getNodeListByDefinitionCodes", notes = "GET_NODE_LIST_BY_DEFINITION_CODE_NOTES")
@ -540,31 +537,31 @@ public class ProcessDefinitionController extends BaseController {
@ApiException(GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR)
public Result getNodeListMapByDefinitionCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionCodes") String processDefinitionCodes) {
Map<String, Object> result = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, processDefinitionCodes);
@RequestParam("codes") String codes) {
Map<String, Object> result = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, codes);
return returnDataList(result);
}
/**
* delete process definition by id
* delete process definition by code
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @return delete result code
*/
@ApiOperation(value = "deleteByCode", notes = "DELETE_PROCESS_DEFINITION_BY_ID_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", dataType = "Int", example = "100")
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", dataType = "Int", example = "100")
})
@GetMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_PROCESS_DEFINE_BY_ID_ERROR)
@ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteProcessDefinitionById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result deleteProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionId") Integer processDefinitionId) {
Map<String, Object> result = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, processDefinitionId);
@RequestParam("code") long code) {
Map<String, Object> result = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code);
return returnDataList(result);
}
@ -573,45 +570,43 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionIds process definition id list
* @param codes process definition code list
* @return delete result code
*/
@ApiOperation(value = "batchDeleteByCodes", notes = "BATCH_DELETE_PROCESS_DEFINITION_BY_IDS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionIds", value = "PROCESS_DEFINITION_IDS", type = "String")
@ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String")
})
@GetMapping(value = "/batch-delete")
@ResponseStatus(HttpStatus.OK)
@ApiException(BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR)
@ApiException(BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result batchDeleteProcessDefinitionByIds(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result batchDeleteProcessDefinitionByCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionIds") String processDefinitionIds
) {
@RequestParam("codes") String codes) {
Map<String, Object> result = new HashMap<>();
List<String> deleteFailedIdList = new ArrayList<>();
if (StringUtils.isNotEmpty(processDefinitionIds)) {
String[] processDefinitionIdArray = processDefinitionIds.split(",");
for (String strProcessDefinitionId : processDefinitionIdArray) {
int processDefinitionId = Integer.parseInt(strProcessDefinitionId);
List<String> deleteFailedCodeList = new ArrayList<>();
if (StringUtils.isNotEmpty(codes)) {
String[] processDefinitionCodeArray = codes.split(",");
for (String strProcessDefinitionCode : processDefinitionCodeArray) {
long code = Long.parseLong(strProcessDefinitionCode);
try {
Map<String, Object> deleteResult = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, processDefinitionId);
Map<String, Object> deleteResult = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, code);
if (!Status.SUCCESS.equals(deleteResult.get(Constants.STATUS))) {
deleteFailedIdList.add(strProcessDefinitionId);
deleteFailedCodeList.add(strProcessDefinitionCode);
logger.error((String) deleteResult.get(Constants.MSG));
}
} catch (Exception e) {
deleteFailedIdList.add(strProcessDefinitionId);
deleteFailedCodeList.add(strProcessDefinitionCode);
}
}
}
if (!deleteFailedIdList.isEmpty()) {
putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR, String.join(",", deleteFailedIdList));
if (!deleteFailedCodeList.isEmpty()) {
putMsg(result, BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR, String.join(",", deleteFailedCodeList));
} else {
putMsg(result, Status.SUCCESS);
}
return returnDataList(result);
}
@ -620,22 +615,22 @@ public class ProcessDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param codes process definition codes
* @param response response
*/
@ApiOperation(value = "batchExportByCodes", notes = "BATCH_EXPORT_PROCESS_DEFINITION_BY_CODES_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String")
@ApiImplicitParam(name = "codes", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "String")
})
@GetMapping(value = "/export")
@ResponseBody
@AccessLogAnnotation(ignoreRequestArgs = {"loginUser", "response"})
public void batchExportProcessDefinitionByCodes(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("processDefinitionCodes") String processDefinitionCodes,
@RequestParam("codes") String codes,
HttpServletResponse response) {
try {
processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, processDefinitionCodes, response);
processDefinitionService.batchExportProcessDefinitionByCodes(loginUser, projectCode, codes, response);
} catch (Exception e) {
logger.error(Status.BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(), e);
}
@ -680,5 +675,4 @@ public class ProcessDefinitionController extends BaseController {
Map<String, Object> result = processDefinitionService.importProcessDefinition(loginUser, projectCode, file);
return returnDataList(result);
}
}

100
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -95,15 +95,15 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionCode task definition code
* @param taskDefinitionJson task definition json
* @param code task definition code
* @param taskDefinitionJsonObj task definition json object
* @return update result code
*/
@ApiOperation(value = "update", notes = "UPDATE_TASK_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "taskDefinitionJson", value = "TASK_DEFINITION_JSON", required = true, type = "String")
@ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String")
})
@PostMapping(value = "/update")
@ResponseStatus(HttpStatus.OK)
@ -111,9 +111,9 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateTaskDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson) {
Map<String, Object> result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskDefinitionCode, taskDefinitionJson);
@RequestParam(value = "code") long code,
@RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj) {
Map<String, Object> result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, code, taskDefinitionJsonObj);
return returnDataList(result);
}
@ -124,14 +124,14 @@ public class TaskDefinitionController extends BaseController {
* @param projectCode project code
* @param pageNo the task definition version list current page number
* @param pageSize the task definition version list page size
* @param taskDefinitionCode the task definition code
* @param code the task definition code
* @return the task definition version list
*/
@ApiOperation(value = "queryVersions", notes = "QUERY_TASK_DEFINITION_VERSIONS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
})
@GetMapping(value = "/versions")
@ResponseStatus(HttpStatus.OK)
@ -141,8 +141,8 @@ public class TaskDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "pageNo") int pageNo,
@RequestParam(value = "pageSize") int pageSize,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) {
Map<String, Object> result = taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, pageNo, pageSize, taskDefinitionCode);
@RequestParam(value = "code") long code) {
Map<String, Object> result = taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, pageNo, pageSize, code);
return returnDataList(result);
}
@ -151,13 +151,13 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user info
* @param projectCode project code
* @param taskDefinitionCode the task definition code
* @param code the task definition code
* @param version the version user want to switch
* @return switch version result code
*/
@ApiOperation(value = "switchVersion", notes = "SWITCH_TASK_DEFINITION_VERSION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/version/switch")
@ -166,9 +166,9 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result switchTaskDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode,
@RequestParam(value = "code") long code,
@RequestParam(value = "version") int version) {
Map<String, Object> result = taskDefinitionService.switchVersion(loginUser, projectCode, taskDefinitionCode, version);
Map<String, Object> result = taskDefinitionService.switchVersion(loginUser, projectCode, code, version);
return returnDataList(result);
}
@ -177,13 +177,13 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user info
* @param projectCode project code
* @param taskDefinitionCode the task definition code
* @param code the task definition code
* @param version the task definition version user want to delete
* @return delete version result code
*/
@ApiOperation(value = "deleteVersion", notes = "DELETE_TASK_DEFINITION_VERSION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "version", value = "VERSION", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/version/delete")
@ -192,9 +192,9 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteTaskDefinitionVersion(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode,
@RequestParam(value = "code") long code,
@RequestParam(value = "version") int version) {
Map<String, Object> result = taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, taskDefinitionCode, version);
Map<String, Object> result = taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, code, version);
return returnDataList(result);
}
@ -203,12 +203,12 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionCode the task definition code
* @param code the task definition code
* @return delete result code
*/
@ApiOperation(value = "deleteTaskDefinition", notes = "DELETE_TASK_DEFINITION_BY_CODE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
})
@GetMapping(value = "/delete")
@ResponseStatus(HttpStatus.OK)
@ -216,8 +216,8 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteTaskDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) {
Map<String, Object> result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, taskDefinitionCode);
@RequestParam(value = "code") long code) {
Map<String, Object> result = taskDefinitionService.deleteTaskDefinitionByCode(loginUser, projectCode, code);
return returnDataList(result);
}
@ -226,12 +226,12 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param taskDefinitionCode the task definition code
* @param code the task definition code
* @return task definition detail
*/
@ApiOperation(value = "queryTaskDefinitionDetail", notes = "QUERY_TASK_DEFINITION_DETAIL_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
})
@GetMapping(value = "/select-by-code")
@ResponseStatus(HttpStatus.OK)
@ -239,8 +239,8 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionDetail(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskDefinitionCode") long taskDefinitionCode) {
Map<String, Object> result = taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, taskDefinitionCode);
@RequestParam(value = "code") long code) {
Map<String, Object> result = taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, code);
return returnDataList(result);
}
@ -250,16 +250,16 @@ public class TaskDefinitionController extends BaseController {
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param userId user id
* @return task definition page
*/
@ApiOperation(value = "queryTaskDefinitionListPaging", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@GetMapping(value = "/list-paging")
@ -268,17 +268,55 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, userId, pageNo, pageSize);
}
/**
* query task definition list paging by taskType
*
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param taskType taskType
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
@ApiOperation(value = "queryTaskDefinitionByTaskType", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, type = "String"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@GetMapping(value = "/task-type-list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionByTaskType(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskType", required = true) String taskType,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId);
return taskDefinitionService.queryTaskDefinitionByTaskType(loginUser, projectCode, taskType, searchVal, userId, pageNo, pageSize);
}
/**
@ -297,7 +335,7 @@ public class TaskDefinitionController extends BaseController {
@ApiException(LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result genTaskCodeList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer genNum) {
@RequestParam("genNum") Integer genNum) {
Map<String, Object> result = taskDefinitionService.genTaskCodeList(loginUser, genNum);
return returnDataList(result);
}

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -192,7 +192,7 @@ public enum Status {
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163, "delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
DELETE_PROCESS_DEFINITION_BY_CODE_FAIL(10163, "delete process definition by code fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
@ -253,11 +253,11 @@ public enum Status {
PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"),
PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"),
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022, "delete process definition by id error", "删除工作流定义错误"),
DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"),
SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
BATCH_DELETE_PROCESS_DEFINE_ERROR(50025, "batch delete process definition error", "批量删除工作流定义错误"),
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
BATCH_DELETE_PROCESS_DEFINE_BY_CODES_ERROR(50026, "batch delete process definition by codes {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),

71
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -27,8 +27,6 @@ import javax.servlet.http.HttpServletResponse;
import org.springframework.web.multipart.MultipartFile;
import com.fasterxml.jackson.core.JsonProcessingException;
/**
* process definition service
*/
@ -46,8 +44,8 @@ public interface ProcessDefinitionService {
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
* @throws JsonProcessingException JsonProcessingException
*/
Map<String, Object> createProcessDefinition(User loginUser,
long projectCode,
@ -57,7 +55,8 @@ public interface ProcessDefinitionService {
String locations,
int timeout,
String tenantCode,
String taskRelationJson) throws JsonProcessingException;
String taskRelationJson,
String taskDefinitionJson);
/**
* query process definition list
@ -83,9 +82,9 @@ public interface ProcessDefinitionService {
Result queryProcessDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize,
Integer userId);
Integer pageSize);
/**
* query detail of process definition
@ -101,29 +100,29 @@ public interface ProcessDefinitionService {
long code);
/**
* query datail of process definition
* query detail of process definition
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionName process definition name
* @param name process definition name
* @return process definition detail
*/
Map<String, Object> queryProcessDefinitionByName(User loginUser,
long projectCode,
String processDefinitionName);
String name);
/**
* batch copy process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param processDefinitionCodes processDefinitionCodes
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
Map<String, Object> batchCopyProcessDefinition(User loginUser,
long projectCode,
String processDefinitionCodes,
String codes,
long targetProjectCode);
/**
@ -131,12 +130,12 @@ public interface ProcessDefinitionService {
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param processDefinitionCodes processDefinitionCodes
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
Map<String, Object> batchMoveProcessDefinition(User loginUser,
long projectCode,
String processDefinitionCodes,
String codes,
long targetProjectCode);
/**
@ -152,6 +151,7 @@ public interface ProcessDefinitionService {
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
Map<String, Object> updateProcessDefinition(User loginUser,
@ -163,7 +163,8 @@ public interface ProcessDefinitionService {
String locations,
int timeout,
String tenantCode,
String taskRelationJson);
String taskRelationJson,
String taskDefinitionJson);
/**
* verify process definition name unique
@ -178,16 +179,16 @@ public interface ProcessDefinitionService {
String name);
/**
* delete process definition by id
* delete process definition by code
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @return delete result code
*/
Map<String, Object> deleteProcessDefinitionById(User loginUser,
Map<String, Object> deleteProcessDefinitionByCode(User loginUser,
long projectCode,
Integer processDefinitionId);
long code);
/**
* release process definition: online / offline
@ -208,12 +209,12 @@ public interface ProcessDefinitionService {
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionCodes process definition codes
* @param codes process definition codes
* @param response http servlet response
*/
void batchExportProcessDefinitionByCodes(User loginUser,
long projectCode,
String processDefinitionCodes,
String codes,
HttpServletResponse response);
/**
@ -241,24 +242,24 @@ public interface ProcessDefinitionService {
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCode define code
* @param code processDefinition code
* @return task node list
*/
Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser,
long projectCode,
long defineCode);
long code);
/**
* get task node details map based on process definition
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCodeList define code list
* @param codes define code list
* @return task node list
*/
Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser,
long projectCode,
String defineCodeList);
String codes);
/**
* query process definition all by project code
@ -274,23 +275,21 @@ public interface ProcessDefinitionService {
* @param code process definition code
* @param limit limit
* @return tree view json data
* @throws Exception exception
*/
Map<String, Object> viewTree(long code,
Integer limit) throws Exception;
Map<String, Object> viewTree(long code, Integer limit);
/**
* switch the defined process definition verison
* switch the defined process definition version
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @param version the version user want to switch
* @return switch process definition version result code
*/
Map<String, Object> switchProcessDefinitionVersion(User loginUser,
long projectCode,
int processDefinitionId,
long code,
int version);
/**
@ -300,27 +299,27 @@ public interface ProcessDefinitionService {
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param processDefinitionCode process definition code
* @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
Result queryProcessDefinitionVersions(User loginUser,
long projectCode,
int pageNo,
int pageSize,
long processDefinitionCode);
long code);
/**
* delete one certain process definition by version number and process definition id
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @param version version number
* @return delele result code
*/
Map<String, Object> deleteByProcessDefinitionIdAndVersion(User loginUser,
Map<String, Object> deleteProcessDefinitionVersion(User loginUser,
long projectCode,
int processDefinitionId,
long code,
int version);
}

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -66,12 +66,12 @@ public interface TaskDefinitionService {
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param taskDefinitionJson task definition json
* @param taskDefinitionJsonObj task definition json object
*/
Map<String, Object> updateTaskDefinition(User loginUser,
long projectCode,
long taskCode,
String taskDefinitionJson);
String taskDefinitionJsonObj);
/**
* update task definition
@ -134,17 +134,37 @@ public interface TaskDefinitionService {
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param userId user id
* @return task definition page
*/
Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize);
/**
* query task definition list paging
*
* @param loginUser login user
* @param projectCode project code
* @param taskType taskType
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
Result queryTaskDefinitionByTaskType(User loginUser,
long projectCode,
String taskType,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize,
Integer userId);
Integer pageSize);
/**
* gen task code list

138
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.FileUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -111,18 +112,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class);
private static final String PROCESSDEFINITIONCODE = "processDefinitionCode";
private static final String RELEASESTATE = "releaseState";
private static final String TASKS = "tasks";
@Autowired
private ProjectMapper projectMapper;
@Autowired
private ProjectService projectService;
@Autowired
private TaskDefinitionService taskDefinitionService;
@Autowired
private UserMapper userMapper;
@ -174,6 +174,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@Override
@ -186,7 +187,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String locations,
int timeout,
String tenantCode,
String taskRelationJson) {
String taskRelationJson,
String taskDefinitionJson) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -213,6 +215,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
long processDefinitionCode;
try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
@ -314,20 +318,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param userId user id
* @return process definition page
*/
@Override
public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, Integer pageNo, Integer pageSize, Integer userId) {
public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal, Integer userId, Integer pageNo, Integer pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result,resultStatus);
putMsg(result, resultStatus);
return result;
}
@ -380,17 +384,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
@Override
public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String processDefinitionName) {
public Map<String, Object> queryProcessDefinitionByName(User loginUser, long projectCode, String name) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, name);
} else {
DagData dagData = processService.genDagData(processDefinition);
result.put(Constants.DATA_LIST, dagData);
@ -412,8 +416,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@Transactional(rollbackFor = Exception.class)
@Override
public Map<String, Object> updateProcessDefinition(User loginUser,
long projectCode,
@ -424,7 +430,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
String locations,
int timeout,
String tenantCode,
String taskRelationJson) {
String taskRelationJson,
String taskDefinitionJson) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -463,7 +470,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
}
taskDefinitionService.createTaskDefinition(loginUser, projectCode, taskDefinitionJson);
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenant.getId());
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition);
}
@ -515,30 +522,25 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
/**
* delete process definition by id
* delete process definition by code
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @return delete result code
*/
@Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteProcessDefinitionById(User loginUser, long projectCode, Integer processDefinitionId) {
public Map<String, Object> deleteProcessDefinitionByCode(User loginUser, long projectCode, long code) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId);
// TODO: replace id to code
// ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@ -550,21 +552,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId);
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, code);
return result;
}
// check process instances is already running
List<ProcessInstance> processInstances = processInstanceService.queryByProcessDefineCodeAndStatus(processDefinition.getCode(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_ID_FAIL, processInstances.size());
putMsg(result, Status.DELETE_PROCESS_DEFINITION_BY_CODE_FAIL, processInstances.size());
return result;
}
// get the timing according to the process definition
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(processDefinitionId);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionCode(code);
if (!schedules.isEmpty() && schedules.size() > 1) {
logger.warn("scheduler num is {},Greater than 1", schedules.size());
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
return result;
} else if (schedules.size() == 1) {
Schedule schedule = schedules.get(0);
@ -576,12 +578,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
int delete = processDefinitionMapper.deleteById(processDefinitionId);
int delete = processDefinitionMapper.deleteById(processDefinition.getId());
processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
return result;
}
@ -660,8 +662,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* batch export process definition by codes
*/
@Override
public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String processDefinitionCodes, HttpServletResponse response) {
if (StringUtils.isEmpty(processDefinitionCodes)) {
public void batchExportProcessDefinitionByCodes(User loginUser, long projectCode, String codes, HttpServletResponse response) {
if (StringUtils.isEmpty(codes)) {
return;
}
Project project = projectMapper.queryByCode(projectCode);
@ -670,7 +672,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return;
}
Set<Long> defineCodeSet = Lists.newArrayList(processDefinitionCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
List<DagDataSchedule> dagDataSchedules = processDefinitionList.stream().map(this::exportProcessDagData).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
@ -931,21 +933,21 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCode define code
* @param code process definition code
* @return task node list
*/
@Override
public Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long defineCode) {
public Map<String, Object> getTaskNodeListByDefinitionCode(User loginUser, long projectCode, long code) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(defineCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCode);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
DagData dagData = processService.genDagData(processDefinition);
@ -960,11 +962,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser loginUser
* @param projectCode project code
* @param defineCodes define codes
* @param codes define codes
* @return task node list
*/
@Override
public Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String defineCodes) {
public Map<String, Object> getNodeListMapByDefinitionCodes(User loginUser, long projectCode, String codes) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -972,11 +974,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
if (CollectionUtils.isEmpty(processDefinitionList)) {
logger.info("process definition not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineCodes);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
Map<Long, List<TaskDefinitionLog>> taskNodeMap = new HashMap<>();
@ -1162,20 +1164,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param processDefinitionCodes processDefinitionCodes
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
public Map<String, Object> batchCopyProcessDefinition(User loginUser,
long projectCode,
String processDefinitionCodes,
String codes,
long targetProjectCode) {
Map<String, Object> result = checkParams(loginUser, projectCode, processDefinitionCodes, targetProjectCode);
Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
List<String> failedProcessList = new ArrayList<>();
doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, processDefinitionCodes, result, true);
doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, true);
checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, true);
return result;
}
@ -1185,15 +1187,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param processDefinitionCodes processDefinitionCodes
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
public Map<String, Object> batchMoveProcessDefinition(User loginUser,
long projectCode,
String processDefinitionCodes,
String codes,
long targetProjectCode) {
Map<String, Object> result = checkParams(loginUser, projectCode, processDefinitionCodes, targetProjectCode);
Map<String, Object> result = checkParams(loginUser, projectCode, codes, targetProjectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -1201,7 +1203,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
List<String> failedProcessList = new ArrayList<>();
doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, processDefinitionCodes, result, false);
doBatchOperateProcessDefinition(loginUser, targetProjectCode, failedProcessList, codes, result, false);
checkBatchOperateResult(projectCode, targetProjectCode, result, failedProcessList, false);
return result;
}
@ -1267,12 +1269,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*
* @param loginUser login user
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, int processDefinitionId, int version) {
public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -1280,22 +1282,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition)) {
putMsg(result
, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR
, processDefinitionId);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper
.queryByDefinitionCodeAndVersion(processDefinition.getCode(), version);
.queryByDefinitionCodeAndVersion(code, version);
if (Objects.isNull(processDefinitionLog)) {
putMsg(result
, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR
, processDefinition.getCode()
, version);
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR, processDefinition.getCode(), version);
return result;
}
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
@ -1336,23 +1333,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param processDefinitionCode process definition code
* @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long processDefinitionCode) {
public Result queryProcessDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long code) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result,resultStatus);
putMsg(result, resultStatus);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize);
@ -1362,34 +1358,34 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
pageInfo.setTotalList(processDefinitionLogs);
pageInfo.setTotal((int) processDefinitionVersionsPaging.getTotal());
result.setData(pageInfo);
putMsg(result,Status.SUCCESS);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* delete one certain process definition by version number and process definition id
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param projectCode project code
* @param processDefinitionId process definition id
* @param code process definition code
* @param version version number
* @return delele result code
*/
@Override
public Map<String, Object> deleteByProcessDefinitionIdAndVersion(User loginUser, long projectCode, int processDefinitionId, int version) {
public Map<String, Object> deleteProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(processDefinition.getCode(), version);
processDefinitionLogMapper.deleteByProcessDefinitionCodeAndVersion(code, version);
putMsg(result, Status.SUCCESS);
}
return result;

124
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -17,15 +17,12 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils.SnowFlakeException;
@ -109,37 +106,79 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
int totalSuccessNumber = 0;
List<Long> totalSuccessCode = new ArrayList<>();
Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
checkTaskDefinition(result, taskDefinitionLog);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setOperator(loginUser.getId());
if (taskDefinitionLog.getCode() > 0 && taskDefinitionLog.getVersion() > 0) {
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion != null) {
if (!taskDefinitionLog.equals(definitionCodeAndVersion)) {
taskDefinitionLog.setUserId(definitionCodeAndVersion.getUserId());
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinitionLog.getCode());
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionLog.getCode());
return result;
}
taskDefinitionLog.setVersion(version + 1);
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
totalSuccessCode.add(taskDefinitionLog.getCode());
}
continue;
}
}
taskDefinitionLog.setUserId(loginUser.getId());
taskDefinitionLog.setVersion(1);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
long code = 0L;
totalSuccessCode.add(taskDefinitionLog.getCode());
newTaskDefinitionLogs.add(taskDefinitionLog);
if (taskDefinitionLog.getCode() == 0) {
long code;
try {
code = SnowFlakeUtils.getInstance().nextId();
taskDefinitionLog.setVersion(1);
taskDefinitionLog.setCode(code);
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
return result;
}
taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now);
totalSuccessCode.add(code);
}
totalSuccessCode.add(taskDefinitionLog.getCode());
newTaskDefinitionLogs.add(taskDefinitionLog);
totalSuccessNumber++;
}
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMapper.queryByDefinitionCode(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
}
}
}
if (!newTaskDefinitionLogs.isEmpty()) {
int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
}
}
Map<String, Object> resData = new HashMap<>();
resData.put("total", totalSuccessNumber);
resData.put("code", totalSuccessCode);
@ -214,11 +253,11 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @param loginUser login user
* @param projectCode project code
* @param taskCode task code
* @param taskDefinitionJson task definition json
* @param taskDefinitionJsonObj task definition json object
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJson) {
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
@ -234,19 +273,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJson, TaskDefinitionLog.class);
checkTaskDefinition(result, taskDefinitionToUpdate);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return result;
}
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) {
logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
return result;
}
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return result;
}
Date now = new Date();
taskDefinitionToUpdate.setCode(taskDefinition.getCode());
taskDefinitionToUpdate.setCode(taskCode);
taskDefinitionToUpdate.setId(taskDefinition.getId());
taskDefinitionToUpdate.setProjectCode(projectCode);
taskDefinitionToUpdate.setUserId(taskDefinition.getUserId());
taskDefinitionToUpdate.setVersion(version == null || version == 0 ? 1 : version + 1);
taskDefinitionToUpdate.setVersion(version + 1);
taskDefinitionToUpdate.setTaskType(taskDefinitionToUpdate.getTaskType().toUpperCase());
taskDefinitionToUpdate.setResourceIds(processService.getResourceIds(taskDefinitionToUpdate));
taskDefinitionToUpdate.setUpdateTime(now);
@ -264,25 +312,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
public void checkTaskNode(Map<String, Object> result, TaskNode taskNode, String taskDefinitionJson) {
if (taskNode == null) {
logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return;
}
if (!CheckUtils.checkTaskNodeParameters(taskNode)) {
logger.error("task node {} parameter invalid", taskNode.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
}
}
private void checkTaskDefinition(Map<String, Object> result, TaskDefinition taskDefinition) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinition)) {
logger.error("task definition {} parameter invalid", taskDefinition.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
}
}
/**
* update task definition
*
@ -336,9 +365,20 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
public Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize) {
return null;
}
@Override
public Result queryTaskDefinitionByTaskType(User loginUser,
long projectCode,
String taskType,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize,
Integer userId) {
Integer pageSize) {
return null;
}

60
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -71,11 +71,17 @@ public class ProcessDefinitionControllerTest {
}
@Test
public void testCreateProcessDefinition() throws Exception {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
public void testCreateProcessDefinition() {
String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
long projectCode = 1L;
String name = "dag_test";
String description = "desc test";
@ -88,10 +94,10 @@ public class ProcessDefinitionControllerTest {
result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, json)).thenReturn(result);
locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, json);
locations, timeout, tenantCode, relationJson, taskDefinitionJson);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -128,9 +134,16 @@ public class ProcessDefinitionControllerTest {
@Test
public void updateProcessDefinition() {
String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]";
String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
long projectCode = 1L;
String name = "dag_test";
@ -144,10 +157,10 @@ public class ProcessDefinitionControllerTest {
result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, json)).thenReturn(result);
locations, timeout, tenantCode, relationJson, taskDefinitionJson)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, json, ReleaseState.OFFLINE);
locations, timeout, tenantCode, relationJson, taskDefinitionJson, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -265,15 +278,15 @@ public class ProcessDefinitionControllerTest {
}
@Test
public void testDeleteProcessDefinitionById() {
public void testDeleteProcessDefinitionByCode() {
long projectCode = 1L;
int id = 1;
long code = 1L;
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.deleteProcessDefinitionById(user, projectCode, id)).thenReturn(result);
Result response = processDefinitionController.deleteProcessDefinitionById(user, projectCode, id);
Mockito.when(processDefinitionService.deleteProcessDefinitionByCode(user, projectCode, code)).thenReturn(result);
Result response = processDefinitionController.deleteProcessDefinitionByCode(user, projectCode, code);
Assert.assertTrue(response != null && response.isSuccess());
}
@ -333,7 +346,7 @@ public class ProcessDefinitionControllerTest {
}
@Test
public void testQueryProcessDefinitionListPaging() throws Exception {
public void testQueryProcessDefinitionListPaging() {
long projectCode = 1L;
int pageNo = 1;
int pageSize = 10;
@ -344,8 +357,8 @@ public class ProcessDefinitionControllerTest {
putMsg(result, Status.SUCCESS);
result.setData(new PageInfo<Resource>(1, 10));
Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, pageNo, pageSize, userId)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectCode, pageNo, searchVal, userId, pageSize);
Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, userId, pageNo, pageSize)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectCode, searchVal, userId, pageNo, pageSize);
Assert.assertTrue(response != null && response.isSuccess());
}
@ -399,17 +412,10 @@ public class ProcessDefinitionControllerTest {
long projectCode = 1L;
Map<String, Object> resultMap = new HashMap<>();
putMsg(resultMap, Status.SUCCESS);
Mockito.when(processDefinitionService.deleteByProcessDefinitionIdAndVersion(
user
, projectCode
, 1
, 10))
.thenReturn(resultMap);
Mockito.when(processDefinitionService.deleteProcessDefinitionVersion(
user, projectCode, 1, 10)).thenReturn(resultMap);
Result result = processDefinitionController.deleteProcessDefinitionVersion(
user
, projectCode
, 1
, 10);
user, projectCode, 1, 10);
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
}

41
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -143,7 +143,7 @@ public class ProcessDefinitionServiceTest {
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Result map = processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", 1, 5, 0);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT.getCode(), (int)map.getCode());
Assert.assertEquals(Status.PROJECT_NOT_FOUNT.getCode(), (int) map.getCode());
putMsg(result, Status.SUCCESS, projectCode);
loginUser.setId(1);
@ -310,7 +310,7 @@ public class ProcessDefinitionServiceTest {
}
@Test
public void deleteProcessDefinitionByIdTest() {
public void deleteProcessDefinitionByCodeTest() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
@ -323,14 +323,14 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUNT, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 6);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 6L);
Assert.assertEquals(Status.PROJECT_NOT_FOUNT, map.get(Constants.STATUS));
//project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processDefineMapper.selectById(1)).thenReturn(null);
Map<String, Object> instanceNotExitRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 1);
Mockito.when(processDefineMapper.queryByCode(1L)).thenReturn(null);
Map<String, Object> instanceNotExitRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
ProcessDefinition processDefinition = getProcessDefinition();
@ -338,8 +338,8 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
//user no auth
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
//process definition online
@ -347,21 +347,21 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dfOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.PROCESS_DEFINE_STATE_ONLINE, dfOnlineRes.get(Constants.STATUS));
//scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.selectById(46)).thenReturn(processDefinition);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
List<Schedule> schedules = new ArrayList<>();
schedules.add(getSchedule());
schedules.add(getSchedule());
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, schedulerGreaterThanOneRes.get(Constants.STATUS));
//scheduler online
schedules.clear();
@ -370,8 +370,8 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//delete fail
@ -380,16 +380,17 @@ public class ProcessDefinitionServiceTest {
schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46)).thenReturn(schedules);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR, deleteFail.get(Constants.STATUS));
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, deleteFail.get(Constants.STATUS));
//delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionById(loginUser, projectCode, 46);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
}
@ -594,7 +595,7 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", null);
"", "", "", 0, "root", null, null);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
}

27
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -151,23 +151,13 @@ public class TaskDefinitionServiceImplTest {
@Test
public void updateTaskDefinition () {
String updateTaskDefinitionJson = "{\n"
+ "\"name\": \"test12111\",\n"
+ "\"description\": \"test\",\n"
+ "\"taskType\": \"SHELL\",\n"
+ "\"flag\": 0,\n"
+ "\"taskParams\": \"{\\\"resourceList\\\":[],\\\"localParams\\\":[],\\\"rawScript\\\":\\\"echo 11\\\",\\\"conditionResult\\\": "
+ "{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},\\\"dependence\\\":{}}\",\n"
+ "\"taskPriority\": 0,\n"
+ "\"workerGroup\": \"default\",\n"
+ "\"failRetryTimes\": 0,\n"
+ "\"failRetryInterval\": 1,\n"
+ "\"timeoutFlag\": 1,\n"
+ "\"timeoutNotifyStrategy\": 0,\n"
+ "\"timeout\": 0,\n"
+ "\"delayTime\": 0,\n"
+ "\"resourceIds\": \"\"\n"
+ "}";
String taskDefinitionJson = "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}";
long projectCode = 1L;
long taskCode = 1L;
@ -186,7 +176,8 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.queryByDefinitionCode(taskCode)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, updateTaskDefinitionJson);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}

29
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import com.baomidou.mybatisplus.annotation.IdType;
@ -400,6 +401,34 @@ public class TaskDefinition {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}
@Override
public boolean equals(Object o) {
if (o == null) {
return false;
}
TaskDefinition that = (TaskDefinition) o;
return failRetryTimes == that.failRetryTimes
&& failRetryInterval == that.failRetryInterval
&& timeout == that.timeout
&& delayTime == that.delayTime
&& Objects.equals(name, that.name)
&& Objects.equals(description, that.description)
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& Objects.equals(resourceIds, that.resourceIds);
}
@Override
public int hashCode() {
return Objects.hash(name, description, taskType, taskParams, flag, taskPriority, workerGroup, failRetryTimes,
failRetryInterval, timeoutFlag, timeoutNotifyStrategy, timeout, delayTime, resourceIds);
}
@Override
public String toString() {
return "TaskDefinition{"

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -88,6 +88,16 @@ public class TaskDefinitionLog extends TaskDefinition {
this.operateTime = operateTime;
}
@Override
public boolean equals(Object o) {
return super.equals(o);
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public String toString() {
return super.toString();

Loading…
Cancel
Save