From 9c7ad6df0acc151f35614030d769ed8152142551 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 24 Nov 2023 10:53:23 +0800 Subject: [PATCH] [Improvement] Clean up Scheduler logic (#15198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Clean up Scheduler logic * Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java Co-authored-by: 旺阳 --------- Co-authored-by: 旺阳 --- .../ProcessDefinitionController.java | 150 +---- .../api/controller/SchedulerController.java | 37 +- .../api/python/PythonGateway.java | 28 +- .../api/service/ProcessDefinitionService.java | 103 +--- .../api/service/SchedulerService.java | 43 +- .../api/service/impl/ExecutorServiceImpl.java | 3 +- .../impl/ProcessDefinitionServiceImpl.java | 545 +++--------------- .../impl/ProcessTaskRelationServiceImpl.java | 12 +- .../service/impl/SchedulerServiceImpl.java | 216 +++---- .../impl/TaskDefinitionServiceImpl.java | 16 +- .../ProcessDefinitionControllerTest.java | 12 +- .../controller/SchedulerControllerTest.java | 10 +- .../service/ExecuteFunctionServiceTest.java | 5 +- .../service/ProcessDefinitionServiceTest.java | 45 +- .../ProcessTaskRelationServiceTest.java | 6 +- .../api/service/SchedulerServiceTest.java | 63 -- .../TaskDefinitionServiceImplTest.java | 4 +- .../dao/mapper/ProcessTaskRelationMapper.java | 4 +- .../dao/mapper/ProcessTaskRelationMapper.xml | 3 +- .../mapper/ProcessTaskRelationMapperTest.java | 2 +- .../service/process/ProcessService.java | 2 +- .../service/process/ProcessServiceImpl.java | 62 +- .../service/process/ProcessServiceTest.java | 110 ++-- 23 files changed, 310 insertions(+), 1171 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 1ccd92dab6..a7701d25b3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -242,13 +242,12 @@ public class ProcessDefinitionController extends BaseController { @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, @RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson, - @RequestParam(value = "otherParamsJson", required = false) String otherParamsJson, @RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType, @RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) { Map result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name, code, description, globalParams, - locations, timeout, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType); + locations, timeout, taskRelationJson, taskDefinitionJson, executionType); // If the update fails, the result will be returned directly if (result.get(Constants.STATUS) != Status.SUCCESS) { return returnDataList(result); @@ -256,7 +255,7 @@ public class ProcessDefinitionController extends BaseController { // Judge whether to go online after editing,0 means offline, 1 means online if (releaseState == ReleaseState.ONLINE) { - result = processDefinitionService.releaseProcessDefinition(loginUser, projectCode, code, releaseState); + processDefinitionService.onlineWorkflowDefinition(loginUser, projectCode, code); } return returnDataList(result); } @@ -343,15 +342,6 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } - /** - * release process definition - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState release state - * @return release result code - */ @Operation(summary = "release", description = "RELEASE_PROCESS_DEFINITION_NOTES") @Parameters({ @Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)), @@ -361,13 +351,22 @@ public class ProcessDefinitionController extends BaseController { @PostMapping(value = "/{code}/release") @ResponseStatus(HttpStatus.OK) @ApiException(RELEASE_PROCESS_DEFINITION_ERROR) - public Result releaseProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code", required = true) long code, - @RequestParam(value = "releaseState", required = true) ReleaseState releaseState) { - Map result = - processDefinitionService.releaseProcessDefinition(loginUser, projectCode, code, releaseState); - return returnDataList(result); + public Result releaseProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code", required = true) long workflowDefinitionCode, + @RequestParam(value = "releaseState", required = true) ReleaseState releaseState) { + switch (releaseState) { + case ONLINE: + processDefinitionService.onlineWorkflowDefinition(loginUser, projectCode, workflowDefinitionCode); + break; + case OFFLINE: + processDefinitionService.offlineWorkflowDefinition(loginUser, projectCode, workflowDefinitionCode); + break; + default: + throw new IllegalArgumentException( + "The releaseState " + releaseState + " is illegal, please check it."); + } + return Result.success(true); } /** @@ -712,119 +711,6 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } - /** - * create empty process definition - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @return process definition code - */ - @Operation(summary = "createEmptyProcessDefinition", description = "CREATE_EMPTY_PROCESS_NOTES") - @Parameters({ - @Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)), - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")), - @Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class)) - }) - @PostMapping(value = "/empty") - @ResponseStatus(HttpStatus.OK) - @ApiException(CREATE_PROCESS_DEFINITION_ERROR) - public Result createEmptyProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "name", required = true) String name, - @RequestParam(value = "description", required = false) String description, - @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, - @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, - @RequestParam(value = "scheduleJson", required = false) String scheduleJson, - @RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType) { - return returnDataList(processDefinitionService.createEmptyProcessDefinition(loginUser, projectCode, name, - description, globalParams, - timeout, scheduleJson, executionType)); - } - - /** - * update process definition basic info, not including task definition, task relation and location. - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @param executionType executionType - * @param releaseState releaseState - * @param otherParamsJson otherParamsJson handle other params - * @return update result code - */ - @Operation(summary = "updateBasicInfo", description = "UPDATE_PROCESS_DEFINITION_BASIC_INFO_NOTES") - @Parameters({ - @Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)), - @Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")), - @Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class)), - @Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class)), - @Parameter(name = "otherParamsJson", description = "OTHER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class)) - }) - @PutMapping(value = "/{code}/basic-info") - @ResponseStatus(HttpStatus.OK) - @ApiException(UPDATE_PROCESS_DEFINITION_ERROR) - public Result updateProcessDefinitionBasicInfo(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @RequestParam(value = "name", required = true) String name, - @PathVariable(value = "code", required = true) long code, - @RequestParam(value = "description", required = false) String description, - @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, - @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, - @RequestParam(value = "scheduleJson", required = false) String scheduleJson, - @RequestParam(value = "otherParamsJson", required = false) String otherParamsJson, - @RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType, - @RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) { - Map result = processDefinitionService.updateProcessDefinitionBasicInfo(loginUser, projectCode, - name, code, description, globalParams, - timeout, scheduleJson, otherParamsJson, executionType); - // If the update fails, the result will be returned directly - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return returnDataList(result); - } - - // Judge whether to go online after editing,0 means offline, 1 means online - if (releaseState == ReleaseState.ONLINE) { - result = processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState); - } - return returnDataList(result); - } - - /** - * release process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState releaseState - * @return update result code - */ - @Operation(summary = "releaseWorkflowAndSchedule", description = "RELEASE_WORKFLOW_SCHEDULE_NOTES") - @Parameters({ - @Parameter(name = "projectCode", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = long.class)), - @Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")), - @Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = true, schema = @Schema(implementation = ReleaseState.class)) - }) - @PostMapping(value = "/{code}/release-workflow") - @ResponseStatus(HttpStatus.OK) - @ApiException(RELEASE_PROCESS_DEFINITION_ERROR) - public Result releaseWorkflowAndSchedule(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code", required = true) long code, - @RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) { - return returnDataList( - processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState)); - } - /** * query process definition global variables and local variables * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java index 57b505c650..3158939ef1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java @@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; @@ -180,46 +179,30 @@ public class SchedulerController extends BaseController { return returnDataList(result); } - /** - * publish schedule setScheduleState - * - * @param loginUser login user - * @param projectCode project code - * @param id scheduler id - * @return publish result code - */ @Operation(summary = "online", description = "ONLINE_SCHEDULE_NOTES") @Parameters({ @Parameter(name = "id", description = "SCHEDULE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")) }) @PostMapping("/{id}/online") @ApiException(PUBLISH_SCHEDULE_ONLINE_ERROR) - public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable("id") Integer id) { - schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE); - return Result.success(); + public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable("id") Integer id) { + schedulerService.onlineScheduler(loginUser, projectCode, id); + return Result.success(true); } - /** - * offline schedule - * - * @param loginUser login user - * @param projectCode project code - * @param id schedule id - * @return operation result code - */ @Operation(summary = "offline", description = "OFFLINE_SCHEDULE_NOTES") @Parameters({ @Parameter(name = "id", description = "SCHEDULE_ID", required = true, schema = @Schema(implementation = int.class, example = "100")) }) @PostMapping("/{id}/offline") @ApiException(OFFLINE_SCHEDULE_ERROR) - public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable("id") Integer id) { - schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE); - return Result.success(); + public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable("id") Integer id) { + schedulerService.offlineScheduler(loginUser, projectCode, id); + return Result.success(true); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 2451a9de6e..1e4e1f5aa0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -265,11 +265,10 @@ public class PythonGateway { if (processDefinition != null) { processDefinitionCode = processDefinition.getCode(); // make sure workflow offline which could edit - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, - ReleaseState.OFFLINE); + processDefinitionService.offlineWorkflowDefinition(user, projectCode, processDefinitionCode); processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, - null, timeout, taskRelationJson, taskDefinitionJson, otherParamsJson, + null, timeout, taskRelationJson, taskDefinitionJson, executionTypeEnum); } else { Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, @@ -290,8 +289,11 @@ public class PythonGateway { warningType, warningGroupId); } - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, - ReleaseState.getEnum(releaseState)); + if (ReleaseState.ONLINE.equals(ReleaseState.getEnum(releaseState))) { + processDefinitionService.onlineWorkflowDefinition(user, projectCode, processDefinitionCode); + } else if (ReleaseState.OFFLINE.equals(ReleaseState.getEnum(releaseState))) { + processDefinitionService.offlineWorkflowDefinition(user, projectCode, processDefinitionCode); + } return processDefinitionCode; } @@ -346,8 +348,7 @@ public class PythonGateway { // create or update schedule int scheduleId; if (scheduleObj == null) { - processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, - ReleaseState.ONLINE); + processDefinitionService.onlineWorkflowDefinition(user, projectCode, workflowCode); Map result = schedulerService.insertSchedule(user, projectCode, workflowCode, schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, user.getTenantCode(), @@ -355,16 +356,15 @@ public class PythonGateway { scheduleId = (int) result.get("scheduleId"); } else { scheduleId = scheduleObj.getId(); - processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, - ReleaseState.OFFLINE); + processDefinitionService.offlineWorkflowDefinition(user, projectCode, workflowCode); schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, user.getTenantCode(), DEFAULT_ENVIRONMENT_CODE); } if (onlineSchedule) { // set workflow online to make sure we can set schedule online - processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE); - schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); + processDefinitionService.onlineWorkflowDefinition(user, projectCode, workflowCode); + schedulerService.onlineScheduler(user, projectCode, scheduleId); } } @@ -382,8 +382,7 @@ public class PythonGateway { processDefinitionMapper.queryByDefineName(project.getCode(), workflowName); // make sure workflow online - processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), - ReleaseState.ONLINE); + processDefinitionService.onlineWorkflowDefinition(user, project.getCode(), processDefinition.getCode()); executorService.execProcessInstance(user, project.getCode(), @@ -561,8 +560,7 @@ public class PythonGateway { // get workflow info if (processDefinition != null) { // make sure workflow online - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), - ReleaseState.ONLINE); + processDefinitionService.onlineWorkflowDefinition(user, projectCode, processDefinition.getCode()); result.put("id", processDefinition.getId()); result.put("name", processDefinition.getName()); result.put("code", processDefinition.getCode()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 6b393649e6..cf28e465ef 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -23,9 +23,7 @@ import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; -import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.User; @@ -207,7 +205,6 @@ public interface ProcessDefinitionService { * @param timeout timeout * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson - * @param otherParamsJson otherParamsJson handle other params * @return update result code */ Map updateProcessDefinition(User loginUser, @@ -220,7 +217,6 @@ public interface ProcessDefinitionService { int timeout, String taskRelationJson, String taskDefinitionJson, - String otherParamsJson, ProcessExecutionTypeEnum executionType); /** @@ -251,20 +247,6 @@ public interface ProcessDefinitionService { void deleteProcessDefinitionByCode(User loginUser, long workflowDefinitionCode); - /** - * release process definition: online / offline - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState release state - * @return release result code - */ - Map releaseProcessDefinition(User loginUser, - long projectCode, - long code, - ReleaseState releaseState); - /** * batch export process definition by codes * @@ -414,53 +396,6 @@ public interface ProcessDefinitionService { long code, int version); - /** - * create empty process definition - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @return process definition code - */ - Map createEmptyProcessDefinition(User loginUser, - long projectCode, - String name, - String description, - String globalParams, - int timeout, - String scheduleJson, - ProcessExecutionTypeEnum executionType); - - /** - * update process definition basic info, not including task definition, task relation and location. - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @param otherParamsJson otherParamsJson handle other params - * @param executionType executionType - * @return update result code - */ - Map updateProcessDefinitionBasicInfo(User loginUser, - long projectCode, - String name, - long code, - String description, - String globalParams, - int timeout, - String scheduleJson, - String otherParamsJson, - ProcessExecutionTypeEnum executionType); - /** * update process definition basic info, not including task definition, task relation and location. * @@ -474,44 +409,14 @@ public interface ProcessDefinitionService { WorkflowUpdateRequest workflowUpdateRequest); /** - * release process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState releaseState - * @return update result code - */ - Map releaseWorkflowAndSchedule(User loginUser, - long projectCode, - long code, - ReleaseState releaseState); - - /** - * delete other relation - * @param project - * @param result - * @param processDefinition - */ - void deleteOtherRelation(Project project, Map result, ProcessDefinition processDefinition); - - /** - * save other relation - * @param loginUser - * @param processDefinition - * @param result - * @param otherParamsJson + * Online the workflow definition, it will check all sub workflow is online. */ - void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map result, - String otherParamsJson); + void onlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode); /** - * get Json String - * @param loginUser - * @param processDefinition - * @return Json String + * Offline the workflow definition. It will auto offline the scheduler. */ - String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition); + void offlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode); /** * view process variables diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java index 262ad2df5e..3d6ba3afd1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; @@ -125,19 +124,6 @@ public interface SchedulerService { Schedule getSchedule(User loginUser, Integer scheduleId); - /** - * set schedule online or offline - * - * @param loginUser login user - * @param projectCode project code - * @param id scheduler id - * @param scheduleStatus schedule status - */ - void setScheduleState(User loginUser, - long projectCode, - Integer id, - ReleaseState scheduleStatus); - /** * query schedule * @@ -173,15 +159,6 @@ public interface SchedulerService { */ Map queryScheduleList(User loginUser, long projectCode); - /** - * delete schedule - * - * @param projectId project id - * @param scheduleId schedule id - * @throws RuntimeException runtime exception - */ - void deleteSchedule(int projectId, int scheduleId); - /** * delete schedule by id * @@ -225,4 +202,24 @@ public interface SchedulerService { String workerGroup, String tenantCode, long environmentCode); + + /** + * Online the scheduler by scheduler id, if the related workflow definition is not online will throw exception. + */ + void onlineScheduler(User loginUser, Long projectCode, Integer schedulerId); + + /** + * Do online scheduler by workflow code, this method will not do permission check. + */ + void onlineSchedulerByWorkflowCode(Long workflowDefinitionCode); + + /** + * Offline the scheduler by scheduler id, will not offline the related workflow definition. + */ + void offlineScheduler(User loginUser, Long projectCode, Integer schedulerId); + + /** + * Do offline scheduler by workflow code, this method will not do permission check. + */ + void offlineSchedulerByWorkflowCode(Long workflowDefinitionCode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 53530d6419..6396f1dcb9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -706,8 +706,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - List codes = new ArrayList<>(); - processService.recurseFindSubProcess(processDefinition.getCode(), codes); + List codes = processService.findAllSubWorkflowDefinitionCode(processDefinition.getCode()); if (!codes.isEmpty()) { List processDefinitionList = processDefinitionMapper.queryByCodes(codes); if (processDefinitionList != null) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index e6e36cc55b..294c5abe31 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -35,7 +35,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C import static org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX; import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST; import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS; import static org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX; import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS; @@ -44,7 +43,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL; import org.apache.dolphinscheduler.api.dto.DagDataSchedule; -import org.apache.dolphinscheduler.api.dto.ScheduleParam; import org.apache.dolphinscheduler.api.dto.treeview.Instance; import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest; @@ -66,14 +64,12 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ConditionType; -import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -119,12 +115,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager; -import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import java.io.BufferedOutputStream; @@ -310,7 +304,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro globalParams, locations, timeout, loginUser.getId()); processDefinition.setExecutionType(executionType); - result = createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson); + result = createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs); if (result.get(Constants.STATUS) == Status.SUCCESS) { listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(loginUser, processDefinition, taskDefinitionLogs, @@ -384,7 +378,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro protected Map createDagDefine(User loginUser, List taskRelationList, ProcessDefinition processDefinition, - List taskDefinitionLogs, String otherParamsJson) { + List taskDefinitionLogs) { Map result = new HashMap<>(); int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE); @@ -415,8 +409,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion); } - saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); - putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, processDefinition); return result; @@ -753,7 +745,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param timeout timeout * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson - * @param otherParamsJson otherParamsJson handle other params * @return update result code */ @Override @@ -768,7 +759,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int timeout, String taskRelationJson, String taskDefinitionJson, - String otherParamsJson, ProcessExecutionTypeEnum executionType) { Project project = projectMapper.queryByCode(projectCode); // check if user have write perm for project @@ -815,7 +805,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.set(projectCode, name, description, globalParams, locations, timeout); processDefinition.setExecutionType(executionType); result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, - taskDefinitionLogs, otherParamsJson); + taskDefinitionLogs); if (result.get(Constants.STATUS) == Status.SUCCESS) { listenerEventAlertManager.publishProcessDefinitionUpdatedListenerEvent(loginUser, processDefinition, taskDefinitionLogs, @@ -834,8 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro */ private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition, List taskRelationList) { - List oldProcessTaskRelationList = processTaskRelationMapper - .queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()); + List oldProcessTaskRelationList = + processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); Set oldProcessTaskRelationSet = oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet()); StringBuilder sb = new StringBuilder(); @@ -859,8 +849,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro List taskRelationList, ProcessDefinition processDefinition, ProcessDefinition processDefinitionDeepCopy, - List taskDefinitionLogs, - String otherParamsJson) { + List taskDefinitionLogs) { Map result = new HashMap<>(); int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs, Boolean.TRUE); @@ -924,7 +913,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } - saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); } else { log.info( "Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.", @@ -1079,7 +1067,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro taskDefinitionLogService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode()); // delete workflow definition log processDefinitionLogDao.deleteByWorkflowDefinitionCode(processDefinition.getCode()); - deleteOtherRelation(project, new HashMap<>(), processDefinition); // we delete the workflow definition at last to avoid using transaction here. // If delete error, we can call this interface again. @@ -1089,87 +1076,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro listenerEventAlertManager.publishProcessDefinitionDeletedListenerEvent(loginUser, project, processDefinition); } - /** - * release process definition: online / offline - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState release state - * @return release result code - */ - @Override - @Transactional - public Map releaseProcessDefinition(User loginUser, long projectCode, long code, - ReleaseState releaseState) { - Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - - // check state - if (null == releaseState) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; - } - - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - log.error("Process definition does not exist, processDefinitionCode:{}.", code); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); - return result; - } - switch (releaseState) { - case ONLINE: - List relationList = - processService.findRelationByCode(code, processDefinition.getVersion()); - if (CollectionUtils.isEmpty(relationList)) { - log.warn("Process definition has no task relation, processDefinitionCode:{}.", code); - putMsg(result, Status.PROCESS_DAG_IS_EMPTY); - return result; - } - processDefinition.setReleaseState(releaseState); - processDefinitionMapper.updateById(processDefinition); - log.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode, - code); - break; - case OFFLINE: - processDefinition.setReleaseState(releaseState); - int updateProcess = processDefinitionMapper.updateById(processDefinition); - Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code); - if (updateProcess > 0) { - log.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.", - projectCode, code); - if (schedule != null) { - // set status - schedule.setReleaseState(releaseState); - int updateSchedule = scheduleMapper.updateById(schedule); - if (updateSchedule == 0) { - log.error( - "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", - projectCode, code, schedule.getId()); - putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); - throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); - } else { - log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", - projectCode, code, schedule.getId()); - } - schedulerService.deleteSchedule(project.getId(), schedule.getId()); - } - } - break; - default: - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; - } - - putMsg(result, Status.SUCCESS); - return result; - } - /** * batch export process definition by codes */ @@ -1282,7 +1188,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } for (DagDataSchedule dagDataSchedule : dagDataScheduleList) { - if (!checkAndImport(loginUser, projectCode, result, dagDataSchedule, EMPTY_STRING)) { + if (!checkAndImport(loginUser, projectCode, result, dagDataSchedule)) { return result; } } @@ -1441,7 +1347,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } - return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList, EMPTY_STRING); + return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList); } private ProcessTaskRelationLog buildNormalTaskRelation(long preTaskCode, long postTaskCode) { @@ -1499,8 +1405,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * check and import */ - protected boolean checkAndImport(User loginUser, long projectCode, Map result, - DagDataSchedule dagDataSchedule, String otherParamsJson) { + protected boolean checkAndImport(User loginUser, + long projectCode, + Map result, + DagDataSchedule dagDataSchedule) { if (!checkImportanceParams(dagDataSchedule, result)) { return false; } @@ -1598,8 +1506,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } processDefinition.setCreateTime(new Date()); processDefinition.setUpdateTime(new Date()); - Map createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, - Lists.newArrayList(), otherParamsJson); + Map createDagResult = + createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList()); if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) { putMsg(createDagResult, Status.SUCCESS); } else { @@ -2151,12 +2059,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro diffCode.forEach(code -> failedProcessList.add(code + "[null]")); for (ProcessDefinition processDefinition : processDefinitionList) { List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), - processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); List taskRelationList = processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); processDefinition.setProjectCode(targetProjectCode); - String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition); if (isCopy) { log.info("Copy process definition..."); List taskDefinitionLogs = @@ -2226,8 +2132,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } } try { - result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, - otherParamsJson)); + result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs)); } catch (Exception e) { log.error("Copy process definition error, processDefinitionCode from {} to {}.", oldProcessDefinitionCode, processDefinition.getCode(), e); @@ -2238,7 +2143,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro log.info("Move process definition..."); try { result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, - Lists.newArrayList(), otherParamsJson)); + Lists.newArrayList())); } catch (Exception e) { log.error("Move process definition error, processDefinitionCode:{}.", processDefinition.getCode(), e); @@ -2438,7 +2343,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } - deleteOtherRelation(project, result, processDefinition); log.info( "Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.", projectCode, code, version); @@ -2447,226 +2351,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - /** - * create empty process definition - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @return process definition code - */ - @Override - @Transactional - public Map createEmptyProcessDefinition(User loginUser, - long projectCode, - String name, - String description, - String globalParams, - int timeout, - String scheduleJson, - ProcessExecutionTypeEnum executionType) { - Project project = projectMapper.queryByCode(projectCode); - // check if user have write perm for project - Map result = new HashMap<>(); - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return result; - } - if (checkDescriptionLength(description)) { - log.warn("Parameter description is too long."); - putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); - return result; - } - // check whether the new process define name exist - ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); - if (definition != null) { - log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", - definition.getName(), definition.getCode()); - putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); - return result; - } - - long processDefinitionCode; - try { - processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); - } catch (CodeGenerateException e) { - log.error("Generate process definition code error, projectCode:{}.", projectCode, e); - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); - return result; - } - ProcessDefinition processDefinition = - new ProcessDefinition(projectCode, name, processDefinitionCode, description, - globalParams, "", timeout, loginUser.getId()); - processDefinition.setExecutionType(executionType); - result = createEmptyDagDefine(loginUser, processDefinition); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - log.error("Create empty process definition error."); - return result; - } - - if (StringUtils.isBlank(scheduleJson)) { - return result; - } - - // save dag schedule - Map scheduleResult = createDagSchedule(loginUser, processDefinition, scheduleJson); - if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) { - Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS); - putMsg(result, scheduleResultStatus); - throw new ServiceException(scheduleResultStatus); - } - return result; - } - - protected Map createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) { - Map result = new HashMap<>(); - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); - if (insertVersion == 0) { - log.error("Save process definition error, processDefinitionCode:{}.", processDefinition.getCode()); - putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); - throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); - } - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - return result; - } - - protected Map createDagSchedule(User loginUser, ProcessDefinition processDefinition, - String scheduleJson) { - Map result = new HashMap<>(); - Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class); - if (scheduleObj == null) { - putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); - throw new ServiceException(Status.DATA_IS_NOT_VALID); - } - Date now = new Date(); - scheduleObj.setProcessDefinitionCode(processDefinition.getCode()); - if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) { - log.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.", - processDefinition.getCode()); - putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME); - return result; - } - if (!CronUtils.isValidExpression(scheduleObj.getCrontab())) { - log.error("CronExpression verify failure, cron:{}.", scheduleObj.getCrontab()); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab()); - return result; - } - scheduleObj - .setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType()); - scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 : scheduleObj.getWarningGroupId()); - scheduleObj.setFailureStrategy( - scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy()); - scheduleObj.setCreateTime(now); - scheduleObj.setUpdateTime(now); - scheduleObj.setUserId(loginUser.getId()); - scheduleObj.setReleaseState(ReleaseState.OFFLINE); - scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM - : scheduleObj.getProcessInstancePriority()); - scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? "default" : scheduleObj.getWorkerGroup()); - scheduleObj - .setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode()); - scheduleMapper.insert(scheduleObj); - - putMsg(result, Status.SUCCESS); - result.put("scheduleId", scheduleObj.getId()); - return result; - } - - /** - * update process definition basic info - * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams globalParams - * @param timeout timeout - * @param scheduleJson scheduleJson - * @param otherParamsJson otherParamsJson handle other params - * @param executionType executionType - * @return update result code - */ - @Override - @Transactional - public Map updateProcessDefinitionBasicInfo(User loginUser, - long projectCode, - String name, - long code, - String description, - String globalParams, - int timeout, - String scheduleJson, - String otherParamsJson, - ProcessExecutionTypeEnum executionType) { - Project project = projectMapper.queryByCode(projectCode); - // check if user have write perm for project - Map result = new HashMap<>(); - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return result; - } - if (checkDescriptionLength(description)) { - log.warn("Parameter description is too long."); - putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); - return result; - } - - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - // check process definition exists - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - log.error("Process definition does not exist, code:{}.", code); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); - return result; - } - if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - // online can not permit edit - log.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.", - ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); - putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); - return result; - } - if (!name.equals(processDefinition.getName())) { - // check whether the new process define name exist - ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); - if (definition != null) { - log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.", - definition.getName(), definition.getCode()); - putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); - return result; - } - } - ProcessDefinition processDefinitionDeepCopy = - JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); - processDefinition.set(projectCode, name, description, globalParams, "", timeout); - processDefinition.setExecutionType(executionType); - List taskRelationList = processTaskRelationLogMapper - .queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion()); - result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, - Lists.newArrayList(), otherParamsJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - log.error("Update process definition basic info error."); - return result; - } - - if (StringUtils.isBlank(scheduleJson)) { - return result; - } - // update dag schedule - Map scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson); - if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) { - Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS); - putMsg(result, scheduleResultStatus); - throw new ServiceException(scheduleResultStatus); - } - return result; - } - private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition, ProcessDefinition newProcessDefinition) { // online can not permit edit @@ -2758,7 +2442,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro long projectCode = processDefinition.getProjectCode(); long processDefinitionCode = processDefinition.getCode(); List taskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List taskRelationList = taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); @@ -2820,147 +2504,42 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; } - protected Map updateDagSchedule(User loginUser, - long projectCode, - long processDefinitionCode, - String scheduleJson) { - Map result = new HashMap<>(); - Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); - if (schedule == null) { - putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson); - throw new ServiceException(Status.DATA_IS_NOT_VALID); - } - // set default value - FailureStrategy failureStrategy = - ObjectUtils.defaultIfNull(schedule.getFailureStrategy(), FailureStrategy.CONTINUE); - WarningType warningType = ObjectUtils.defaultIfNull(schedule.getWarningType(), WarningType.NONE); - Priority processInstancePriority = - ObjectUtils.defaultIfNull(schedule.getProcessInstancePriority(), Priority.MEDIUM); - int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); - String workerGroup = StringUtils.defaultIfBlank(schedule.getWorkerGroup(), DEFAULT_WORKER_GROUP); - String tenantCode = StringUtils.defaultIfBlank(schedule.getTenantCode(), Constants.DEFAULT); - long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); - - ScheduleParam param = new ScheduleParam(); - param.setStartTime(schedule.getStartTime()); - param.setEndTime(schedule.getEndTime()); - param.setCrontab(schedule.getCrontab()); - param.setTimezoneId(schedule.getTimezoneId()); - - return schedulerService.updateScheduleByProcessDefinitionCode( - loginUser, - projectCode, - processDefinitionCode, - JSONUtils.toJsonString(param), - warningType, - warningGroupId, - failureStrategy, - processInstancePriority, - workerGroup, - tenantCode, - environmentCode); - } - - /** - * release process definition and schedule - * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code - * @param releaseState releaseState - * @return update result code - */ @Transactional @Override - public Map releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, - ReleaseState releaseState) { - Project project = projectMapper.queryByCode(projectCode); - // check user access for project - Map result = - projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - // check state - if (null == releaseState) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; - } + public void onlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE); - ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { - log.error("Process definition does not exist, code:{}.", code); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code)); - return result; - } - Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code); - if (scheduleObj == null) { - log.error("Schedule cron does not exist, processDefinitionCode:{}.", code); - putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code); - return result; - } - switch (releaseState) { - case ONLINE: - List relationList = - processService.findRelationByCode(code, processDefinition.getVersion()); - if (CollectionUtils.isEmpty(relationList)) { - log.warn("Process definition has no task relation, processDefinitionCode:{}.", code); - putMsg(result, Status.PROCESS_DAG_IS_EMPTY); - return result; - } - processDefinition.setReleaseState(releaseState); - processDefinitionMapper.updateById(processDefinition); - schedulerService.setScheduleState(loginUser, projectCode, scheduleObj.getId(), ReleaseState.ONLINE); - break; - case OFFLINE: - processDefinition.setReleaseState(releaseState); - int updateProcess = processDefinitionMapper.updateById(processDefinition); - if (updateProcess > 0) { - log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.", - projectCode, code, scheduleObj.getId()); - // set status - scheduleObj.setReleaseState(ReleaseState.OFFLINE); - int updateSchedule = scheduleMapper.updateById(scheduleObj); - if (updateSchedule == 0) { - log.error( - "Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}", - projectCode, code, scheduleObj.getId()); - putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); - throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR); - } - schedulerService.deleteSchedule(project.getId(), scheduleObj.getId()); - } - break; - default: - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; + ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode) + .orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode)); + + if (ReleaseState.ONLINE.equals(workflowDefinition.getReleaseState())) { + // do nothing if the workflow is already online + return; } - putMsg(result, Status.SUCCESS); - return result; - } - /** - * save other relation - * @param loginUser - * @param processDefinition - * @param result - * @param otherParamsJson - */ - @Override - public void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map result, - String otherParamsJson) { + checkWorkflowDefinitionIsValidated(workflowDefinition.getCode()); + checkAllSubWorkflowDefinitionIsOnline(workflowDefinition.getCode()); + workflowDefinition.setReleaseState(ReleaseState.ONLINE); + processDefinitionDao.updateById(workflowDefinition); } - /** - * get Json String - * @param loginUser - * @param processDefinition - * @return Json String - */ + @Transactional @Override - public String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition) { - return null; + public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE); + + ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode) + .orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode)); + + if (ReleaseState.OFFLINE.equals(workflowDefinition.getReleaseState())) { + // do nothing if the workflow is already offline + return; + } + workflowDefinition.setReleaseState(ReleaseState.OFFLINE); + processDefinitionDao.updateById(workflowDefinition); + + schedulerService.offlineSchedulerByWorkflowCode(workflowDefinitionCode); } /** @@ -3019,7 +2598,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Set taskCodeSet = new TreeSet<>(); - processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode()) + processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()) .forEach(processTaskRelation -> { if (processTaskRelation.getPreTaskCode() > 0) { taskCodeSet.add(processTaskRelation.getPreTaskCode()); @@ -3046,15 +2625,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return localUserDefParams; } - /** - * delete other relation - * @param project - * @param result - * @param processDefinition - */ - @Override - public void deleteOtherRelation(Project project, Map result, ProcessDefinition processDefinition) { - + private void checkWorkflowDefinitionIsValidated(Long workflowDefinitionCode) { + // todo: build dag check if the dag is validated + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(workflowDefinitionCode); + if (CollectionUtils.isEmpty(processTaskRelations)) { + throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY); + } + // todo : check Workflow is validate } + private void checkAllSubWorkflowDefinitionIsOnline(Long workflowDefinitionCode) { + List allSubWorkflowDefinitionCodes = + processService.findAllSubWorkflowDefinitionCode(workflowDefinitionCode); + if (CollectionUtils.isEmpty(allSubWorkflowDefinitionCodes)) { + return; + } + for (Long subWorkflowDefinitionCode : allSubWorkflowDefinitionCodes) { + ProcessDefinition subWorkflowDefinition = processDefinitionDao.queryByCode(subWorkflowDefinitionCode) + .orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode)); + if (!ReleaseState.ONLINE.equals(subWorkflowDefinition.getReleaseState())) { + throw new ServiceException( + "SubWorkflowDefinition " + subWorkflowDefinition.getName() + " is not online"); + } + } + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java index 2f1f0ba4c0..d8c568e50a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java @@ -137,7 +137,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P } updateProcessDefiniteVersion(loginUser, result, processDefinition); List processTaskRelationList = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List processTaskRelations = Lists.newArrayList(processTaskRelationList); if (!processTaskRelations.isEmpty()) { Map preTaskCodeMap = @@ -327,7 +327,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, @@ -514,7 +514,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P long projectCode = processDefinition.getProjectCode(); long processDefinitionCode = processDefinition.getCode(); List taskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List taskRelationList = taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); @@ -675,7 +675,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); List processTaskRelationList = Lists.newArrayList(processTaskRelations); List processTaskRelationWaitRemove = Lists.newArrayList(); for (ProcessTaskRelation processTaskRelation : processTaskRelationList) { @@ -746,7 +746,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); + processTaskRelationMapper.queryByProcessCode(processDefinition.getCode()); List processTaskRelationList = Lists.newArrayList(processTaskRelations); processTaskRelationList .removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode()) @@ -858,7 +858,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P return result; } List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List processTaskRelationList = Lists.newArrayList(processTaskRelations); if (CollectionUtils.isEmpty(processTaskRelationList)) { log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 6cf9fe2307..ead8aa35b6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE; import org.apache.dolphinscheduler.api.dto.ScheduleParam; import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest; @@ -26,7 +27,6 @@ import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ExecutorService; -import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.SchedulerService; import org.apache.dolphinscheduler.api.utils.PageInfo; @@ -38,26 +38,22 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.service.cron.CronUtils; import org.apache.dolphinscheduler.service.exceptions.CronParseException; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -95,12 +91,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe @Autowired private ExecutorService executorService; - @Autowired - private MonitorService monitorService; - - @Autowired - private ProcessService processService; - @Autowired private ScheduleMapper scheduleMapper; @@ -113,9 +103,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe @Autowired private SchedulerApi schedulerApi; - @Autowired - private ProcessTaskRelationMapper processTaskRelationMapper; - @Autowired private EnvironmentMapper environmentMapper; @@ -436,123 +423,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return schedule; } - /** - * set schedule online or offline - * - * @param loginUser login user - * @param projectCode project code - * @param id scheduler id - * @param scheduleStatus schedule status - * @return publish result code - */ - @Override - @Transactional - public void setScheduleState(User loginUser, - long projectCode, - Integer id, - ReleaseState scheduleStatus) { - Project project = projectMapper.queryByCode(projectCode); - // check project auth - projectService.checkProjectAndAuthThrowException(loginUser, project, null); - - // check schedule exists - Schedule scheduleObj = scheduleMapper.selectById(id); - - if (scheduleObj == null) { - log.error("Schedule does not exist, scheduleId:{}.", id); - throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id); - } - // check schedule release state - if (scheduleObj.getReleaseState() == scheduleStatus) { - log.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.", - scheduleObj.getReleaseState().getDescp(), scheduleObj.getId()); - throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus); - } - ProcessDefinition processDefinition = - processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode()); - if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { - log.error("Process definition does not exist, processDefinitionCode:{}.", - scheduleObj.getProcessDefinitionCode()); - throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, - String.valueOf(scheduleObj.getProcessDefinitionCode())); - } - List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode()); - if (processTaskRelations.isEmpty()) { - log.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode, - processDefinition.getCode()); - throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY); - } - if (scheduleStatus == ReleaseState.ONLINE) { - // check process definition release state - if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { - log.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.", - ReleaseState.ONLINE.getDescp(), processDefinition.getCode()); - throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); - } - // check sub process definition release state - List subProcessDefineCodes = new ArrayList<>(); - processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes); - if (!subProcessDefineCodes.isEmpty()) { - log.info( - "Need to check sub process definition state before change schedule state, subProcessDefineCodes:{}.", - org.apache.commons.lang.StringUtils.join(subProcessDefineCodes, ",")); - List subProcessDefinitionList = - processDefinitionMapper.queryByCodes(subProcessDefineCodes); - if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) { - for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { - /** - * if there is no online process, exit directly - */ - if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) { - log.warn( - "Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.", - ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode()); - throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, - String.valueOf(subProcessDefinition.getId())); - } - } - } - } - } - - // check master server exists - List masterServers = monitorService.getServerListFromRegistry(true); - - if (masterServers.isEmpty()) { - log.error("Master does not exist."); - throw new ServiceException(Status.MASTER_NOT_EXISTS); - } - - // set status - scheduleObj.setReleaseState(scheduleStatus); - - scheduleMapper.updateById(scheduleObj); - - try { - switch (scheduleStatus) { - case ONLINE: - log.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", - project.getId(), processDefinition.getId(), masterServers); - setSchedule(project.getId(), scheduleObj); - break; - case OFFLINE: - log.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", - project.getId(), processDefinition.getId(), masterServers); - deleteSchedule(project.getId(), id); - break; - default: - throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString()); - } - } catch (Exception e) { - log.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(), - projectCode, scheduleObj.getId()); - Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR - : Status.OFFLINE_SCHEDULE_ERROR; - throw new ServiceException(status, e); - } - } - /** * query schedule * @@ -669,25 +539,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } - public void setSchedule(int projectId, Schedule schedule) { - log.info("Set schedule state {}, project id: {}, scheduleId: {}", schedule.getReleaseState().getDescp(), - projectId, schedule.getId()); - schedulerApi.insertOrUpdateScheduleTask(projectId, schedule); - } - - /** - * delete schedule - * - * @param projectId project id - * @param scheduleId schedule id - * @throws RuntimeException runtime exception - */ - @Override - public void deleteSchedule(int projectId, int scheduleId) { - log.info("Delete schedule of project, projectId:{}, scheduleId:{}", projectId, scheduleId); - schedulerApi.deleteScheduleTask(projectId, scheduleId); - } - /** * check valid * @@ -823,6 +674,71 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } + @Transactional + @Override + public void onlineScheduler(User loginUser, Long projectCode, Integer schedulerId) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE); + Schedule schedule = scheduleMapper.selectById(schedulerId); + doOnlineScheduler(schedule); + } + + @Transactional + @Override + public void onlineSchedulerByWorkflowCode(Long workflowDefinitionCode) { + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(workflowDefinitionCode); + doOnlineScheduler(schedule); + } + + private void doOnlineScheduler(Schedule schedule) { + if (schedule == null) { + return; + } + if (ReleaseState.ONLINE.equals(schedule.getReleaseState())) { + log.debug("The schedule is already online, scheduleId:{}.", schedule.getId()); + return; + } + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()); + if (!ReleaseState.ONLINE.equals(processDefinition.getReleaseState())) { + throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName()); + } + + schedule.setReleaseState(ReleaseState.ONLINE); + scheduleMapper.updateById(schedule); + + Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); + schedulerApi.insertOrUpdateScheduleTask(project.getId(), schedule); + } + + @Transactional + @Override + public void offlineScheduler(User loginUser, Long projectCode, Integer schedulerId) { + projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE); + Schedule schedule = scheduleMapper.selectById(schedulerId); + doOfflineScheduler(schedule); + } + + @Transactional + @Override + public void offlineSchedulerByWorkflowCode(Long workflowDefinitionCode) { + Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(workflowDefinitionCode); + doOfflineScheduler(schedule); + } + + private void doOfflineScheduler(Schedule schedule) { + if (schedule == null) { + return; + } + if (ReleaseState.OFFLINE.equals(schedule.getReleaseState())) { + log.debug("The schedule is already offline, scheduleId:{}.", schedule.getId()); + return; + } + schedule.setReleaseState(ReleaseState.OFFLINE); + scheduleMapper.updateById(schedule); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()); + Project project = projectMapper.queryByCode(processDefinition.getProjectCode()); + schedulerApi.deleteScheduleTask(project.getId(), schedule.getId()); + } + private void updateSchedule(Map result, Schedule schedule, ProcessDefinition processDefinition, String scheduleExpression, WarningType warningType, int warningGroupId, FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 303e3e62b0..ba4f584c01 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -341,7 +341,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode()); } List processTaskRelationLogList = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode) + processTaskRelationMapper.queryByProcessCode(processDefinitionCode) .stream() .map(ProcessTaskRelationLog::new) .collect(Collectors.toList()); @@ -504,8 +504,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe "Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.", taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper - .queryByProcessCode(taskDefinition.getProjectCode(), processDefinitionCode); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); List relationList = processTaskRelations.stream() .filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList()); updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList()); @@ -571,7 +571,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionToUpdate)); } @@ -645,8 +645,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe "Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.", taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); - List processTaskRelations = processTaskRelationMapper - .queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode); + List processTaskRelations = + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog)); } this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(), @@ -894,7 +894,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) { ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0); List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode()); + processTaskRelationMapper.queryByProcessCode(taskRelation.getProcessDefinitionCode()); // set upstream code list updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()), @@ -1064,7 +1064,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe taskCode); long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode(); List processTaskRelations = - processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + processTaskRelationMapper.queryByProcessCode(processDefinitionCode); updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionUpdate)); } else { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 889c1388fb..e9538667d0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -166,12 +166,12 @@ public class ProcessDefinitionControllerTest { Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, relationJson, taskDefinitionJson, "", + locations, timeout, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result); Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams, - locations, timeout, relationJson, taskDefinitionJson, "", ProcessExecutionTypeEnum.PARALLEL, + locations, timeout, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE); Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } @@ -179,13 +179,13 @@ public class ProcessDefinitionControllerTest { @Test public void testReleaseProcessDefinition() { long projectCode = 1L; - int id = 1; + long id = 1L; Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); - Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE)) - .thenReturn(result); - Result response = + Mockito.doNothing().when(processDefinitionService) + .offlineWorkflowDefinition(user, projectCode, id); + Result response = processDefinitionController.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE); Assertions.assertTrue(response != null && response.isSuccess()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java index 9190d45274..39dd645247 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.controller; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; @@ -31,7 +32,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Resource; @@ -122,9 +122,7 @@ public class SchedulerControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("id", "37"); - Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class), - isA(Integer.class), - isA(ReleaseState.class)); + Mockito.doNothing().when(schedulerService).onlineScheduler(any(), any(), any()); MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37) .header(SESSION_ID, sessionId) @@ -143,9 +141,7 @@ public class SchedulerControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("id", "28"); - Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class), - isA(Integer.class), - isA(ReleaseState.class)); + Mockito.doNothing().when(schedulerService).offlineScheduler(any(), any(), any()); MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28) .header(SESSION_ID, sessionId) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java index d7a65115de..23085cb3f5 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java @@ -563,9 +563,8 @@ public class ExecuteFunctionServiceTest { @Test public void testStartCheckByProcessDefinedCode() { - List ids = new ArrayList<>(); - ids.add(1L); - Mockito.doNothing().when(processService).recurseFindSubProcess(1, ids); + List ids = Lists.newArrayList(1L); + when(processService.findAllSubWorkflowDefinitionCode(1)).thenReturn(ids); List processDefinitionList = new ArrayList<>(); ProcessDefinition processDefinition = new ProcessDefinition(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 78de18c8e7..ed3a5f639b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT; -import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE; import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; @@ -467,7 +466,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { } when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); when(processService.saveProcessDefine(user, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2); - when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)) + when(processTaskRelationMapper.queryByProcessCode(processDefinitionCode)) .thenReturn(getProcessTaskRelation()); putMsg(result, Status.SUCCESS); @@ -610,46 +609,6 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(11L); } - @Test - public void testReleaseProcessDefinition() { - when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); - - Project project = getProject(projectCode); - - // project check auth fail - Map result = new HashMap<>(); - putMsg(result, Status.PROJECT_NOT_FOUND, projectCode); - when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_ONLINE_OFFLINE)) - .thenReturn(result); - Map map = processDefinitionService.releaseProcessDefinition(user, projectCode, - processDefinitionCode, ReleaseState.OFFLINE); - Assertions.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS)); - - // project check auth success, processes definition online - putMsg(result, Status.SUCCESS, projectCode); - when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition()); - List processTaskRelationList = new ArrayList<>(); - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); - processTaskRelation.setProjectCode(projectCode); - processTaskRelation.setProcessDefinitionCode(46L); - processTaskRelation.setPostTaskCode(123L); - processTaskRelationList.add(processTaskRelation); - when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList); - Map onlineRes = - processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE); - Assertions.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); - - // project check auth success, processes definition online - Map onlineWithResourceRes = - processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE); - Assertions.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS)); - - // release error code - Map failRes = - processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.getEnum(2)); - Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS)); - } - @Test public void testVerifyProcessDefinitionName() { when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); @@ -824,7 +783,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { try { processDefinitionService.updateProcessDefinition(user, projectCode, "test", 1, - "", "", "", 0, null, "", null, ProcessExecutionTypeEnum.PARALLEL); + "", "", "", 0, null, "", ProcessExecutionTypeEnum.PARALLEL); Assertions.fail(); } catch (ServiceException ex) { Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ex.getCode()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java index 474d619cc2..8281c4b547 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java @@ -456,7 +456,7 @@ public class ProcessTaskRelationServiceTest { Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode)) .thenReturn(processTaskRelationList); Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition()); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.queryByProcessCode(1L)).thenReturn(processTaskRelationList); List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); Mockito.when(processService.saveTaskRelation(user, 1L, 1L, @@ -494,7 +494,7 @@ public class ProcessTaskRelationServiceTest { processTaskRelation.setPostTaskCode(taskCode); processTaskRelation.setPostTaskVersion(1); processTaskRelationList.add(processTaskRelation); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)) + Mockito.when(processTaskRelationMapper.queryByProcessCode(processDefinitionCode)) .thenReturn(processTaskRelationList); List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); @@ -527,7 +527,7 @@ public class ProcessTaskRelationServiceTest { List processTaskRelationList = new ArrayList<>(); processTaskRelationList.add(processTaskRelation); Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.queryByProcessCode(1L)).thenReturn(processTaskRelationList); List relationLogs = processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()); Mockito.when(processService.saveTaskRelation(user, 1L, 1L, diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java index 85d3d90fba..202eb6f365 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java @@ -28,10 +28,8 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; -import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.User; @@ -44,9 +42,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.service.process.ProcessService; -import java.util.ArrayList; -import java.util.List; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -123,64 +118,6 @@ public class SchedulerServiceTest extends BaseServiceTestTool { user.setId(userId); } - @Test - public void testSetScheduleState() { - Project project = getProject(); - - ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setProjectCode(projectCode); - - Schedule schedule = new Schedule(); - schedule.setId(1); - schedule.setProcessDefinitionCode(1); - schedule.setReleaseState(ReleaseState.OFFLINE); - - Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule); - Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition); - Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project); - - // schedule not exists - exception = Assertions.assertThrows(ServiceException.class, () -> { - schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE); - }); - Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode()); - - // SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE - exception = Assertions.assertThrows(ServiceException.class, () -> { - schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE); - }); - Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(), - ((ServiceException) exception).getCode()); - - // PROCESS_DEFINE_NOT_EXIST - schedule.setProcessDefinitionCode(2); - exception = Assertions.assertThrows(ServiceException.class, () -> { - schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE); - }); - Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode()); - schedule.setProcessDefinitionCode(1); - - // online also success - ProcessTaskRelation processTaskRelation = new ProcessTaskRelation(); - List processTaskRelationList = new ArrayList<>(); - processTaskRelationList.add(processTaskRelation); - Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList); - exception = Assertions.assertThrows(ServiceException.class, () -> { - schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE); - }); - Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode()); - - // SUCCESS - Server server = new Server(); - List serverList = new ArrayList<>(); - serverList.add(server); - Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList); - processDefinition.setReleaseState(ReleaseState.ONLINE); - Assertions.assertDoesNotThrow(() -> { - schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE); - }); - } - @Test public void testCreateSchedulesV2() { Project project = this.getProject(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 2644712074..e77eab8029 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -563,8 +563,8 @@ public class TaskDefinitionServiceImplTest { // saveTaskRelation List processTaskRelationLogList = getProcessTaskRelationLogList(); - Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()), - eq(processDefinition.getCode()))).thenReturn(processTaskRelationList); + Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode()))) + .thenReturn(processTaskRelationList); Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1); Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1); int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(), diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 07ff5c2d86..d8e9360f2e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -41,13 +41,11 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryByProcessCode(@Param("projectCode") long projectCode, - @Param("processCode") long processCode); + List queryByProcessCode(@Param("processCode") long processCode); /** * update diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 742d53726e..eed2246c28 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -26,8 +26,7 @@ select from t_ds_process_task_relation - WHERE project_code = #{projectCode} - and process_definition_code = #{processCode} + WHERE process_definition_code = #{processCode}