Browse Source

Fix listing processDefinition the schedulerReleaseState will never be null (#11888)

3.1.0-release
Wenjun Ruan 2 years ago committed by caishunfeng
parent
commit
7ab4412b5e
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java
  2. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  3. 35
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java
  5. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  6. 57
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java
  8. 57
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  9. 147
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  10. 357
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  11. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  12. 38
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/PageListingResult.java
  13. 40
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java
  14. 51
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java
  15. 27
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml
  16. 20
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/BaseController.java

@ -46,6 +46,7 @@ public class BaseController {
* @param pageSize page size
* @return check result code
*/
// todo: directly throw exception
public Result checkPageParams(int pageNo, int pageSize) {
Result result = new Result();
Status resultEnum = Status.SUCCESS;

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@ -213,7 +214,8 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "name", required = true) String name,
@RequestParam(value = "code", required = false, defaultValue = "0") long processDefinitionCode) {
Map<String, Object> result = processDefinitionService.verifyProcessDefinitionName(loginUser, projectCode, name, processDefinitionCode);
Map<String, Object> result = processDefinitionService.verifyProcessDefinitionName(loginUser, projectCode, name,
processDefinitionCode);
return returnDataList(result);
}
@ -503,21 +505,25 @@ public class ProcessDefinitionController extends BaseController {
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryProcessDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "otherParamsJson", required = false) String otherParamsJson,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
public Result<PageInfo<ProcessDefinition>> queryProcessDefinitionListPaging(
@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "otherParamsJson", required = false) String otherParamsJson,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result<PageInfo<ProcessDefinition>> result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
return processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, searchVal,
otherParamsJson, userId, pageNo, pageSize);
PageInfo<ProcessDefinition> pageInfo = processDefinitionService.queryProcessDefinitionListPaging(
loginUser, projectCode, searchVal, otherParamsJson, userId, pageNo, pageSize);
return Result.success(pageInfo);
}
/**

35
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -17,11 +17,7 @@
package org.apache.dolphinscheduler.api.service;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@ -29,6 +25,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.springframework.web.multipart.MultipartFile;
/**
@ -97,13 +99,13 @@ public interface ProcessDefinitionService {
* @param userId user id
* @return process definition page
*/
Result queryProcessDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
String otherParamsJson,
Integer userId,
Integer pageNo,
Integer pageSize);
PageInfo<ProcessDefinition> queryProcessDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
String otherParamsJson,
Integer userId,
Integer pageNo,
Integer pageSize);
/**
* query detail of process definition
@ -271,7 +273,8 @@ public interface ProcessDefinitionService {
* @param processTaskRelationJson process task relation json
* @return check result code
*/
Map<String, Object> checkProcessNodeList(String processTaskRelationJson, List<TaskDefinitionLog> taskDefinitionLogs);
Map<String, Object> checkProcessNodeList(String processTaskRelationJson,
List<TaskDefinitionLog> taskDefinitionLogs);
/**
* get task node details based on process definition
@ -330,7 +333,7 @@ public interface ProcessDefinitionService {
* @param limit limit
* @return tree view json data
*/
Map<String, Object> viewTree(User loginUser,long projectCode, long code, Integer limit);
Map<String, Object> viewTree(User loginUser, long projectCode, long code, Integer limit);
/**
* switch the defined process definition version
@ -456,7 +459,8 @@ public interface ProcessDefinitionService {
* @param result
* @param otherParamsJson
*/
void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result, String otherParamsJson);
void saveOtherRelation(User loginUser, ProcessDefinition processDefinition, Map<String, Object> result,
String otherParamsJson);
/**
* get Json String
@ -466,4 +470,3 @@ public interface ProcessDefinitionService {
*/
String doOtherOperateProcess(User loginUser, ProcessDefinition processDefinition);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java

@ -64,9 +64,11 @@ public interface ProjectService {
* @param perm String
* @return true if the login user have permission to see the project
*/
Map<String, Object> checkProjectAndAuth(User loginUser, Project project, long projectCode,String perm);
Map<String, Object> checkProjectAndAuth(User loginUser, Project project, long projectCode, String perm);
boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result,String perm);
void checkProjectAndAuthThrowException(User loginUser, Project project, String permission);
boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String perm);
/**
* has project and permission
@ -172,4 +174,4 @@ public interface ProjectService {
*/
void checkProjectAndAuth(Result result, User loginUser, Project project, long projectCode, String perm);
}
}

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -22,8 +22,10 @@ import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
/**
@ -83,7 +85,6 @@ public interface SchedulerService {
String workerGroup,
Long environmentCode);
/**
* set schedule online or offline
*
@ -110,12 +111,14 @@ public interface SchedulerService {
* @return schedule list page
*/
Result querySchedule(User loginUser, long projectCode, long processDefineCode, String searchVal,
Integer pageNo, Integer pageSize);
Integer pageNo, Integer pageSize);
List<Schedule> queryScheduleByProcessDefinitionCodes(List<Long> processDefinitionCodes);
/**
* query schedule list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return schedule list
*/

