Browse Source

[Improvement] Clean up Scheduler logic (#15198)

* Clean up Scheduler logic

* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

Co-authored-by: 旺阳 <wang@lqwang.net>

---------

Co-authored-by: 旺阳 <wang@lqwang.net>
3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
9c7ad6df0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 150
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 37
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java
  3. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  4. 103
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  5. 43
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  6. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  7. 545
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  8. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java
  9. 216
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  10. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  11. 12
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  12. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java
  13. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
  14. 45
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  15. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java
  16. 63
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
  17. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  18. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  19. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  20. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapperTest.java
  21. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  22. 62
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  23. 110
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

150
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -242,13 +242,12 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "taskRelationJson", required = true) String taskRelationJson,
@RequestParam(value = "taskDefinitionJson", required = true) String taskDefinitionJson,
@RequestParam(value = "otherParamsJson", required = false) String otherParamsJson,
@RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType,
@RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.updateProcessDefinition(loginUser, projectCode, name,
code, description, globalParams,
locations, timeout, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType);
locations, timeout, taskRelationJson, taskDefinitionJson, executionType);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);
@ -256,7 +255,7 @@ public class ProcessDefinitionController extends BaseController {
// Judge whether to go online after editing,0 means offline, 1 means online
if (releaseState == ReleaseState.ONLINE) {
result = processDefinitionService.releaseProcessDefinition(loginUser, projectCode, code, releaseState);
processDefinitionService.onlineWorkflowDefinition(loginUser, projectCode, code);
}
return returnDataList(result);
}
@ -343,15 +342,6 @@ public class ProcessDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* release process definition
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@Operation(summary = "release", description = "RELEASE_PROCESS_DEFINITION_NOTES")
@Parameters({
@Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)),
@ -361,13 +351,22 @@ public class ProcessDefinitionController extends BaseController {
@PostMapping(value = "/{code}/release")
@ResponseStatus(HttpStatus.OK)
@ApiException(RELEASE_PROCESS_DEFINITION_ERROR)
public Result releaseProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code", required = true) long code,
@RequestParam(value = "releaseState", required = true) ReleaseState releaseState) {
Map<String, Object> result =
processDefinitionService.releaseProcessDefinition(loginUser, projectCode, code, releaseState);
return returnDataList(result);
public Result<Boolean> releaseProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code", required = true) long workflowDefinitionCode,
@RequestParam(value = "releaseState", required = true) ReleaseState releaseState) {
switch (releaseState) {
case ONLINE:
processDefinitionService.onlineWorkflowDefinition(loginUser, projectCode, workflowDefinitionCode);
break;
case OFFLINE:
processDefinitionService.offlineWorkflowDefinition(loginUser, projectCode, workflowDefinitionCode);
break;
default:
throw new IllegalArgumentException(
"The releaseState " + releaseState + " is illegal, please check it.");
}
return Result.success(true);
}
/**
@ -712,119 +711,6 @@ public class ProcessDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* create empty process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @return process definition code
*/
@Operation(summary = "createEmptyProcessDefinition", description = "CREATE_EMPTY_PROCESS_NOTES")
@Parameters({
@Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")),
@Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class))
})
@PostMapping(value = "/empty")
@ResponseStatus(HttpStatus.OK)
@ApiException(CREATE_PROCESS_DEFINITION_ERROR)
public Result createEmptyProcessDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "name", required = true) String name,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "scheduleJson", required = false) String scheduleJson,
@RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType) {
return returnDataList(processDefinitionService.createEmptyProcessDefinition(loginUser, projectCode, name,
description, globalParams,
timeout, scheduleJson, executionType));
}
/**
* update process definition basic info, not including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @param executionType executionType
* @param releaseState releaseState
* @param otherParamsJson otherParamsJson handle other params
* @return update result code
*/
@Operation(summary = "updateBasicInfo", description = "UPDATE_PROCESS_DEFINITION_BASIC_INFO_NOTES")
@Parameters({
@Parameter(name = "name", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")),
@Parameter(name = "description", description = "PROCESS_DEFINITION_DESC", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = false, schema = @Schema(implementation = ReleaseState.class)),
@Parameter(name = "otherParamsJson", description = "OTHER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class))
})
@PutMapping(value = "/{code}/basic-info")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_PROCESS_DEFINITION_ERROR)
public Result updateProcessDefinitionBasicInfo(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "name", required = true) String name,
@PathVariable(value = "code", required = true) long code,
@RequestParam(value = "description", required = false) String description,
@RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams,
@RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout,
@RequestParam(value = "scheduleJson", required = false) String scheduleJson,
@RequestParam(value = "otherParamsJson", required = false) String otherParamsJson,
@RequestParam(value = "executionType", defaultValue = "PARALLEL") ProcessExecutionTypeEnum executionType,
@RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) {
Map<String, Object> result = processDefinitionService.updateProcessDefinitionBasicInfo(loginUser, projectCode,
name, code, description, globalParams,
timeout, scheduleJson, otherParamsJson, executionType);
// If the update fails, the result will be returned directly
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return returnDataList(result);
}
// Judge whether to go online after editing,0 means offline, 1 means online
if (releaseState == ReleaseState.ONLINE) {
result = processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState);
}
return returnDataList(result);
}
/**
* release process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState releaseState
* @return update result code
*/
@Operation(summary = "releaseWorkflowAndSchedule", description = "RELEASE_WORKFLOW_SCHEDULE_NOTES")
@Parameters({
@Parameter(name = "projectCode", description = "PROCESS_DEFINITION_NAME", required = true, schema = @Schema(implementation = long.class)),
@Parameter(name = "code", description = "PROCESS_DEFINITION_CODE", required = true, schema = @Schema(implementation = long.class, example = "123456789")),
@Parameter(name = "releaseState", description = "RELEASE_PROCESS_DEFINITION_NOTES", required = true, schema = @Schema(implementation = ReleaseState.class))
})
@PostMapping(value = "/{code}/release-workflow")
@ResponseStatus(HttpStatus.OK)
@ApiException(RELEASE_PROCESS_DEFINITION_ERROR)
public Result releaseWorkflowAndSchedule(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code", required = true) long code,
@RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) {
return returnDataList(
processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState));
}
/**
* query process definition global variables and local variables
*

37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/SchedulerController.java

@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
@ -180,46 +179,30 @@ public class SchedulerController extends BaseController {
return returnDataList(result);
}
/**
* publish schedule setScheduleState
*
* @param loginUser login user
* @param projectCode project code
* @param id scheduler id
* @return publish result code
*/
@Operation(summary = "online", description = "ONLINE_SCHEDULE_NOTES")
@Parameters({
@Parameter(name = "id", description = "SCHEDULE_ID", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@PostMapping("/{id}/online")
@ApiException(PUBLISH_SCHEDULE_ONLINE_ERROR)
public Result publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.ONLINE);
return Result.success();
public Result<Boolean> publishScheduleOnline(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
schedulerService.onlineScheduler(loginUser, projectCode, id);
return Result.success(true);
}
/**
* offline schedule
*
* @param loginUser login user
* @param projectCode project code
* @param id schedule id
* @return operation result code
*/
@Operation(summary = "offline", description = "OFFLINE_SCHEDULE_NOTES")
@Parameters({
@Parameter(name = "id", description = "SCHEDULE_ID", required = true, schema = @Schema(implementation = int.class, example = "100"))
})
@PostMapping("/{id}/offline")
@ApiException(OFFLINE_SCHEDULE_ERROR)
public Result offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
schedulerService.setScheduleState(loginUser, projectCode, id, ReleaseState.OFFLINE);
return Result.success();
public Result<Boolean> offlineSchedule(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
schedulerService.offlineScheduler(loginUser, projectCode, id);
return Result.success(true);
}
/**

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -265,11 +265,10 @@ public class PythonGateway {
if (processDefinition != null) {
processDefinitionCode = processDefinition.getCode();
// make sure workflow offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.OFFLINE);
processDefinitionService.offlineWorkflowDefinition(user, projectCode, processDefinitionCode);
processDefinitionService.updateProcessDefinition(user, projectCode, name,
processDefinitionCode, description, globalParams,
null, timeout, taskRelationJson, taskDefinitionJson, otherParamsJson,
null, timeout, taskRelationJson, taskDefinitionJson,
executionTypeEnum);
} else {
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name,
@ -290,8 +289,11 @@ public class PythonGateway {
warningType,
warningGroupId);
}
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.getEnum(releaseState));
if (ReleaseState.ONLINE.equals(ReleaseState.getEnum(releaseState))) {
processDefinitionService.onlineWorkflowDefinition(user, projectCode, processDefinitionCode);
} else if (ReleaseState.OFFLINE.equals(ReleaseState.getEnum(releaseState))) {
processDefinitionService.offlineWorkflowDefinition(user, projectCode, processDefinitionCode);
}
return processDefinitionCode;
}
@ -346,8 +348,7 @@ public class PythonGateway {
// create or update schedule
int scheduleId;
if (scheduleObj == null) {
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.ONLINE);
processDefinitionService.onlineWorkflowDefinition(user, projectCode, workflowCode);
Map<String, Object> result = schedulerService.insertSchedule(user, projectCode, workflowCode,
schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, user.getTenantCode(),
@ -355,16 +356,15 @@ public class PythonGateway {
scheduleId = (int) result.get("scheduleId");
} else {
scheduleId = scheduleObj.getId();
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode,
ReleaseState.OFFLINE);
processDefinitionService.offlineWorkflowDefinition(user, projectCode, workflowCode);
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, user.getTenantCode(),
DEFAULT_ENVIRONMENT_CODE);
}
if (onlineSchedule) {
// set workflow online to make sure we can set schedule online
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
processDefinitionService.onlineWorkflowDefinition(user, projectCode, workflowCode);
schedulerService.onlineScheduler(user, projectCode, scheduleId);
}
}
@ -382,8 +382,7 @@ public class PythonGateway {
processDefinitionMapper.queryByDefineName(project.getCode(), workflowName);
// make sure workflow online
processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(),
ReleaseState.ONLINE);
processDefinitionService.onlineWorkflowDefinition(user, project.getCode(), processDefinition.getCode());
executorService.execProcessInstance(user,
project.getCode(),
@ -561,8 +560,7 @@ public class PythonGateway {
// get workflow info
if (processDefinition != null) {
// make sure workflow online
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(),
ReleaseState.ONLINE);
processDefinitionService.onlineWorkflowDefinition(user, projectCode, processDefinition.getCode());
result.put("id", processDefinition.getId());
result.put("name", processDefinition.getName());
result.put("code", processDefinition.getCode());

103
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -23,9 +23,7 @@ import org.apache.dolphinscheduler.api.dto.workflow.WorkflowUpdateRequest;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
@ -207,7 +205,6 @@ public interface ProcessDefinitionService {
* @param timeout timeout
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @param otherParamsJson otherParamsJson handle other params
* @return update result code
*/
Map<String, Object> updateProcessDefinition(User loginUser,
@ -220,7 +217,6 @@ public interface ProcessDefinitionService {
int timeout,
String taskRelationJson,
String taskDefinitionJson,
String otherParamsJson,
ProcessExecutionTypeEnum executionType);
/**
@ -251,20 +247,6 @@ public interface ProcessDefinitionService {
void deleteProcessDefinitionByCode(User loginUser, long workflowDefinitionCode);
/**
* release process definition: online / offline
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
Map<String, Object> releaseProcessDefinition(User loginUser,
long projectCode,
long code,
ReleaseState releaseState);
/**
* batch export process definition by codes
*
@ -414,53 +396,6 @@ public interface ProcessDefinitionService {
long code,
int version);
/**
* create empty process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @return process definition code
*/
Map<String, Object> createEmptyProcessDefinition(User loginUser,
long projectCode,
String name,
String description,
String globalParams,
int timeout,
String scheduleJson,
ProcessExecutionTypeEnum executionType);
/**
* update process definition basic info, not including task definition, task relation and location.
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @param otherParamsJson otherParamsJson handle other params
* @param executionType executionType
* @return update result code
*/
Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser,
long projectCode,
String name,
long code,
String description,
String globalParams,
int timeout,
String scheduleJson,
String otherParamsJson,
ProcessExecutionTypeEnum executionType);
/**
* update process definition basic info, not including task definition, task relation and location.
*
@ -474,44 +409,14 @@ public interface ProcessDefinitionService {
WorkflowUpdateRequest workflowUpdateRequest);
/**
* release process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState releaseState
* @return update result code
*/
Map<String, Object> releaseWorkflowAndSchedule(User loginUser,
long projectCode,
long code,
ReleaseState releaseState);
/**
* delete other relation
* @param project
* @param result
* @param processDefinition
*/
void deleteOtherRelation(Project project, Map<String, Object> result, ProcessDefinition processDefinition);
/**
* save other relation
* @param loginUser
* @param processDefinition
* @param result
* @param otherParamsJson
* Online the workflow definition, it will check all sub workflow is online.
*/
void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result,
String otherParamsJson);
void onlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode);
/**
* get Json String
* @param loginUser
* @param processDefinition
* @return Json String
* Offline the workflow definition. It will auto offline the scheduler.
*/
String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition);
void offlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode);
/**
* view process variables

43
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
@ -125,19 +124,6 @@ public interface SchedulerService {
Schedule getSchedule(User loginUser,
Integer scheduleId);
/**
* set schedule online or offline
*
* @param loginUser login user
* @param projectCode project code
* @param id scheduler id
* @param scheduleStatus schedule status
*/
void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus);
/**
* query schedule
*
@ -173,15 +159,6 @@ public interface SchedulerService {
*/
Map<String, Object> queryScheduleList(User loginUser, long projectCode);
/**
* delete schedule
*
* @param projectId project id
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
void deleteSchedule(int projectId, int scheduleId);
/**
* delete schedule by id
*
@ -225,4 +202,24 @@ public interface SchedulerService {
String workerGroup,
String tenantCode,
long environmentCode);
/**
* Online the scheduler by scheduler id, if the related workflow definition is not online will throw exception.
*/
void onlineScheduler(User loginUser, Long projectCode, Integer schedulerId);
/**
* Do online scheduler by workflow code, this method will not do permission check.
*/
void onlineSchedulerByWorkflowCode(Long workflowDefinitionCode);
/**
* Offline the scheduler by scheduler id, will not offline the related workflow definition.
*/
void offlineScheduler(User loginUser, Long projectCode, Integer schedulerId);
/**
* Do offline scheduler by workflow code, this method will not do permission check.
*/
void offlineSchedulerByWorkflowCode(Long workflowDefinitionCode);
}

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -706,8 +706,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return result;
}
List<Long> codes = new ArrayList<>();
processService.recurseFindSubProcess(processDefinition.getCode(), codes);
List<Long> codes = processService.findAllSubWorkflowDefinitionCode(processDefinition.getCode());
if (!codes.isEmpty()) {
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(codes);
if (processDefinitionList != null) {

545
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -35,7 +35,6 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static org.apache.dolphinscheduler.common.constants.Constants.COPY_SUFFIX;
import static org.apache.dolphinscheduler.common.constants.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.constants.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.constants.Constants.IMPORT_SUFFIX;
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
@ -44,7 +43,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto;
import org.apache.dolphinscheduler.api.dto.workflow.WorkflowCreateRequest;
@ -66,14 +64,12 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -119,12 +115,10 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedOutputStream;
@ -310,7 +304,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
globalParams, locations, timeout, loginUser.getId());
processDefinition.setExecutionType(executionType);
result = createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs, otherParamsJson);
result = createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
if (result.get(Constants.STATUS) == Status.SUCCESS) {
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(loginUser, processDefinition,
taskDefinitionLogs,
@ -384,7 +378,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
protected Map<String, Object> createDagDefine(User loginUser,
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition,
List<TaskDefinitionLog> taskDefinitionLogs, String otherParamsJson) {
List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(),
taskDefinitionLogs, Boolean.TRUE);
@ -415,8 +409,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
return result;
@ -753,7 +745,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param timeout timeout
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @param otherParamsJson otherParamsJson handle other params
* @return update result code
*/
@Override
@ -768,7 +759,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int timeout,
String taskRelationJson,
String taskDefinitionJson,
String otherParamsJson,
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
@ -815,7 +805,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.set(projectCode, name, description, globalParams, locations, timeout);
processDefinition.setExecutionType(executionType);
result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy,
taskDefinitionLogs, otherParamsJson);
taskDefinitionLogs);
if (result.get(Constants.STATUS) == Status.SUCCESS) {
listenerEventAlertManager.publishProcessDefinitionUpdatedListenerEvent(loginUser, processDefinition,
taskDefinitionLogs,
@ -834,8 +824,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
*/
private void taskUsedInOtherTaskValid(ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> taskRelationList) {
List<ProcessTaskRelation> oldProcessTaskRelationList = processTaskRelationMapper
.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<ProcessTaskRelation> oldProcessTaskRelationList =
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode());
Set<ProcessTaskRelationLog> oldProcessTaskRelationSet =
oldProcessTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toSet());
StringBuilder sb = new StringBuilder();
@ -859,8 +849,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> taskRelationList,
ProcessDefinition processDefinition,
ProcessDefinition processDefinitionDeepCopy,
List<TaskDefinitionLog> taskDefinitionLogs,
String otherParamsJson) {
List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(),
taskDefinitionLogs, Boolean.TRUE);
@ -924,7 +913,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
} else {
log.info(
"Process definition does not need to be updated because there is no change, projectCode:{}, processCode:{}, processVersion:{}.",
@ -1079,7 +1067,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
taskDefinitionLogService.deleteTaskByWorkflowDefinitionCode(processDefinition.getCode());
// delete workflow definition log
processDefinitionLogDao.deleteByWorkflowDefinitionCode(processDefinition.getCode());
deleteOtherRelation(project, new HashMap<>(), processDefinition);
// we delete the workflow definition at last to avoid using transaction here.
// If delete error, we can call this interface again.
@ -1089,87 +1076,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
listenerEventAlertManager.publishProcessDefinitionDeletedListenerEvent(loginUser, project, processDefinition);
}
/**
* release process definition: online / offline
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@Override
@Transactional
public Map<String, Object> releaseProcessDefinition(User loginUser, long projectCode, long code,
ReleaseState releaseState) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check state
if (null == releaseState) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
log.error("Process definition does not exist, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
switch (releaseState) {
case ONLINE:
List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
log.warn("Process definition has no task relation, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
log.info("Set process definition online, projectCode:{}, processDefinitionCode:{}.", projectCode,
code);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
if (updateProcess > 0) {
log.info("Set process definition offline, projectCode:{}, processDefinitionCode:{}.",
projectCode, code);
if (schedule != null) {
// set status
schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
log.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
} else {
log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, schedule.getId());
}
schedulerService.deleteSchedule(project.getId(), schedule.getId());
}
}
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* batch export process definition by codes
*/
@ -1282,7 +1188,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
for (DagDataSchedule dagDataSchedule : dagDataScheduleList) {
if (!checkAndImport(loginUser, projectCode, result, dagDataSchedule, EMPTY_STRING)) {
if (!checkAndImport(loginUser, projectCode, result, dagDataSchedule)) {
return result;
}
}
@ -1441,7 +1347,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList, EMPTY_STRING);
return createDagDefine(loginUser, processTaskRelationList, processDefinition, taskDefinitionList);
}
private ProcessTaskRelationLog buildNormalTaskRelation(long preTaskCode, long postTaskCode) {
@ -1499,8 +1405,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check and import
*/
protected boolean checkAndImport(User loginUser, long projectCode, Map<String, Object> result,
DagDataSchedule dagDataSchedule, String otherParamsJson) {
protected boolean checkAndImport(User loginUser,
long projectCode,
Map<String, Object> result,
DagDataSchedule dagDataSchedule) {
if (!checkImportanceParams(dagDataSchedule, result)) {
return false;
}
@ -1598,8 +1506,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
processDefinition.setCreateTime(new Date());
processDefinition.setUpdateTime(new Date());
Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition,
Lists.newArrayList(), otherParamsJson);
Map<String, Object> createDagResult =
createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList());
if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) {
putMsg(createDagResult, Status.SUCCESS);
} else {
@ -2151,12 +2059,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
diffCode.forEach(code -> failedProcessList.add(code + "[null]"));
for (ProcessDefinition processDefinition : processDefinitionList) {
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(),
processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode());
List<ProcessTaskRelationLog> taskRelationList =
processTaskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
processDefinition.setProjectCode(targetProjectCode);
String otherParamsJson = doOtherOperateProcess(loginUser, processDefinition);
if (isCopy) {
log.info("Copy process definition...");
List<TaskDefinitionLog> taskDefinitionLogs =
@ -2226,8 +2132,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
}
try {
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs,
otherParamsJson));
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs));
} catch (Exception e) {
log.error("Copy process definition error, processDefinitionCode from {} to {}.",
oldProcessDefinitionCode, processDefinition.getCode(), e);
@ -2238,7 +2143,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
log.info("Move process definition...");
try {
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null,
Lists.newArrayList(), otherParamsJson));
Lists.newArrayList()));
} catch (Exception e) {
log.error("Move process definition error, processDefinitionCode:{}.",
processDefinition.getCode(), e);
@ -2438,7 +2343,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
}
deleteOtherRelation(project, result, processDefinition);
log.info(
"Delete process definition version complete, projectCode:{}, processDefinitionCode:{}, version:{}.",
projectCode, code, version);
@ -2447,226 +2351,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
/**
* create empty process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @return process definition code
*/
@Override
@Transactional
public Map<String, Object> createEmptyProcessDefinition(User loginUser,
long projectCode,
String name,
String description,
String globalParams,
int timeout,
String scheduleJson,
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
Map<String, Object> result = new HashMap<>();
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return result;
}
if (checkDescriptionLength(description)) {
log.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
definition.getName(), definition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
long processDefinitionCode;
try {
processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
} catch (CodeGenerateException e) {
log.error("Generate process definition code error, projectCode:{}.", projectCode, e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result;
}
ProcessDefinition processDefinition =
new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, "", timeout, loginUser.getId());
processDefinition.setExecutionType(executionType);
result = createEmptyDagDefine(loginUser, processDefinition);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
log.error("Create empty process definition error.");
return result;
}
if (StringUtils.isBlank(scheduleJson)) {
return result;
}
// save dag schedule
Map<String, Object> scheduleResult = createDagSchedule(loginUser, processDefinition, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
throw new ServiceException(scheduleResultStatus);
}
return result;
}
protected Map<String, Object> createEmptyDagDefine(User loginUser, ProcessDefinition processDefinition) {
Map<String, Object> result = new HashMap<>();
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
log.error("Save process definition error, processDefinitionCode:{}.", processDefinition.getCode());
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
return result;
}
protected Map<String, Object> createDagSchedule(User loginUser, ProcessDefinition processDefinition,
String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (scheduleObj == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
log.warn("The schedule start time must not be the same as the end, processDefinitionCode:{}.",
processDefinition.getCode());
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
if (!CronUtils.isValidExpression(scheduleObj.getCrontab())) {
log.error("CronExpression verify failure, cron:{}.", scheduleObj.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab());
return result;
}
scheduleObj
.setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType());
scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 : scheduleObj.getWarningGroupId());
scheduleObj.setFailureStrategy(
scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM
: scheduleObj.getProcessInstancePriority());
scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? "default" : scheduleObj.getWorkerGroup());
scheduleObj
.setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode());
scheduleMapper.insert(scheduleObj);
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
}
/**
* update process definition basic info
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams globalParams
* @param timeout timeout
* @param scheduleJson scheduleJson
* @param otherParamsJson otherParamsJson handle other params
* @param executionType executionType
* @return update result code
*/
@Override
@Transactional
public Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser,
long projectCode,
String name,
long code,
String description,
String globalParams,
int timeout,
String scheduleJson,
String otherParamsJson,
ProcessExecutionTypeEnum executionType) {
Project project = projectMapper.queryByCode(projectCode);
// check if user have write perm for project
Map<String, Object> result = new HashMap<>();
boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result);
if (!hasProjectAndWritePerm) {
return result;
}
if (checkDescriptionLength(description)) {
log.warn("Parameter description is too long.");
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
log.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
// online can not permit edit
log.warn("Process definition is not allowed to be modified due to {}, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName());
return result;
}
if (!name.equals(processDefinition.getName())) {
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
log.warn("Process definition with the same name {} already exists, processDefinitionCode:{}.",
definition.getName(), definition.getCode());
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
}
ProcessDefinition processDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams, "", timeout);
processDefinition.setExecutionType(executionType);
List<ProcessTaskRelationLog> taskRelationList = processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(), processDefinition.getVersion());
result = updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy,
Lists.newArrayList(), otherParamsJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
log.error("Update process definition basic info error.");
return result;
}
if (StringUtils.isBlank(scheduleJson)) {
return result;
}
// update dag schedule
Map<String, Object> scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
throw new ServiceException(scheduleResultStatus);
}
return result;
}
private void updateWorkflowValid(User user, ProcessDefinition oldProcessDefinition,
ProcessDefinition newProcessDefinition) {
// online can not permit edit
@ -2758,7 +2442,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
long projectCode = processDefinition.getProjectCode();
long processDefinitionCode = processDefinition.getCode();
List<ProcessTaskRelation> taskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelationLog> taskRelationList =
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
@ -2820,147 +2504,42 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return (insert & resultLog) > 0 ? Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
protected Map<String, Object> updateDagSchedule(User loginUser,
long projectCode,
long processDefinitionCode,
String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (schedule == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
// set default value
FailureStrategy failureStrategy =
ObjectUtils.defaultIfNull(schedule.getFailureStrategy(), FailureStrategy.CONTINUE);
WarningType warningType = ObjectUtils.defaultIfNull(schedule.getWarningType(), WarningType.NONE);
Priority processInstancePriority =
ObjectUtils.defaultIfNull(schedule.getProcessInstancePriority(), Priority.MEDIUM);
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
String workerGroup = StringUtils.defaultIfBlank(schedule.getWorkerGroup(), DEFAULT_WORKER_GROUP);
String tenantCode = StringUtils.defaultIfBlank(schedule.getTenantCode(), Constants.DEFAULT);
long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime());
param.setEndTime(schedule.getEndTime());
param.setCrontab(schedule.getCrontab());
param.setTimezoneId(schedule.getTimezoneId());
return schedulerService.updateScheduleByProcessDefinitionCode(
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
tenantCode,
environmentCode);
}
/**
* release process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState releaseState
* @return update result code
*/
@Transactional
@Override
public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code,
ReleaseState releaseState) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_ONLINE_OFFLINE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check state
if (null == releaseState) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
public void onlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode) {
projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
log.error("Process definition does not exist, code:{}.", code);
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(code));
return result;
}
Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
if (scheduleObj == null) {
log.error("Schedule cron does not exist, processDefinitionCode:{}.", code);
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code);
return result;
}
switch (releaseState) {
case ONLINE:
List<ProcessTaskRelation> relationList =
processService.findRelationByCode(code, processDefinition.getVersion());
if (CollectionUtils.isEmpty(relationList)) {
log.warn("Process definition has no task relation, processDefinitionCode:{}.", code);
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
schedulerService.setScheduleState(loginUser, projectCode, scheduleObj.getId(), ReleaseState.ONLINE);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
if (updateProcess > 0) {
log.info("Set schedule offline, projectCode:{}, processDefinitionCode:{}, scheduleId:{}.",
projectCode, code, scheduleObj.getId());
// set status
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
log.error(
"Set schedule offline error, projectCode:{}, processDefinitionCode:{}, scheduleId:{}",
projectCode, code, scheduleObj.getId());
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}
schedulerService.deleteSchedule(project.getId(), scheduleObj.getId());
}
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode)
.orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode));
if (ReleaseState.ONLINE.equals(workflowDefinition.getReleaseState())) {
// do nothing if the workflow is already online
return;
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* save other relation
* @param loginUser
* @param processDefinition
* @param result
* @param otherParamsJson
*/
@Override
public void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result,
String otherParamsJson) {
checkWorkflowDefinitionIsValidated(workflowDefinition.getCode());
checkAllSubWorkflowDefinitionIsOnline(workflowDefinition.getCode());
workflowDefinition.setReleaseState(ReleaseState.ONLINE);
processDefinitionDao.updateById(workflowDefinition);
}
/**
* get Json String
* @param loginUser
* @param processDefinition
* @return Json String
*/
@Transactional
@Override
public String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition) {
return null;
public void offlineWorkflowDefinition(User loginUser, Long projectCode, Long workflowDefinitionCode) {
projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE);
ProcessDefinition workflowDefinition = processDefinitionDao.queryByCode(workflowDefinitionCode)
.orElseThrow(() -> new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode));
if (ReleaseState.OFFLINE.equals(workflowDefinition.getReleaseState())) {
// do nothing if the workflow is already offline
return;
}
workflowDefinition.setReleaseState(ReleaseState.OFFLINE);
processDefinitionDao.updateById(workflowDefinition);
schedulerService.offlineSchedulerByWorkflowCode(workflowDefinitionCode);
}
/**
@ -3019,7 +2598,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Set<Long> taskCodeSet = new TreeSet<>();
processTaskRelationMapper.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode())
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode())
.forEach(processTaskRelation -> {
if (processTaskRelation.getPreTaskCode() > 0) {
taskCodeSet.add(processTaskRelation.getPreTaskCode());
@ -3046,15 +2625,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return localUserDefParams;
}
/**
* delete other relation
* @param project
* @param result
* @param processDefinition
*/
@Override
public void deleteOtherRelation(Project project, Map<String, Object> result, ProcessDefinition processDefinition) {
private void checkWorkflowDefinitionIsValidated(Long workflowDefinitionCode) {
// todo: build dag check if the dag is validated
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(workflowDefinitionCode);
if (CollectionUtils.isEmpty(processTaskRelations)) {
throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
}
// todo : check Workflow is validate
}
private void checkAllSubWorkflowDefinitionIsOnline(Long workflowDefinitionCode) {
List<Long> allSubWorkflowDefinitionCodes =
processService.findAllSubWorkflowDefinitionCode(workflowDefinitionCode);
if (CollectionUtils.isEmpty(allSubWorkflowDefinitionCodes)) {
return;
}
for (Long subWorkflowDefinitionCode : allSubWorkflowDefinitionCodes) {
ProcessDefinition subWorkflowDefinition = processDefinitionDao.queryByCode(subWorkflowDefinitionCode)
.orElseThrow(() -> new ServiceException(PROCESS_DEFINE_NOT_EXIST, workflowDefinitionCode));
if (!ReleaseState.ONLINE.equals(subWorkflowDefinition.getReleaseState())) {
throw new ServiceException(
"SubWorkflowDefinition " + subWorkflowDefinition.getName() + " is not online");
}
}
}
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

@ -137,7 +137,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
}
updateProcessDefiniteVersion(loginUser, result, processDefinition);
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations = Lists.newArrayList(processTaskRelationList);
if (!processTaskRelations.isEmpty()) {
Map<Long, ProcessTaskRelation> preTaskCodeMap =
@ -327,7 +327,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode,
@ -514,7 +514,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
long projectCode = processDefinition.getProjectCode();
long processDefinitionCode = processDefinition.getCode();
List<ProcessTaskRelation> taskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelationLog> taskRelationList =
taskRelations.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
@ -675,7 +675,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
List<ProcessTaskRelation> processTaskRelationWaitRemove = Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
@ -746,7 +746,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
processTaskRelationList
.removeIf(processTaskRelation -> postTaskCodeList.contains(processTaskRelation.getPostTaskCode())
@ -858,7 +858,7 @@ public class ProcessTaskRelationServiceImpl extends BaseServiceImpl implements P
return result;
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList(processTaskRelations);
if (CollectionUtils.isEmpty(processTaskRelationList)) {
log.error("Process task relations are empty, projectCode:{}, processDefinitionCode:{}.", projectCode,

216
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.dto.schedule.ScheduleCreateRequest;
@ -26,7 +27,6 @@ import org.apache.dolphinscheduler.api.dto.schedule.ScheduleUpdateRequest;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -38,26 +38,22 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -95,12 +91,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private ExecutorService executorService;
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
@ -113,9 +103,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private SchedulerApi schedulerApi;
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
@Autowired
private EnvironmentMapper environmentMapper;
@ -436,123 +423,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return schedule;
}
/**
* set schedule online or offline
*
* @param loginUser login user
* @param projectCode project code
* @param id scheduler id
* @param scheduleStatus schedule status
* @return publish result code
*/
@Override
@Transactional
public void setScheduleState(User loginUser,
long projectCode,
Integer id,
ReleaseState scheduleStatus) {
Project project = projectMapper.queryByCode(projectCode);
// check project auth
projectService.checkProjectAndAuthThrowException(loginUser, project, null);
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
log.error("Schedule does not exist, scheduleId:{}.", id);
throw new ServiceException(Status.SCHEDULE_CRON_NOT_EXISTS, id);
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
log.warn("Schedule state does not need to change due to schedule state is already {}, scheduleId:{}.",
scheduleObj.getReleaseState().getDescp(), scheduleObj.getId());
throw new ServiceException(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
log.error("Process definition does not exist, processDefinitionCode:{}.",
scheduleObj.getProcessDefinitionCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_EXIST,
String.valueOf(scheduleObj.getProcessDefinitionCode()));
}
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
log.error("Process task relations do not exist, projectCode:{}, processDefinitionCode:{}.", projectCode,
processDefinition.getCode());
throw new ServiceException(Status.PROCESS_DAG_IS_EMPTY);
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
log.warn("Only process definition state is {} can change schedule state, processDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), processDefinition.getCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
}
// check sub process definition release state
List<Long> subProcessDefineCodes = new ArrayList<>();
processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
if (!subProcessDefineCodes.isEmpty()) {
log.info(
"Need to check sub process definition state before change schedule state, subProcessDefineCodes:{}.",
org.apache.commons.lang.StringUtils.join(subProcessDefineCodes, ","));
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
log.warn(
"Only sub process definition state is {} can change schedule state, subProcessDefinitionCode:{}.",
ReleaseState.ONLINE.getDescp(), subProcessDefinition.getCode());
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
}
}
}
}
}
// check master server exists
List<Server> masterServers = monitorService.getServerListFromRegistry(true);
if (masterServers.isEmpty()) {
log.error("Master does not exist.");
throw new ServiceException(Status.MASTER_NOT_EXISTS);
}
// set status
scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try {
switch (scheduleStatus) {
case ONLINE:
log.info("Call master client set schedule online, project id: {}, flow id: {},host: {}",
project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
case OFFLINE:
log.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}",
project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
default:
throw new ServiceException(Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
}
} catch (Exception e) {
log.error("Set schedule state to {} error, projectCode:{}, scheduleId:{}.", scheduleStatus.getDescp(),
projectCode, scheduleObj.getId());
Status status = scheduleStatus == ReleaseState.ONLINE ? Status.PUBLISH_SCHEDULE_ONLINE_ERROR
: Status.OFFLINE_SCHEDULE_ERROR;
throw new ServiceException(status, e);
}
}
/**
* query schedule
*
@ -669,25 +539,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
public void setSchedule(int projectId, Schedule schedule) {
log.info("Set schedule state {}, project id: {}, scheduleId: {}", schedule.getReleaseState().getDescp(),
projectId, schedule.getId());
schedulerApi.insertOrUpdateScheduleTask(projectId, schedule);
}
/**
* delete schedule
*
* @param projectId project id
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
@Override
public void deleteSchedule(int projectId, int scheduleId) {
log.info("Delete schedule of project, projectId:{}, scheduleId:{}", projectId, scheduleId);
schedulerApi.deleteScheduleTask(projectId, scheduleId);
}
/**
* check valid
*
@ -823,6 +674,71 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
@Transactional
@Override
public void onlineScheduler(User loginUser, Long projectCode, Integer schedulerId) {
projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE);
Schedule schedule = scheduleMapper.selectById(schedulerId);
doOnlineScheduler(schedule);
}
@Transactional
@Override
public void onlineSchedulerByWorkflowCode(Long workflowDefinitionCode) {
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(workflowDefinitionCode);
doOnlineScheduler(schedule);
}
private void doOnlineScheduler(Schedule schedule) {
if (schedule == null) {
return;
}
if (ReleaseState.ONLINE.equals(schedule.getReleaseState())) {
log.debug("The schedule is already online, scheduleId:{}.", schedule.getId());
return;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (!ReleaseState.ONLINE.equals(processDefinition.getReleaseState())) {
throw new ServiceException(Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
}
schedule.setReleaseState(ReleaseState.ONLINE);
scheduleMapper.updateById(schedule);
Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
schedulerApi.insertOrUpdateScheduleTask(project.getId(), schedule);
}
@Transactional
@Override
public void offlineScheduler(User loginUser, Long projectCode, Integer schedulerId) {
projectService.checkProjectAndAuthThrowException(loginUser, projectCode, WORKFLOW_ONLINE_OFFLINE);
Schedule schedule = scheduleMapper.selectById(schedulerId);
doOfflineScheduler(schedule);
}
@Transactional
@Override
public void offlineSchedulerByWorkflowCode(Long workflowDefinitionCode) {
Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(workflowDefinitionCode);
doOfflineScheduler(schedule);
}
private void doOfflineScheduler(Schedule schedule) {
if (schedule == null) {
return;
}
if (ReleaseState.OFFLINE.equals(schedule.getReleaseState())) {
log.debug("The schedule is already offline, scheduleId:{}.", schedule.getId());
return;
}
schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
Project project = projectMapper.queryByCode(processDefinition.getProjectCode());
schedulerApi.deleteScheduleTask(project.getId(), schedule.getId());
}
private void updateSchedule(Map<String, Object> result, Schedule schedule, ProcessDefinition processDefinition,
String scheduleExpression, WarningType warningType, int warningGroupId,
FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup,

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -341,7 +341,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinition.setCode(CodeGenerateUtils.getInstance().genCode());
}
List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode)
processTaskRelationMapper.queryByProcessCode(processDefinitionCode)
.stream()
.map(ProcessTaskRelationLog::new)
.collect(Collectors.toList());
@ -504,8 +504,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
"Task definition has upstream tasks, start handle them after delete task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(taskDefinition.getProjectCode(), processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
List<ProcessTaskRelation> relationList = processTaskRelations.stream()
.filter(r -> r.getPostTaskCode() != taskCode).collect(Collectors.toList());
updateDag(loginUser, processDefinitionCode, relationList, Lists.newArrayList());
@ -571,7 +571,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations,
Lists.newArrayList(taskDefinitionToUpdate));
}
@ -645,8 +645,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
"Task definition has upstream tasks, start handle them after update task, taskDefinitionCode:{}.",
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(taskDefinitionUpdate.getProjectCode(), processDefinitionCode);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations, Lists.newArrayList(taskDefinitionLog));
}
this.updateTaskUpstreams(loginUser, taskUpdateRequest.getWorkflowCode(), taskDefinitionUpdate.getCode(),
@ -894,7 +894,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
if (MapUtils.isNotEmpty(queryUpStreamTaskCodeMap)) {
ProcessTaskRelation taskRelation = upstreamTaskRelations.get(0);
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, taskRelation.getProcessDefinitionCode());
processTaskRelationMapper.queryByProcessCode(taskRelation.getProcessDefinitionCode());
// set upstream code list
updateUpstreamTask(new HashSet<>(queryUpStreamTaskCodeMap.keySet()),
@ -1064,7 +1064,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskCode);
long processDefinitionCode = taskRelationList.get(0).getProcessDefinitionCode();
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
updateDag(loginUser, processDefinitionCode, processTaskRelations,
Lists.newArrayList(taskDefinitionUpdate));
} else {

12
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -166,12 +166,12 @@ public class ProcessDefinitionControllerTest {
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description,
globalParams,
locations, timeout, relationJson, taskDefinitionJson, "",
locations, timeout, relationJson, taskDefinitionJson,
ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code,
description, globalParams,
locations, timeout, relationJson, taskDefinitionJson, "", ProcessExecutionTypeEnum.PARALLEL,
locations, timeout, relationJson, taskDefinitionJson, ProcessExecutionTypeEnum.PARALLEL,
ReleaseState.OFFLINE);
Assertions.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -179,13 +179,13 @@ public class ProcessDefinitionControllerTest {
@Test
public void testReleaseProcessDefinition() {
long projectCode = 1L;
int id = 1;
long id = 1L;
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE))
.thenReturn(result);
Result response =
Mockito.doNothing().when(processDefinitionService)
.offlineWorkflowDefinition(user, projectCode, id);
Result<Boolean> response =
processDefinitionController.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE);
Assertions.assertTrue(response != null && response.isSuccess());
}

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/SchedulerControllerTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.api.controller;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isA;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.delete;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
@ -31,7 +32,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
@ -122,9 +122,7 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "37");
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
Mockito.doNothing().when(schedulerService).onlineScheduler(any(), any(), any());
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/online", 123, 37)
.header(SESSION_ID, sessionId)
@ -143,9 +141,7 @@ public class SchedulerControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "28");
Mockito.doNothing().when(schedulerService).setScheduleState(isA(User.class), isA(Long.class),
isA(Integer.class),
isA(ReleaseState.class));
Mockito.doNothing().when(schedulerService).offlineScheduler(any(), any(), any());
MvcResult mvcResult = mockMvc.perform(post("/projects/{projectCode}/schedules/{id}/offline", 123, 28)
.header(SESSION_ID, sessionId)

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java

@ -563,9 +563,8 @@ public class ExecuteFunctionServiceTest {
@Test
public void testStartCheckByProcessDefinedCode() {
List<Long> ids = new ArrayList<>();
ids.add(1L);
Mockito.doNothing().when(processService).recurseFindSubProcess(1, ids);
List<Long> ids = Lists.newArrayList(1L);
when(processService.findAllSubWorkflowDefinitionCode(1)).thenReturn(ids);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
ProcessDefinition processDefinition = new ProcessDefinition();

45
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -23,7 +23,6 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_ONLINE_OFFLINE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
@ -467,7 +466,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
}
when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
when(processService.saveProcessDefine(user, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
when(processTaskRelationMapper.queryByProcessCode(processDefinitionCode))
.thenReturn(getProcessTaskRelation());
putMsg(result, Status.SUCCESS);
@ -610,46 +609,6 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
Mockito.verify(metricsCleanUpService, times(2)).cleanUpWorkflowMetricsByDefinitionCode(11L);
}
@Test
public void testReleaseProcessDefinition() {
when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
when(projectService.checkProjectAndAuth(user, project, projectCode, WORKFLOW_ONLINE_OFFLINE))
.thenReturn(result);
Map<String, Object> map = processDefinitionService.releaseProcessDefinition(user, projectCode,
processDefinitionCode, ReleaseState.OFFLINE);
Assertions.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
// project check auth success, processes definition online
putMsg(result, Status.SUCCESS, projectCode);
when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setPostTaskCode(123L);
processTaskRelationList.add(processTaskRelation);
when(processService.findRelationByCode(46L, 1)).thenReturn(processTaskRelationList);
Map<String, Object> onlineRes =
processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE);
Assertions.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS));
// project check auth success, processes definition online
Map<String, Object> onlineWithResourceRes =
processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.ONLINE);
Assertions.assertEquals(Status.SUCCESS, onlineWithResourceRes.get(Constants.STATUS));
// release error code
Map<String, Object> failRes =
processDefinitionService.releaseProcessDefinition(user, projectCode, 46, ReleaseState.getEnum(2));
Assertions.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failRes.get(Constants.STATUS));
}
@Test
public void testVerifyProcessDefinitionName() {
when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
@ -824,7 +783,7 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
try {
processDefinitionService.updateProcessDefinition(user, projectCode, "test", 1,
"", "", "", 0, null, "", null, ProcessExecutionTypeEnum.PARALLEL);
"", "", "", 0, null, "", ProcessExecutionTypeEnum.PARALLEL);
Assertions.fail();
} catch (ServiceException ex) {
Assertions.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ex.getCode());

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

@ -456,7 +456,7 @@ public class ProcessTaskRelationServiceTest {
Mockito.when(processTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode))
.thenReturn(processTaskRelationList);
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(getProcessDefinition());
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.queryByProcessCode(1L)).thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(user, 1L, 1L,
@ -494,7 +494,7 @@ public class ProcessTaskRelationServiceTest {
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode))
Mockito.when(processTaskRelationMapper.queryByProcessCode(processDefinitionCode))
.thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
@ -527,7 +527,7 @@ public class ProcessTaskRelationServiceTest {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(projectService.checkProjectAndAuth(user, project, projectCode, null)).thenReturn(result);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1L)).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.queryByProcessCode(1L)).thenReturn(processTaskRelationList);
List<ProcessTaskRelationLog> relationLogs =
processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList());
Mockito.when(processService.saveTaskRelation(user, 1L, 1L,

63
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java

@ -28,10 +28,8 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
@ -44,9 +42,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.scheduler.api.SchedulerApi;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -123,64 +118,6 @@ public class SchedulerServiceTest extends BaseServiceTestTool {
user.setId(userId);
}
@Test
public void testSetScheduleState() {
Project project = getProject();
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectCode(projectCode);
Schedule schedule = new Schedule();
schedule.setId(1);
schedule.setProcessDefinitionCode(1);
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(scheduleMapper.selectById(1)).thenReturn(schedule);
Mockito.when(processDefinitionMapper.queryByCode(1)).thenReturn(processDefinition);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
// schedule not exists
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 2, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_NOT_EXISTS.getCode(), ((ServiceException) exception).getCode());
// SCHEDULE_CRON_RELEASE_NEED_NOT_CHANGE
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.OFFLINE);
});
Assertions.assertEquals(Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE.getCode(),
((ServiceException) exception).getCode());
// PROCESS_DEFINE_NOT_EXIST
schedule.setProcessDefinitionCode(2);
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST.getCode(), ((ServiceException) exception).getCode());
schedule.setProcessDefinitionCode(1);
// online also success
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
processTaskRelationList.add(processTaskRelation);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 1)).thenReturn(processTaskRelationList);
exception = Assertions.assertThrows(ServiceException.class, () -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
Assertions.assertEquals(Status.PROCESS_DEFINE_NOT_RELEASE.getCode(), ((ServiceException) exception).getCode());
// SUCCESS
Server server = new Server();
List<Server> serverList = new ArrayList<>();
serverList.add(server);
Mockito.when(monitorService.getServerListFromRegistry(true)).thenReturn(serverList);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Assertions.assertDoesNotThrow(() -> {
schedulerService.setScheduleState(user, project.getCode(), 1, ReleaseState.ONLINE);
});
}
@Test
public void testCreateSchedulesV2() {
Project project = this.getProject();

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -563,8 +563,8 @@ public class TaskDefinitionServiceImplTest {
// saveTaskRelation
List<ProcessTaskRelationLog> processTaskRelationLogList = getProcessTaskRelationLogList();
Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getProjectCode()),
eq(processDefinition.getCode()))).thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.queryByProcessCode(eq(processDefinition.getCode())))
.thenReturn(processTaskRelationList);
Mockito.when(processTaskRelationMapper.batchInsert(isA(List.class))).thenReturn(1);
Mockito.when(processTaskRelationLogMapper.batchInsert(isA(List.class))).thenReturn(1);
int insertResult = processServiceImpl.saveTaskRelation(loginUser, processDefinition.getProjectCode(),

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -41,13 +41,11 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
/**
* process task relation by projectCode and processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return ProcessTaskRelation list
*/
@Cacheable(unless = "#result == null || #result.size() == 0")
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
List<ProcessTaskRelation> queryByProcessCode(@Param("processCode") long processCode);
/**
* update

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -26,8 +26,7 @@
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE project_code = #{projectCode}
and process_definition_code = #{processCode}
WHERE process_definition_code = #{processCode}
</select>
<select id="queryByTaskCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapperTest.java

@ -56,7 +56,7 @@ public class ProcessTaskRelationMapperTest extends BaseDaoTest {
@Test
public void testQueryByProcessCode() {
ProcessTaskRelation processTaskRelation = insertOne();
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(1L, 1L);
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(1L);
Assertions.assertNotEquals(0, processTaskRelations.size());
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -86,7 +86,7 @@ public interface ProcessService {
void removeTaskLogFile(Integer processInstanceId);
void recurseFindSubProcess(long parentCode, List<Long> ids);
List<Long> findAllSubWorkflowDefinitionCode(long workflowDefinitionCode);
String getTenantForProcess(String tenantCode, int userId);

62
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -147,6 +147,7 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@ -531,51 +532,26 @@ public class ProcessServiceImpl implements ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentCode parentCode
* @param ids ids
*/
@Override
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
public List<Long> findAllSubWorkflowDefinitionCode(long parentCode) {
List<TaskDefinition> taskNodeList = taskDefinitionDao.getTaskDefinitionListByDefinition(parentCode);
if (taskNodeList != null && !taskNodeList.isEmpty()) {
for (TaskDefinition taskNode : taskNodeList) {
String parameter = taskNode.getTaskParams();
ObjectNode parameterJson = JSONUtils.parseObject(parameter);
if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_CODE) != null) {
SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
ids.add(subProcessParam.getProcessDefinitionCode());
recurseFindSubProcess(subProcessParam.getProcessDefinitionCode(), ids);
}
}
}
}
/**
* get schedule time from command
*
* @param command command
* @param cmdParam cmdParam map
* @return date
*/
private Date getScheduleTime(Command command, Map<String, String> cmdParam) throws CronParseException {
Date scheduleTime = command.getScheduleTime();
if (scheduleTime == null && cmdParam != null && cmdParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE)) {
Date start = DateUtils.stringToDate(cmdParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE));
Date end = DateUtils.stringToDate(cmdParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE));
List<Schedule> schedules =
queryReleaseSchedulerListByProcessDefinitionCode(command.getProcessDefinitionCode());
List<Date> complementDateList = CronUtils.getSelfFireDateList(start, end, schedules);
if (CollectionUtils.isNotEmpty(complementDateList)) {
scheduleTime = complementDateList.get(0);
} else {
log.error("set scheduler time error: complement date list is empty, command: {}",
command.toString());
if (CollectionUtils.isEmpty(taskNodeList)) {
return Collections.emptyList();
}
List<Long> subWorkflowDefinitionCodes = new ArrayList<>();
for (TaskDefinition taskNode : taskNodeList) {
String parameter = taskNode.getTaskParams();
ObjectNode parameterJson = JSONUtils.parseObject(parameter);
if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_CODE) != null) {
SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class);
long subWorkflowDefinitionCode = subProcessParam.getProcessDefinitionCode();
subWorkflowDefinitionCodes.add(subWorkflowDefinitionCode);
subWorkflowDefinitionCodes.addAll(findAllSubWorkflowDefinitionCode(subWorkflowDefinitionCode));
}
}
return scheduleTime;
return subWorkflowDefinitionCodes;
}
/**
@ -1863,8 +1839,8 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public int switchProcessTaskRelationVersion(ProcessDefinition processDefinition) {
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper
.queryByProcessCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) {
processTaskRelationMapper.deleteByCode(processDefinition.getProjectCode(), processDefinition.getCode());
}
@ -2088,7 +2064,7 @@ public class ProcessServiceImpl implements ProcessService {
int insert = taskRelationList.size();
if (Boolean.TRUE.equals(syncDefine)) {
List<ProcessTaskRelation> processTaskRelationList =
processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
processTaskRelationMapper.queryByProcessCode(processDefinitionCode);
if (!processTaskRelationList.isEmpty()) {
Set<Integer> processTaskRelationSet =
processTaskRelationList.stream().map(ProcessTaskRelation::hashCode).collect(toSet());

110
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.C
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -210,7 +211,7 @@ public class ProcessServiceTest {
command1.setProcessDefinitionVersion(definitionVersion);
command1.setCommandParam("{\"ProcessInstanceId\":222}");
command1.setCommandType(CommandType.START_PROCESS);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
when(commandMapper.deleteById(1)).thenReturn(1);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(123);
@ -230,12 +231,12 @@ public class ProcessServiceTest {
processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode()))
when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode()))
.thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
.thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command1));
@ -246,7 +247,7 @@ public class ProcessServiceTest {
command2.setProcessDefinitionVersion(definitionVersion);
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
when(commandMapper.deleteById(2)).thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command2));
Command command3 = new Command();
@ -256,7 +257,7 @@ public class ProcessServiceTest {
command3.setProcessInstanceId(processInstanceId);
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
when(commandMapper.deleteById(3)).thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command3));
Command command4 = new Command();
@ -266,7 +267,7 @@ public class ProcessServiceTest {
command4.setCommandParam("{\"WaitingThreadInstanceId\":222,\"StartNodeIdList\":\"n1,n2\"}");
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
when(commandMapper.deleteById(4)).thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command4));
Command command5 = new Command();
@ -280,8 +281,8 @@ public class ProcessServiceTest {
command5.setCommandParam(JSONUtils.toJsonString(commandParams));
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
Mockito.when(curingGlobalParamsService.curingGlobalParams(null,
when(commandMapper.deleteById(5)).thenReturn(1);
when(curingGlobalParamsService.curingGlobalParams(null,
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.START_PROCESS,
@ -308,9 +309,9 @@ public class ProcessServiceTest {
processInstance2.setId(223);
processInstance2.setProcessDefinitionCode(11L);
processInstance2.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
when(commandMapper.deleteById(1)).thenReturn(1);
Assertions.assertNotNull(processService.handleCommand(host, command1));
Command command6 = new Command();
@ -319,20 +320,20 @@ public class ProcessServiceTest {
command6.setCommandParam("{\"ProcessInstanceId\":223}");
command6.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command6.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1,
when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1,
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(host, command6);
Assertions.assertNotNull(processInstance6);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
ProcessInstance processInstance7 = new ProcessInstance();
processInstance7.setId(224);
processInstance7.setProcessDefinitionCode(11L);
processInstance7.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7);
when(processInstanceMapper.queryDetailById(224)).thenReturn(processInstance7);
Command command7 = new Command();
command7.setId(7);
@ -340,8 +341,8 @@ public class ProcessServiceTest {
command7.setCommandParam("{\"ProcessInstanceId\":224}");
command7.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1,
when(commandMapper.deleteById(7)).thenReturn(1);
when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1,
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(host, command7);
Assertions.assertNotNull(processInstance8);
@ -352,7 +353,7 @@ public class ProcessServiceTest {
processDefinition2.setVersion(1);
processDefinition2.setCode(12L);
processDefinition2.setExecutionType(ProcessExecutionTypeEnum.SERIAL_PRIORITY);
Mockito.when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2);
when(processDefineMapper.queryByCode(12L)).thenReturn(processDefinition2);
ProcessInstance processInstance9 = new ProcessInstance();
processInstance9.setId(225);
processInstance9.setProcessDefinitionCode(11L);
@ -363,11 +364,11 @@ public class ProcessServiceTest {
command9.setCommandParam("{\"ProcessInstanceId\":225}");
command9.setCommandType(CommandType.RECOVER_SERIAL_WAIT);
command9.setProcessDefinitionVersion(1);
Mockito.when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1,
when(processInstanceMapper.queryDetailById(225)).thenReturn(processInstance9);
when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1,
org.apache.dolphinscheduler.service.utils.Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assertions.assertNotNull(processInstance10);
@ -384,9 +385,9 @@ public class ProcessServiceTest {
commandParams12.put(CMD_PARAM_START_PARAMS, JSONUtils.toJsonString(startParams12));
commandParams12.put("ProcessInstanceId", "222");
command12.setCommandParam(JSONUtils.toJsonString(commandParams12));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(commandMapper.deleteById(12)).thenReturn(1);
Mockito.when(curingGlobalParamsService.curingGlobalParams(222,
when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
when(commandMapper.deleteById(12)).thenReturn(1);
when(curingGlobalParamsService.curingGlobalParams(222,
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.RECOVER_TOLERANCE_FAULT_PROCESS,
@ -429,12 +430,12 @@ public class ProcessServiceTest {
processInstance.setProcessDefinitionCode(definitionCode);
processInstance.setProcessDefinitionVersion(definitionVersion);
Mockito.when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode()))
when(processDefineMapper.queryByCode(command1.getProcessDefinitionCode()))
.thenReturn(processDefinition);
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Mockito.when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
when(triggerRelationService.saveProcessInstanceTrigger(Mockito.any(), Mockito.any()))
.thenReturn(1);
Assertions.assertThrows(ServiceException.class, () -> {
@ -447,7 +448,7 @@ public class ProcessServiceTest {
public void testGetUserById() {
User user = new User();
user.setId(123);
Mockito.when(userMapper.selectById(123)).thenReturn(user);
when(userMapper.selectById(123)).thenReturn(user);
Assertions.assertEquals(user, processService.getUserById(123));
}
@ -456,7 +457,7 @@ public class ProcessServiceTest {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(333);
taskInstance.setProcessInstanceId(222);
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(null);
when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId())).thenReturn(null);
Assertions.assertEquals("", processService.formatTaskAppId(taskInstance));
ProcessDefinition processDefinition = new ProcessDefinition();
@ -465,13 +466,13 @@ public class ProcessServiceTest {
processInstance.setId(222);
processInstance.setProcessDefinitionVersion(1);
processInstance.setProcessDefinitionCode(1L);
Mockito.when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId()))
when(processService.findProcessInstanceById(taskInstance.getProcessInstanceId()))
.thenReturn(processInstance);
Assertions.assertEquals("", processService.formatTaskAppId(taskInstance));
}
@Test
public void testRecurseFindSubProcessId() {
public void testFindAllSubWorkflowDefinitionCode() {
int parentProcessDefineId = 1;
long parentProcessDefineCode = 1L;
int parentProcessDefineVersion = 1;
@ -479,7 +480,7 @@ public class ProcessServiceTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(parentProcessDefineCode);
processDefinition.setVersion(parentProcessDefineVersion);
Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition);
when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition);
long postTaskCode = 2L;
int postTaskVersion = 2;
@ -489,19 +490,16 @@ public class ProcessServiceTest {
processTaskRelationLog.setPostTaskCode(postTaskCode);
processTaskRelationLog.setPostTaskVersion(postTaskVersion);
relationLogList.add(processTaskRelationLog);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode,
when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode,
parentProcessDefineVersion)).thenReturn(relationLogList);
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog();
taskDefinitionLog1.setTaskParams("{\"processDefinitionCode\": 123L}");
taskDefinitionLogs.add(taskDefinitionLog1);
Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs);
when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs);
List<Long> ids = new ArrayList<>();
processService.recurseFindSubProcess(parentProcessDefineCode, ids);
Assertions.assertEquals(0, ids.size());
Assertions.assertTrue(processService.findAllSubWorkflowDefinitionCode(parentProcessDefineCode).isEmpty());
}
@Test
@ -521,19 +519,19 @@ public class ProcessServiceTest {
@Test
public void getDqRule() {
Mockito.when(dqRuleMapper.selectById(1)).thenReturn(new DqRule());
when(dqRuleMapper.selectById(1)).thenReturn(new DqRule());
Assertions.assertNotNull(processService.getDqRule(1));
}
@Test
public void getRuleInputEntry() {
Mockito.when(dqRuleInputEntryMapper.getRuleInputEntryList(1)).thenReturn(getRuleInputEntryList());
when(dqRuleInputEntryMapper.getRuleInputEntryList(1)).thenReturn(getRuleInputEntryList());
Assertions.assertNotNull(processService.getRuleInputEntry(1));
}
@Test
public void getDqExecuteSql() {
Mockito.when(dqRuleExecuteSqlMapper.getExecuteSqlList(1)).thenReturn(getRuleExecuteSqlList());
when(dqRuleExecuteSqlMapper.getExecuteSqlList(1)).thenReturn(getRuleExecuteSqlList());
Assertions.assertNotNull(processService.getDqExecuteSql(1));
}
@ -643,13 +641,13 @@ public class ProcessServiceTest {
taskDefinition.setVersion(1);
taskDefinition.setCreateTime(new Date());
taskDefinition.setUpdateTime(new Date());
Mockito.when(taskPluginManager.getParameters(any())).thenReturn(null);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(),
when(taskPluginManager.getParameters(any())).thenReturn(null);
when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskDefinition.getCode(),
taskDefinition.getVersion())).thenReturn(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
Mockito.when(taskDefinitionMapper.queryByCodeList(Collections.singletonList(taskDefinition.getCode())))
when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode())).thenReturn(1);
when(taskDefinitionMapper.queryByCodeList(Collections.singletonList(taskDefinition.getCode())))
.thenReturn(Collections.singletonList(taskDefinition));
Mockito.when(taskDefinitionMapper.queryByCode(Mockito.anyLong())).thenReturn(taskDefinition);
when(taskDefinitionMapper.queryByCode(Mockito.anyLong())).thenReturn(taskDefinition);
int result = processService.saveTaskDefine(operator, projectCode, taskDefinitionLogs, Boolean.TRUE);
Assertions.assertEquals(0, result);
}
@ -700,8 +698,8 @@ public class ProcessServiceTest {
taskDefinitionLogs.add(taskDefinition);
taskDefinitionLogs.add(td2);
Mockito.when(taskDefinitionLogDao.queryTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
when(taskDefinitionLogDao.queryTaskDefineLogList(any())).thenReturn(taskDefinitionLogs);
when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(Mockito.anyLong(), Mockito.anyInt()))
.thenReturn(list);
DAG<Long, TaskNode, TaskNodeRelation> stringTaskNodeTaskNodeRelationDAG =
@ -750,7 +748,7 @@ public class ProcessServiceTest {
@Test
public void testCreateTaskGroupQueue() {
Mockito.when(taskGroupQueueMapper.insert(Mockito.any(TaskGroupQueue.class))).thenReturn(1);
when(taskGroupQueueMapper.insert(Mockito.any(TaskGroupQueue.class))).thenReturn(1);
TaskGroupQueue taskGroupQueue =
processService.insertIntoTaskGroupQueue(1, "task name", 1, 1, 1, TaskGroupQueueStatus.WAIT_QUEUE);
Assertions.assertNotNull(taskGroupQueue);
@ -765,8 +763,8 @@ public class ProcessServiceTest {
taskInstance.setProcessInstanceId(1);
taskInstance.setTaskGroupId(taskGroupQueue.getGroupId());
Mockito.when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue);
Mockito.when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1);
when(taskGroupQueueMapper.queryByTaskId(1)).thenReturn(taskGroupQueue);
when(taskGroupQueueMapper.updateById(taskGroupQueue)).thenReturn(1);
processService.releaseTaskGroup(taskInstance);

Loading…
Cancel
Save