57
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -86,6 +86,8 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@ -120,6 +122,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
@ -148,6 +151,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYP
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -189,6 +194,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Autowired
private ProcessDefinitionDao processDefinitionDao;
@Lazy
@Autowired
private ProcessInstanceService processInstanceService;
@ -488,39 +496,44 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return process definition page
*/
@Override
public Result queryProcessDefinitionListPaging(User loginUser, long projectCode, String searchVal,
String otherParamsJson, Integer userId, Integer pageNo,
Integer pageSize) {
Result result = new Result();
public PageInfo<ProcessDefinition> queryProcessDefinitionListPaging(@NonNull User loginUser,
long projectCode,
String searchVal,
String otherParamsJson,
Integer userId,
Integer pageNo,
Integer pageSize) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult =
projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
return result;
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
PageListingResult<ProcessDefinition> processDefinitionsPageListingResult =
processDefinitionDao.listingProcessDefinition(
pageNo, pageSize, searchVal, userId, projectCode);
List<ProcessDefinition> processDefinitions = processDefinitionsPageListingResult.getRecords();
Page<ProcessDefinition> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(
page, searchVal, userId, project.getCode(), isAdmin(loginUser));
List<Long> processDefinitionCodes =
processDefinitions.stream().map(ProcessDefinition::getCode).collect(Collectors.toList());
Map<Long, Schedule> scheduleMap = schedulerService.queryScheduleByProcessDefinitionCodes(processDefinitionCodes)
.stream()
.collect(Collectors.toMap(Schedule::getProcessDefinitionCode, Function.identity()));
List<ProcessDefinition> records = processDefinitionIPage.getRecords();
for (ProcessDefinition pd : records) {
for (ProcessDefinition pd : processDefinitions) {
// todo: use batch query
ProcessDefinitionLog processDefinitionLog =
processDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
User user = userMapper.selectById(processDefinitionLog.getOperator());
pd.setModifyBy(user.getUserName());
Schedule schedule = scheduleMap.get(pd.getCode());
pd.setScheduleReleaseState(schedule == null ? null : schedule.getReleaseState());
}
processDefinitionIPage.setRecords(records);
PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) processDefinitionIPage.getTotal());
pageInfo.setTotalList(processDefinitionIPage.getRecords());
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
pageInfo.setTotal((int) processDefinitionsPageListingResult.getTotalCount());
pageInfo.setTotalList(processDefinitions);
return result;
return pageInfo;
}
/**

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java

@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT_UPDATE;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@ -53,6 +54,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -215,6 +220,18 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic
return result;
}
public void checkProjectAndAuthThrowException(@NonNull User loginUser, @Nullable Project project,
String permission) {
// todo: throw a permission exception
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_EXIST);
}
if (!canOperatorPermissions(loginUser, new Object[]{project.getId()}, AuthorizationType.PROJECTS, permission)) {
throw new ServiceException(Status.USER_NO_OPERATION_PROJECT_PERM, loginUser.getUserName(),
project.getCode());
}
}
@Override
public boolean hasProjectAndPerm(User loginUser, Project project, Map<String, Object> result, String permission) {
boolean checkResult = false;

57
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -52,11 +52,13 @@ import org.apache.dolphinscheduler.service.cron.CronUtils;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@ -64,6 +66,8 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -74,7 +78,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.cronutils.model.Cron;
/**
* scheduler service impl
*/
@ -110,7 +113,6 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private ProcessTaskRelationMapper processTaskRelationMapper;
/**
* save schedule
*
@ -152,7 +154,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check work flow define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
result = executorService.checkProcessDefinitionValid(projectCode, processDefinition, processDefineCode,
processDefinition.getVersion());
processDefinition.getVersion());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -264,11 +266,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId,
failureStrategy, processInstancePriority, workerGroup, environmentCode);
return result;
}
/**
* set schedule online or offline
*
@ -303,16 +305,18 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
return result;
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
List<ProcessTaskRelation> processTaskRelations =
processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
if (processTaskRelations.isEmpty()) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
@ -321,7 +325,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}", processDefinition.getId(),
processDefinition.getName());
processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
@ -330,7 +334,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
if (!subProcessDefineCodes.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
processDefinitionMapper.queryByCodes(subProcessDefineCodes);
if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
@ -338,9 +342,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE,
String.valueOf(subProcessDefinition.getId()));
String.valueOf(subProcessDefinition.getId()));
return result;
}
}
@ -364,11 +368,13 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
try {
switch (scheduleStatus) {
case ONLINE:
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}",
project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
case OFFLINE:
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}",
project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
default:
@ -376,7 +382,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
result.put(Constants.MSG,
scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString(), e);
}
@ -416,7 +423,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
Page<Schedule> page = new Page<>(pageNo, pageSize);
IPage<Schedule> scheduleIPage =
scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal);
scheduleMapper.queryByProcessDefineCodePaging(page, processDefineCode, searchVal);
List<ScheduleVo> scheduleList = new ArrayList<>();
for (Schedule schedule : scheduleIPage.getRecords()) {
@ -431,6 +438,13 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
public List<Schedule> queryScheduleByProcessDefinitionCodes(@NonNull List<Long> processDefinitionCodes) {
if (CollectionUtils.isEmpty(processDefinitionCodes)) {
return Collections.emptyList();
}
return scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodes);
}
/**
* query schedule list
*
@ -573,9 +587,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
List<ZonedDateTime> selfFireDateList =
CronUtils.getSelfFireDateList(startTime, endTime, cron, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
CronUtils.getSelfFireDateList(startTime, endTime, cron, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
List<String> previewDateList =
selfFireDateList.stream().map(t -> DateUtils.dateToString(t, zoneId)).collect(Collectors.toList());
selfFireDateList.stream().map(t -> DateUtils.dateToString(t, zoneId)).collect(Collectors.toList());
result.put(Constants.DATA_LIST, previewDateList);
putMsg(result, Status.SUCCESS);
return result;
@ -607,7 +621,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
String workerGroup,
long environmentCode) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
// check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode, null);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
@ -625,7 +639,8 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId, failureStrategy, processInstancePriority, workerGroup, environmentCode);
updateSchedule(result, schedule, processDefinition, scheduleExpression, warningType, warningGroupId,
failureStrategy, processInstancePriority, workerGroup, environmentCode);
return result;
}
@ -634,7 +649,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
FailureStrategy failureStrategy, Priority processInstancePriority, String workerGroup,
long environmentCode) {
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE,
Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return;
}

147
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -17,14 +17,6 @@
package org.apache.dolphinscheduler.api.controller;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -35,8 +27,16 @@ import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -72,16 +72,18 @@ public class ProcessDefinitionControllerTest {
@Test
public void testCreateProcessDefinition() {
String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
String relationJson =
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
long projectCode = 1L;
String name = "dag_test";
String description = "desc test";
@ -93,11 +95,16 @@ public class ProcessDefinitionControllerTest {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, 1);
Mockito.when(processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson,"", ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Mockito.when(
processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "",
ProcessExecutionTypeEnum.PARALLEL))
.thenReturn(result);
Result response = processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson,"", ProcessExecutionTypeEnum.PARALLEL);
Result response =
processDefinitionController.createProcessDefinition(user, projectCode, name, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "",
ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -126,7 +133,8 @@ public class ProcessDefinitionControllerTest {
long projectCode = 1L;
String name = "dag_test";
Mockito.when(processDefinitionService.verifyProcessDefinitionName(user, projectCode, name, 0)).thenReturn(result);
Mockito.when(processDefinitionService.verifyProcessDefinitionName(user, projectCode, name, 0))
.thenReturn(result);
Result response = processDefinitionController.verifyProcessDefinitionName(user, projectCode, name, 0);
Assert.assertTrue(response.isStatus(Status.PROCESS_DEFINITION_NAME_EXIST));
@ -134,16 +142,18 @@ public class ProcessDefinitionControllerTest {
@Test
public void updateProcessDefinition() {
String relationJson = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
String relationJson =
"[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1,"
+ "\"condition_type\":0,\"condition_params\":\"{}\"},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1,"
+ "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":\"{}\"}]";
String taskDefinitionJson =
"[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}";
long projectCode = 1L;
String name = "dag_test";
@ -156,11 +166,15 @@ public class ProcessDefinitionControllerTest {
putMsg(result, Status.SUCCESS);
result.put("processDefinitionId", 1);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "", ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Mockito.when(processDefinitionService.updateProcessDefinition(user, projectCode, name, code, description,
globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "",
ProcessExecutionTypeEnum.PARALLEL)).thenReturn(result);
Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code, description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "", ProcessExecutionTypeEnum.PARALLEL, ReleaseState.OFFLINE);
Result response = processDefinitionController.updateProcessDefinition(user, projectCode, name, code,
description, globalParams,
locations, timeout, tenantCode, relationJson, taskDefinitionJson, "", ProcessExecutionTypeEnum.PARALLEL,
ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue());
}
@ -171,8 +185,10 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE)).thenReturn(result);
Result response = processDefinitionController.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE);
Mockito.when(processDefinitionService.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE))
.thenReturn(result);
Result response =
processDefinitionController.releaseProcessDefinition(user, projectCode, id, ReleaseState.OFFLINE);
Assert.assertTrue(response != null && response.isSuccess());
}
@ -210,7 +226,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.batchCopyProcessDefinition(user, projectCode, code, targetProjectCode)).thenReturn(result);
Mockito.when(processDefinitionService.batchCopyProcessDefinition(user, projectCode, code, targetProjectCode))
.thenReturn(result);
Result response = processDefinitionController.copyProcessDefinition(user, projectCode, code, targetProjectCode);
Assert.assertTrue(response != null && response.isSuccess());
@ -225,7 +242,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.batchMoveProcessDefinition(user, projectCode, id, targetProjectCode)).thenReturn(result);
Mockito.when(processDefinitionService.batchMoveProcessDefinition(user, projectCode, id, targetProjectCode))
.thenReturn(result);
Result response = processDefinitionController.moveProcessDefinition(user, projectCode, id, targetProjectCode);
Assert.assertTrue(response != null && response.isSuccess());
@ -285,7 +303,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.deleteProcessDefinitionByCode(user, projectCode, code)).thenReturn(result);
Mockito.when(processDefinitionService.deleteProcessDefinitionByCode(user, projectCode, code))
.thenReturn(result);
Result response = processDefinitionController.deleteProcessDefinitionByCode(user, projectCode, code);
Assert.assertTrue(response != null && response.isSuccess());
@ -299,7 +318,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, code)).thenReturn(result);
Mockito.when(processDefinitionService.getTaskNodeListByDefinitionCode(user, projectCode, code))
.thenReturn(result);
Result response = processDefinitionController.getNodeListByDefinitionCode(user, projectCode, code);
Assert.assertTrue(response != null && response.isSuccess());
@ -313,7 +333,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.getNodeListMapByDefinitionCodes(user, projectCode, codeList)).thenReturn(result);
Mockito.when(processDefinitionService.getNodeListMapByDefinitionCodes(user, projectCode, codeList))
.thenReturn(result);
Result response = processDefinitionController.getNodeListMapByDefinitionCodes(user, projectCode, codeList);
Assert.assertTrue(response != null && response.isSuccess());
@ -325,7 +346,8 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.queryAllProcessDefinitionByProjectCode(user, projectCode)).thenReturn(result);
Mockito.when(processDefinitionService.queryAllProcessDefinitionByProjectCode(user, projectCode))
.thenReturn(result);
Result response = processDefinitionController.queryAllProcessDefinitionByProjectCode(user, projectCode);
Assert.assertTrue(response != null && response.isSuccess());
@ -340,7 +362,7 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.viewTree(user,projectCode, processId, limit)).thenReturn(result);
Mockito.when(processDefinitionService.viewTree(user, projectCode, processId, limit)).thenReturn(result);
Result response = processDefinitionController.viewTree(user, projectCode, processId, limit);
Assert.assertTrue(response != null && response.isSuccess());
@ -354,12 +376,12 @@ public class ProcessDefinitionControllerTest {
String searchVal = "";
int userId = 1;
Result result = new Result();
putMsg(result, Status.SUCCESS);
result.setData(new PageInfo<Resource>(1, 10));
PageInfo<ProcessDefinition> pageInfo = new PageInfo<>(1, 10);
Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, "", userId, pageNo, pageSize)).thenReturn(result);
Result response = processDefinitionController.queryProcessDefinitionListPaging(user, projectCode, searchVal, "", userId, pageNo, pageSize);
Mockito.when(processDefinitionService.queryProcessDefinitionListPaging(user, projectCode, searchVal, "", userId,
pageNo, pageSize)).thenReturn(pageInfo);
Result<PageInfo<ProcessDefinition>> response = processDefinitionController
.queryProcessDefinitionListPaging(user, projectCode, searchVal, "", userId, pageNo, pageSize);
Assert.assertTrue(response != null && response.isSuccess());
}
@ -369,8 +391,10 @@ public class ProcessDefinitionControllerTest {
String processDefinitionIds = "1,2";
long projectCode = 1L;
HttpServletResponse response = new MockHttpServletResponse();
Mockito.doNothing().when(this.processDefinitionService).batchExportProcessDefinitionByCodes(user, projectCode, processDefinitionIds, response);
processDefinitionController.batchExportProcessDefinitionByCodes(user, projectCode, processDefinitionIds, response);
Mockito.doNothing().when(this.processDefinitionService).batchExportProcessDefinitionByCodes(user, projectCode,
processDefinitionIds, response);
processDefinitionController.batchExportProcessDefinitionByCodes(user, projectCode, processDefinitionIds,
response);
}
@Test
@ -381,18 +405,10 @@ public class ProcessDefinitionControllerTest {
putMsg(resultMap, Status.SUCCESS);
resultMap.setData(new PageInfo<ProcessDefinitionLog>(1, 10));
Mockito.when(processDefinitionService.queryProcessDefinitionVersions(
user
, projectCode
, 1
, 10
, 1))
user, projectCode, 1, 10, 1))
.thenReturn(resultMap);
Result result = processDefinitionController.queryProcessDefinitionVersions(
user
, projectCode
, 1
, 10
, 1);
user, projectCode, 1, 10, 1);
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
}
@ -402,7 +418,8 @@ public class ProcessDefinitionControllerTest {
long projectCode = 1L;
Map<String, Object> resultMap = new HashMap<>();
putMsg(resultMap, Status.SUCCESS);
Mockito.when(processDefinitionService.switchProcessDefinitionVersion(user, projectCode, 1, 10)).thenReturn(resultMap);
Mockito.when(processDefinitionService.switchProcessDefinitionVersion(user, projectCode, 1, 10))
.thenReturn(resultMap);
Result result = processDefinitionController.switchProcessDefinitionVersion(user, projectCode, 1, 10);
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());

357
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -25,14 +25,13 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_IMPORT;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_TREE_VIEW;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_UPDATE;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
@ -59,6 +58,8 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -92,8 +93,6 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.mock.web.MockMultipartFile;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@ -103,17 +102,19 @@ import com.google.common.collect.Lists;
@RunWith(MockitoJUnitRunner.class)
public class ProcessDefinitionServiceTest {
private static final String taskRelationJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
private static final String taskRelationJson =
"[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
private static final String taskDefinitionJson = "[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
private static final String taskDefinitionJson =
"[{\"code\":123456789,\"name\":\"test1\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":null,\"timeout\":0,\"environmentCode\":-1},{\"code\":123451234,\"name\":\"test2\",\"version\":1,\"description\":\"\",\"delayTime\":0,\"taskType\":\"SHELL\","
+ "\"taskParams\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 2\",\"dependence\":{},\"conditionResult\":{\"successNode\":[],\"failedNode\":[]},\"waitStartTimeout\":{},"
+ "\"switchResult\":{}},\"flag\":\"YES\",\"taskPriority\":\"MEDIUM\",\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":1,\"timeoutFlag\":\"CLOSE\","
+ "\"timeoutNotifyStrategy\":\"WARN\",\"timeout\":0,\"environmentCode\":-1}]";
@InjectMocks
private ProcessDefinitionServiceImpl processDefinitionService;
@ -121,6 +122,9 @@ public class ProcessDefinitionServiceTest {
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ProcessDefinitionDao processDefinitionDao;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@ -133,6 +137,9 @@ public class ProcessDefinitionServiceTest {
@Mock
private ScheduleMapper scheduleMapper;
@Mock
private SchedulerService schedulerService;
@Mock
private ProcessService processService;
@ -161,18 +168,21 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
// project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
//project check auth success
// project check auth success
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
List<ProcessDefinition> resourceList = new ArrayList<>();
resourceList.add(getProcessDefinition());
Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Map<String, Object> checkSuccessRes = processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
Map<String, Object> checkSuccessRes =
processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
}
@ -180,38 +190,45 @@ public class ProcessDefinitionServiceTest {
@SuppressWarnings("unchecked")
public void testQueryProcessDefinitionListPaging() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
//project not found
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Result map = processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
Assert.assertEquals(Status.PROJECT_NOT_FOUND.getCode(), (int) map.getCode());
// project not found
try {
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(null);
Mockito.doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService)
.checkProjectAndAuthThrowException(loginUser, null, WORKFLOW_DEFINITION);
processDefinitionService.queryProcessDefinitionListPaging(loginUser, projectCode, "", "", 1, 5, 0);
} catch (ServiceException serviceException) {
Assert.assertEquals(Status.PROJECT_NOT_EXIST.getCode(), serviceException.getCode().intValue());
}
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
loginUser.setId(1);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Page<ProcessDefinition> page = new Page<>(1, 10);
page.setTotal(30);
Mockito.when(processDefineMapper.queryDefineListPaging(
Mockito.any(IPage.class)
, Mockito.eq("")
, Mockito.eq(loginUser.getId())
, Mockito.eq(project.getCode())
, Mockito.anyBoolean())).thenReturn(page);
Result map1 = processDefinitionService.queryProcessDefinitionListPaging(
loginUser, 1L, "", "",1, 10, loginUser.getId());
Assert.assertEquals(Status.SUCCESS.getMsg(), map1.getMsg());
Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project,
WORKFLOW_DEFINITION);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
PageListingResult<ProcessDefinition> pageListingResult = PageListingResult.<ProcessDefinition>builder()
.records(Collections.emptyList())
.currentPage(1)
.pageSize(10)
.totalCount(30)
.build();
Mockito.when(processDefinitionDao.listingProcessDefinition(
Mockito.eq(0),
Mockito.eq(10),
Mockito.eq(""),
Mockito.eq(1),
Mockito.eq(project.getCode()))).thenReturn(pageListingResult);
PageInfo<ProcessDefinition> pageInfo = processDefinitionService.queryProcessDefinitionListPaging(
loginUser, project.getCode(), "", "", 1, 0, 10);
Assert.assertNotNull(pageInfo);
}
@Test
@ -230,26 +247,31 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
//project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
// project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionByCode(loginUser, 1L, 1L);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
//project check auth success, instance not exist
// project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
DagData dagData = new DagData(getProcessDefinition(), null, null);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(dagData);
Map<String, Object> instanceNotexitRes = processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 1L);
Map<String, Object> instanceNotexitRes =
processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 1L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
//instance exit
// instance exit
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Mockito.when(tenantMapper.queryById(1)).thenReturn(tenant);
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 46L);
Map<String, Object> successRes =
processDefinitionService.queryProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -267,24 +289,31 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
//project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Map<String, Object> map = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
// project check auth fail
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Map<String, Object> map =
processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
//project check auth success, instance not exist
// project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
Map<String, Object> instanceNotExitRes = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Map<String, Object> instanceNotExitRes =
processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
//instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test")).thenReturn(getProcessDefinition());
// instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test"))
.thenReturn(getProcessDefinition());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Map<String, Object> successRes = processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Map<String, Object> successRes =
processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test");
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -298,15 +327,18 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_BATCH_COPY)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
.thenReturn(result);
// copy project definition ids empty test
Map<String, Object> map = processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, StringUtils.EMPTY, 2L);
Map<String, Object> map =
processDefinitionService.batchCopyProcessDefinition(loginUser, projectCode, StringUtils.EMPTY, 2L);
Assert.assertEquals(Status.PROCESS_DEFINITION_CODES_IS_EMPTY, map.get(Constants.STATUS));
// project check auth fail
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_BATCH_COPY)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
.thenReturn(result);
Map<String, Object> map1 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, String.valueOf(project.getId()), 2L);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map1.get(Constants.STATUS));
@ -315,13 +347,15 @@ public class ProcessDefinitionServiceTest {
projectCode = 2L;
Project project1 = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_BATCH_COPY)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_BATCH_COPY))
.thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode);
ProcessDefinition definition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
@ -346,17 +380,21 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, TASK_DEFINITION_MOVE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2, TASK_DEFINITION_MOVE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, TASK_DEFINITION_MOVE))
.thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2, TASK_DEFINITION_MOVE))
.thenReturn(result);
ProcessDefinition definition = getProcessDefinition();
definition.setVersion(1);
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode));
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L))
.thenReturn(getProcessTaskRelation(projectCode));
putMsg(result, Status.SUCCESS);
Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition(
@ -374,77 +412,95 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
.thenReturn(result);
Map<String, Object> map = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 6L);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
//project check auth success, instance not exist
// project check auth success, instance not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
.thenReturn(result);
Mockito.when(processDefineMapper.queryByCode(1L)).thenReturn(null);
Map<String, Object> instanceNotExitRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L);
Map<String, Object> instanceNotExitRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
ProcessDefinition processDefinition = getProcessDefinition();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE)).thenReturn(result);
//user no auth
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
.thenReturn(result);
// user no auth
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Map<String, Object> userNoAuthRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
//process definition online
// process definition online
loginUser.setUserType(UserType.ADMIN_USER);
putMsg(result, Status.SUCCESS, projectCode);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Throwable exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
String formatter = MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), processDefinition.getName());
Throwable exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
String formatter =
MessageFormat.format(Status.PROCESS_DEFINE_STATE_ONLINE.getMsg(), processDefinition.getName());
Assertions.assertEquals(formatter, exception.getMessage());
//scheduler list elements > 1
// scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerGreaterThanOneRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
Map<String, Object> schedulerGreaterThanOneRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, schedulerGreaterThanOneRes.get(Constants.STATUS));
//scheduler online
// scheduler online
Schedule schedule = getSchedule();
schedule.setReleaseState(ReleaseState.ONLINE);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedule);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
Map<String, Object> schedulerOnlineRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//process used by other task, sub process
// process used by other task, sub process
loginUser.setUserType(UserType.ADMIN_USER);
putMsg(result, Status.SUCCESS, projectCode);
TaskMainInfo taskMainInfo = getTaskMainInfo().get(0);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
exception = Assertions.assertThrows(ServiceException.class, () -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
formatter = MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(), String.format("%s:%s", taskMainInfo.getProcessDefinitionName(), taskMainInfo.getTaskName()));
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(ImmutableSet.copyOf(getTaskMainInfo()));
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
formatter = MessageFormat.format(Status.DELETE_PROCESS_DEFINITION_USE_BY_OTHER_FAIL.getMsg(),
String.format("%s:%s", taskMainInfo.getProcessDefinitionName(), taskMainInfo.getTaskName()));
Assertions.assertEquals(formatter, exception.getMessage());
//delete success
// delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())).thenReturn(Collections.emptySet());
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> deleteSuccess = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Map<String, Object> deleteSuccess =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
}
@ -459,10 +515,10 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
Map<String, Object> map = processDefinitionService.releaseProcessDefinition(loginUser, projectCode,
6, ReleaseState.OFFLINE);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
@ -503,23 +559,25 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.PROJECT_NOT_FOUND, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_CREATE))
.thenReturn(result);
Map<String, Object> map = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf", 0);
Assert.assertEquals(Status.PROJECT_NOT_FOUND, map.get(Constants.STATUS));
//project check auth success, process not exist
// project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf", 0);
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
//process exist
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(getProcessDefinition());
// process exist
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf"))
.thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf", 0);
Assert.assertEquals(Status.PROCESS_DEFINITION_NAME_EXIST, processExistRes.get(Constants.STATUS));
@ -532,7 +590,8 @@ public class ProcessDefinitionServiceTest {
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Map<String, Object> taskEmptyRes = processDefinitionService.checkProcessNodeList(taskRelationJson, taskDefinitionLogs);
Map<String, Object> taskEmptyRes =
processDefinitionService.checkProcessNodeList(taskRelationJson, taskDefinitionLogs);
Assert.assertEquals(Status.PROCESS_DAG_IS_EMPTY, taskEmptyRes.get(Constants.STATUS));
}
@ -546,21 +605,23 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
//process definition not exist
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
// process definition not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
Map<String, Object> processDefinitionNullRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Map<String, Object> processDefinitionNullRes =
processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//success
// success
ProcessDefinition processDefinition = getProcessDefinition();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes = processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Map<String, Object> dataNotValidRes =
processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
}
@ -574,15 +635,17 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
//project check auth fail
// project check auth fail
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,null)).thenReturn(result);
//process definition not exist
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
// process definition not exist
String defineCodes = "46";
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong)
.collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Map<String, Object> processNotExistRes =
processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
putMsg(result, Status.SUCCESS, projectCode);
@ -597,7 +660,8 @@ public class ProcessDefinitionServiceTest {
projects.add(project1);
Mockito.when(projectMapper.queryProjectCreatedAndAuthorizedByUserId(loginUser.getId())).thenReturn(projects);
Map<String, Object> successRes = processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Map<String, Object> successRes =
processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -611,12 +675,14 @@ public class ProcessDefinitionServiceTest {
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode,WORKFLOW_DEFINITION)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
ProcessDefinition processDefinition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryAllDefinitionList(projectCode)).thenReturn(processDefinitionList);
Map<String, Object> successRes = processDefinitionService.queryAllProcessDefinitionByProjectCode(loginUser, projectCode);
Map<String, Object> successRes =
processDefinitionService.queryAllProcessDefinitionByProjectCode(loginUser, projectCode);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -626,28 +692,32 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(1);
loginUser.setTenantId(1);
loginUser.setUserType(UserType.ADMIN_USER);
long projectCode = 1;
long projectCode = 1;
Project project1 = getProject(projectCode);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_TREE_VIEW)).thenReturn(result);
//process definition not exist
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_TREE_VIEW))
.thenReturn(result);
// process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(loginUser,processDefinition.getProjectCode(),46, 10);
Map<String, Object> processDefinitionNullRes =
processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//task instance not existproject
// task instance not existproject
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(loginUser,processDefinition.getProjectCode(),46, 10);
Map<String, Object> taskNullRes =
processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(loginUser,processDefinition.getProjectCode(),46, 10);
// task instance exist
Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
@ -666,7 +736,8 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(loginUser,processDefinition.getProjectCode(), 46, 10);
Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
@ -682,10 +753,12 @@ public class ProcessDefinitionServiceTest {
long projectCode = 1L;
Project project = getProject(projectCode);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE)).thenReturn(result);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE))
.thenReturn(result);
Map<String, Object> updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", null,"",null, ProcessExecutionTypeEnum.PARALLEL);
Map<String, Object> updateResult =
processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
}
@ -732,14 +805,17 @@ public class ProcessDefinitionServiceTest {
outputStream.putNextEntry(new ZipEntry("import_sql/"));
outputStream.putNextEntry(new ZipEntry("import_sql/a.sql"));
outputStream.write("-- upstream: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
outputStream.write(
"-- upstream: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
outputStream.putNextEntry(new ZipEntry("import_sql/b.sql"));
outputStream.write("-- name: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
outputStream
.write("-- name: start_auto_dag\n-- datasource: mysql_1\nselect 1;".getBytes(StandardCharsets.UTF_8));
outputStream.close();
MockMultipartFile mockMultipartFile = new MockMultipartFile("import_sql.zip", byteArrayOutputStream.toByteArray());
MockMultipartFile mockMultipartFile =
new MockMultipartFile("import_sql.zip", byteArrayOutputStream.toByteArray());
DataSource dataSource = Mockito.mock(DataSource.class);
Mockito.when(dataSource.getId()).thenReturn(1);
@ -747,16 +823,21 @@ public class ProcessDefinitionServiceTest {
Mockito.when(dataSourceMapper.queryDataSourceByNameAndUserId(userId, "mysql_1")).thenReturn(dataSource);
long projectCode = 1001;
long projectCode = 1001;
Project project1 = getProject(projectCode);
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, Status.SUCCESS);
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_IMPORT)).thenReturn(result);
Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(2);
Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(1);
Mockito.when(processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(),
Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean())).thenReturn(0);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, projectCode, WORKFLOW_IMPORT))
.thenReturn(result);
Mockito.when(processService.saveTaskDefine(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.notNull(),
Mockito.anyBoolean())).thenReturn(2);
Mockito.when(processService.saveProcessDefine(Mockito.same(loginUser), Mockito.notNull(), Mockito.notNull(),
Mockito.anyBoolean())).thenReturn(1);
Mockito.when(
processService.saveTaskRelation(Mockito.same(loginUser), Mockito.eq(projectCode), Mockito.anyLong(),
Mockito.eq(1), Mockito.notNull(), Mockito.notNull(), Mockito.anyBoolean()))
.thenReturn(0);
result = processDefinitionService.importSqlProcessDefinition(loginUser, projectCode, mockMultipartFile);
Assert.assertEquals(result.get(Constants.STATUS), Status.SUCCESS);

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -108,14 +108,12 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param searchVal searchVal
* @param userId userId
* @param projectCode projectCode
* @param isAdmin isAdmin
* @return process definition IPage
*/
IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page,
@Param("searchVal") String searchVal,
@Param("userId") int userId,
@Param("projectCode") long projectCode,
@Param("isAdmin") boolean isAdmin);
@Param("projectCode") long projectCode);
/**
* query all process definition list

38
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/model/PageListingResult.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.model;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
@AllArgsConstructor
public class PageListingResult<T> {
private List<T> records;
private long totalCount;
private int currentPage;
private int pageSize;
}

40
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessDefinitionDao.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import javax.annotation.Nullable;
public interface ProcessDefinitionDao {
/**
* Listing the process definition belongs to the given userId and projectCode.
* If the searchValue is not null, will used at processDefinitionName or processDefinitionDescription.
*/
// todo: Right now this method will use fuzzy query at searchVal, this will be very slow if there are exist a lot of
// processDefinition belongs to the target user/project.
PageListingResult<ProcessDefinition> listingProcessDefinition(
int pageNumber,
int pageSize,
@Nullable String searchVal,
int userId,
long projectCode);
}

51
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessDefinitionDaoImpl.java

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.repository.impl;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@Repository
public class ProcessDefinitionDaoImpl implements ProcessDefinitionDao {
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
@Override
public PageListingResult<ProcessDefinition> listingProcessDefinition(int pageNumber, int pageSize, String searchVal,
int userId, long projectCode) {
Page<ProcessDefinition> page = new Page<>(pageNumber, pageSize);
IPage<ProcessDefinition> processDefinitions =
processDefinitionMapper.queryDefineListPaging(page, searchVal, userId, projectCode);
return PageListingResult.<ProcessDefinition>builder()
.totalCount(processDefinitions.getTotal())
.currentPage(pageNumber)
.pageSize(pageSize)
.records(processDefinitions.getRecords())
.build();
}
}

27
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.xml

@ -68,23 +68,20 @@
and pd.name = #{processDefinitionName}
</select>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
SELECT td.id, td.code, td.name, td.version, td.release_state, td.project_code, td.user_id, td.description,
td.global_params, td.flag, td.warning_group_id, td.timeout, td.tenant_id, td.update_time, td.create_time,
sc.schedule_release_state, tu.user_name ,td.execution_type
FROM t_ds_process_definition td
left join (select process_definition_code,release_state as schedule_release_state from t_ds_schedules group by
process_definition_code,release_state) sc on sc.process_definition_code = td.code
left join t_ds_user tu on td.user_id = tu.id
where td.project_code = #{projectCode}
<if test=" searchVal != null and searchVal != ''">
AND (td.name like concat('%', #{searchVal}, '%')
OR td.description like concat('%', #{searchVal}, '%')
)
</if>
SELECT
<include refid="baseSql"/>
FROM t_ds_process_definition
where project_code = #{projectCode}
AND project_code = #{projectCode}
<if test=" userId != 0">
and td.user_id = #{userId}
AND user_id = #{userId}
</if>
<if test=" searchVal != null and searchVal != ''">
AND (
name like concat('%', #{searchVal}, '%') OR description like concat('%', #{searchVal}, '%')
)
</if>
order by td.update_time desc,td.id asc
order by update_time desc
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">

20
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java

@ -64,7 +64,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
* @return ProcessDefinition
*/
private ProcessDefinition insertOne(String name) {
//insertOne
// insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(atomicLong.getAndIncrement());
processDefinition.setName(name);
@ -81,9 +81,9 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
*/
@Test
public void testUpdate() {
//insertOne
// insertOne
ProcessDefinition processDefinition = insertOne("def 1");
//update
// update
processDefinition.setUpdateTime(new Date());
int update = processDefinitionMapper.updateById(processDefinition);
Assert.assertEquals(1, update);
@ -105,7 +105,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
@Test
public void testQuery() {
insertOne("def 1");
//query
// query
List<ProcessDefinition> dataSources = processDefinitionMapper.selectList(null);
Assert.assertNotEquals(dataSources.size(), 0);
}
@ -136,7 +136,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user.setUserType(UserType.GENERAL_USER);
user.setTenantId(tenant.getId());
userMapper.insert(user);
//insertOne
// insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setName("def 1");
@ -180,7 +180,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user.setTenantId(tenant.getId());
userMapper.insert(user);
//insertOne
// insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setName("def 1");
@ -243,7 +243,7 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
user.setTenantId(tenant.getId());
userMapper.insert(user);
//insertOne
// insertOne
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setCode(1L);
processDefinition.setName("def 1");
@ -264,7 +264,8 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
public void testQueryDefineListPaging() {
insertOne("def 1");
Page<ProcessDefinition> page = new Page(1, 3);
IPage<ProcessDefinition> processDefinitionIPage = processDefinitionMapper.queryDefineListPaging(page, "def", 101, 1010L, true);
IPage<ProcessDefinition> processDefinitionIPage =
processDefinitionMapper.queryDefineListPaging(page, "def", 101, 1010L);
Assert.assertNotEquals(processDefinitionIPage.getTotal(), 0);
}
@ -318,7 +319,8 @@ public class ProcessDefinitionMapperTest extends BaseDaoTest {
Long[] projectCodes = new Long[1];
projectCodes[0] = processDefinition.getProjectCode();
List<DefinitionGroupByUser> processDefinitions = processDefinitionMapper.countDefinitionByProjectCodes(projectCodes);
List<DefinitionGroupByUser> processDefinitions =
processDefinitionMapper.countDefinitionByProjectCodes(projectCodes);
Assert.assertNotEquals(processDefinitions.size(), 0);
}

Loading…
Cancel
Save