Browse Source

[Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code (#4766)

* [Improvement-3369][api] Introduce resources, scheduler and taskinstance service interface for clear code
pull/3/MERGE
Shiwen Cheng 3 years ago committed by GitHub
parent
commit
9ae29a756f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1200
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java
  2. 509
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java
  3. 154
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java
  4. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
  5. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java
  6. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 1313
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  8. 600
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  9. 208
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  10. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  11. 25
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  12. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java
  13. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  14. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
  15. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  16. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

1200
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ResourcesService.java

File diff suppressed because it is too large Load Diff

509
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/SchedulerService.java

@ -17,77 +17,18 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/** /**
* scheduler service * scheduler service
*/ */
@Service public interface SchedulerService {
public class SchedulerService extends BaseService {
private static final Logger logger = LoggerFactory.getLogger(SchedulerService.class);
@Autowired
private ProjectService projectService;
@Autowired
private ExecutorService executorService;
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
@Autowired
private ProjectMapper projectMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/** /**
* save schedule * save schedule
@ -103,80 +44,14 @@ public class SchedulerService extends BaseService {
* @param workerGroup worker group * @param workerGroup worker group
* @return create result code * @return create result code
*/ */
@Transactional(rollbackFor = RuntimeException.class) Map<String, Object> insertSchedule(User loginUser, String projectName,
public Map<String, Object> insertSchedule(User loginUser, String projectName, Integer processDefineId,
Integer processDefineId, String schedule,
String schedule, WarningType warningType,
WarningType warningType, int warningGroupId,
int warningGroupId, FailureStrategy failureStrategy,
FailureStrategy failureStrategy, Priority processInstancePriority,
Priority processInstancePriority, String workerGroup);
String workerGroup) {
Map<String, Object> result = new HashMap();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check work flow define release state
ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Schedule scheduleObj = new Schedule();
Date now = new Date();
scheduleObj.setProjectName(projectName);
scheduleObj.setProcessDefinitionId(processDefinition.getId());
scheduleObj.setProcessDefinitionName(processDefinition.getName());
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
scheduleObj.setStartTime(scheduleParam.getStartTime());
scheduleObj.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
logger.error(scheduleParam.getCrontab() + " verify failure");
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
return result;
}
scheduleObj.setCrontab(scheduleParam.getCrontab());
scheduleObj.setWarningType(warningType);
scheduleObj.setWarningGroupId(warningGroupId);
scheduleObj.setFailureStrategy(failureStrategy);
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroup(workerGroup);
scheduleMapper.insert(scheduleObj);
/**
* updateProcessInstance receivers and cc by process definition id
*/
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
// return scheduler object with ID
result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
}
/** /**
* updateProcessInstance schedule * updateProcessInstance schedule
@ -193,95 +68,16 @@ public class SchedulerService extends BaseService {
* @param scheduleStatus schedule status * @param scheduleStatus schedule status
* @return update result code * @return update result code
*/ */
@Transactional(rollbackFor = RuntimeException.class) Map<String, Object> updateSchedule(User loginUser,
public Map<String, Object> updateSchedule(User loginUser, String projectName,
String projectName, Integer id,
Integer id, String scheduleExpression,
String scheduleExpression, WarningType warningType,
WarningType warningType, int warningGroupId,
int warningGroupId, FailureStrategy failureStrategy,
FailureStrategy failureStrategy, ReleaseState scheduleStatus,
ReleaseState scheduleStatus, Priority processInstancePriority,
Priority processInstancePriority, String workerGroup);
String workerGroup) {
Map<String, Object> result = new HashMap<String, Object>(5);
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check schedule exists
Schedule schedule = scheduleMapper.selectById(id);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
return result;
}
/**
* scheduling on-line status forbid modification
*/
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return result;
}
Date now = new Date();
// updateProcessInstance param
if (StringUtils.isNotEmpty(scheduleExpression)) {
ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return result;
}
schedule.setCrontab(scheduleParam.getCrontab());
}
if (warningType != null) {
schedule.setWarningType(warningType);
}
schedule.setWarningGroupId(warningGroupId);
if (failureStrategy != null) {
schedule.setFailureStrategy(failureStrategy);
}
if (scheduleStatus != null) {
schedule.setReleaseState(scheduleStatus);
}
schedule.setWorkerGroup(workerGroup);
schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.updateById(schedule);
/**
* updateProcessInstance recipients and cc by process definition ID
*/
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
return result;
}
/** /**
@ -293,110 +89,10 @@ public class SchedulerService extends BaseService {
* @param scheduleStatus schedule status * @param scheduleStatus schedule status
* @return publish result code * @return publish result code
*/ */
@Transactional(rollbackFor = RuntimeException.class) Map<String, Object> setScheduleState(User loginUser,
public Map<String, Object> setScheduleState(User loginUser, String projectName,
String projectName, Integer id,
Integer id, ReleaseState scheduleStatus);
ReleaseState scheduleStatus) {
Map<String, Object> result = new HashMap<String, Object>(5);
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
return result;
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
// check sub process definition release state
List<Integer> subProcessDefineIds = new ArrayList<>();
processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
if (subProcessDefineIds.size() > 0) {
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (subProcessDefinitionList != null && subProcessDefinitionList.size() > 0) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
return result;
}
}
}
}
}
// check master server exists
List<Server> masterServers = monitorService.getServerListFromZK(true);
if (masterServers.size() == 0) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
}
// set status
scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try {
switch (scheduleStatus) {
case ONLINE: {
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
}
case OFFLINE: {
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
}
default: {
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
}
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString());
}
putMsg(result, Status.SUCCESS);
return result;
}
/** /**
* query schedule * query schedule
@ -409,36 +105,7 @@ public class SchedulerService extends BaseService {
* @param searchVal search value * @param searchVal search value
* @return schedule list page * @return schedule list page
*/ */
public Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) { Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize);
HashMap<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
return result;
}
Page<Schedule> page = new Page(pageNo, pageSize);
IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
page, processDefineId, searchVal
);
PageInfo pageInfo = new PageInfo<Schedule>(pageNo, pageSize);
pageInfo.setTotalCount((int) scheduleIPage.getTotal());
pageInfo.setLists(scheduleIPage.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/** /**
* query schedule list * query schedule list
@ -447,41 +114,7 @@ public class SchedulerService extends BaseService {
* @param projectName project name * @param projectName project name
* @return schedule list * @return schedule list
*/ */
public Map<String, Object> queryScheduleList(User loginUser, String projectName) { Map<String, Object> queryScheduleList(User loginUser, String projectName);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
List<Schedule> schedules = scheduleMapper.querySchedulerListByProjectName(projectName);
result.put(Constants.DATA_LIST, schedules);
putMsg(result, Status.SUCCESS);
return result;
}
public void setSchedule(int projectId, Schedule schedule) {
int scheduleId = schedule.getId();
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
Date startDate = schedule.getStartTime();
Date endDate = schedule.getEndTime();
String jobName = QuartzExecutors.buildJobName(scheduleId);
String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule);
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,
schedule.getCrontab(), dataMap);
}
/** /**
* delete schedule * delete schedule
@ -490,35 +123,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId schedule id * @param scheduleId schedule id
* @throws RuntimeException runtime exception * @throws RuntimeException runtime exception
*/ */
public static void deleteSchedule(int projectId, int scheduleId) { void deleteSchedule(int projectId, int scheduleId);
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
String jobName = QuartzExecutors.buildJobName(scheduleId);
String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) {
logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId);
throw new ServiceException("set offline failure");
}
}
/**
* check valid
*
* @param result result
* @param bool bool
* @param status status
* @return check result code
*/
private boolean checkValid(Map<String, Object> result, boolean bool, Status status) {
// timeout is valid
if (bool) {
putMsg(result, status);
return true;
}
return false;
}
/** /**
* delete schedule by id * delete schedule by id
@ -528,46 +133,7 @@ public class SchedulerService extends BaseService {
* @param scheduleId scheule id * @param scheduleId scheule id
* @return delete result code * @return delete result code
*/ */
public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) { Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
Schedule schedule = scheduleMapper.selectById(scheduleId);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
return result;
}
// Determine if the login user is the owner of the schedule
if (loginUser.getId() != schedule.getUserId()
&& loginUser.getUserType() != UserType.ADMIN_USER) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
// check schedule is already online
if (schedule.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
return result;
}
int delete = scheduleMapper.deleteById(scheduleId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
return result;
}
/** /**
* preview schedule * preview schedule
@ -577,24 +143,5 @@ public class SchedulerService extends BaseService {
* @param schedule schedule expression * @param schedule schedule expression
* @return the next five fire time * @return the next five fire time
*/ */
public Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule) { Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule);
Map<String, Object> result = new HashMap<>();
CronExpression cronExpression;
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
Date now = new Date();
Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime();
Date endTime = scheduleParam.getEndTime();
try {
cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return result;
}
List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
result.put(Constants.DATA_LIST, selfFireDateList.stream().map(t -> DateUtils.dateToString(t)));
putMsg(result, Status.SUCCESS);
return result;
}
} }

154
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java

@ -17,57 +17,15 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/** /**
* task instance service * task instance service
*/ */
@Service public interface TaskInstanceService {
public class TaskInstanceService extends BaseService {
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
ProcessService processService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
ProcessInstanceService processInstanceService;
@Autowired
UsersService usersService;
/** /**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
@ -85,65 +43,10 @@ public class TaskInstanceService extends BaseService {
* @param pageSize page size * @param pageSize page size
* @return task list page * @return task list page
*/ */
public Map<String, Object> queryTaskListPaging(User loginUser, String projectName, Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate, Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
String endDate, String searchVal, ExecutionStatus stateType, String host, String endDate, String searchVal, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) { Integer pageNo, Integer pageSize);
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
int[] statusArray = null;
if (stateType != null) {
statusArray = new int[]{stateType.ordinal()};
}
Date start = null;
Date end = null;
if (StringUtils.isNotEmpty(startDate)) {
start = DateUtils.getScheduleDate(startDate);
if (start == null) {
return generateInvalidParamRes(result, "startDate");
}
}
if (StringUtils.isNotEmpty(endDate)) {
end = DateUtils.getScheduleDate(endDate);
if (end == null) {
return generateInvalidParamRes(result, "endDate");
}
}
Page<TaskInstance> page = new Page(pageNo, pageSize);
PageInfo pageInfo = new PageInfo<TaskInstance>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
);
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("taskJson");
List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime()));
User executor = usersService.queryUser(taskInstance.getExecutorId());
if (null != executor) {
taskInstance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotalCount((int) taskInstanceIPage.getTotal());
pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet));
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/** /**
* change one task instance's state from failure to forced success * change one task instance's state from failure to forced success
@ -153,51 +56,6 @@ public class TaskInstanceService extends BaseService {
* @param taskInstanceId task instance id * @param taskInstanceId task instance id
* @return the result code and msg * @return the result code and msg
*/ */
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) { Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId);
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
}
/***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map
* @param params invalid params name
* @return update result map
*/
private Map<String, Object> generateInvalidParamRes(Map<String, Object> result, String params) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params));
return result;
}
} }

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java

@ -61,7 +61,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return token list for page number and page size * @return token list for page number and page size
*/ */
public Map<String, Object> queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { public Map<String, Object> queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
PageInfo<AccessToken> pageInfo = new PageInfo<>(pageNo, pageSize); PageInfo<AccessToken> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<AccessToken> page = new Page<>(pageNo, pageSize); Page<AccessToken> page = new Page<>(pageNo, pageSize);
@ -87,7 +87,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return create result code * @return create result code
*/ */
public Map<String, Object> createToken(User loginUser, int userId, String expireTime, String token) { public Map<String, Object> createToken(User loginUser, int userId, String expireTime, String token) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){ if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM); putMsg(result, Status.USER_NO_OPERATION_PERM);
@ -124,7 +124,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return token string * @return token string
*/ */
public Map<String, Object> generateToken(User loginUser, int userId, String expireTime) { public Map<String, Object> generateToken(User loginUser, int userId, String expireTime) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){ if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM); putMsg(result, Status.USER_NO_OPERATION_PERM);
return result; return result;
@ -143,7 +143,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return delete result code * @return delete result code
*/ */
public Map<String, Object> delAccessTokenById(User loginUser, int id) { public Map<String, Object> delAccessTokenById(User loginUser, int id) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
AccessToken accessToken = accessTokenMapper.selectById(id); AccessToken accessToken = accessTokenMapper.selectById(id);
@ -174,7 +174,7 @@ public class AccessTokenServiceImpl extends BaseService implements AccessTokenSe
* @return update result code * @return update result code
*/ */
public Map<String, Object> updateToken(User loginUser, int id, int userId, String expireTime, String token) { public Map<String, Object> updateToken(User loginUser, int id, int userId, String expireTime, String token) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (!hasPerm(loginUser,userId)){ if (!hasPerm(loginUser,userId)){
putMsg(result, Status.USER_NO_OPERATION_PERM); putMsg(result, Status.USER_NO_OPERATION_PERM);
return result; return result;

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataAnalysisServiceImpl.java

@ -130,7 +130,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate
, TriFunction<Date, Date, Integer[], List<ExecuteStatusCount>> instanceStateCounter) { , TriFunction<Date, Date, Integer[], List<ExecuteStatusCount>> instanceStateCounter) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result); boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) { if (!checkProject) {
return result; return result;
@ -193,7 +193,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
*/ */
public Map<String, Object> countCommandState(User loginUser, int projectId, String startDate, String endDate) { public Map<String, Object> countCommandState(User loginUser, int projectId, String startDate, String endDate) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result); boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) { if (!checkProject) {
return result; return result;
@ -264,7 +264,7 @@ public class DataAnalysisServiceImpl extends BaseService implements DataAnalysis
* @return queue state count data * @return queue state count data
*/ */
public Map<String, Object> countQueueState(User loginUser, int projectId) { public Map<String, Object> countQueueState(User loginUser, int projectId) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
boolean checkProject = checkProject(loginUser, projectId, result); boolean checkProject = checkProject(loginUser, projectId, result);
if (!checkProject) { if (!checkProject) {

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -146,6 +146,9 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Autowired
private SchedulerService schedulerService;
/** /**
* create process definition * create process definition
* *
@ -273,7 +276,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override @Override
public Map<String, Object> queryProcessDefinitionList(User loginUser, String projectName) { public Map<String, Object> queryProcessDefinitionList(User loginUser, String projectName) {
HashMap<String, Object> result = new HashMap<>(5); HashMap<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@ -399,7 +402,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
String desc, String desc,
String locations, String locations,
String connects) { String connects) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@ -514,7 +517,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) { public Map<String, Object> deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
@ -634,7 +637,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
// set status // set status
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule); scheduleMapper.updateById(schedule);
SchedulerService.deleteSchedule(project.getId(), schedule.getId()); schedulerService.deleteSchedule(project.getId(), schedule.getId());
} }
break; break;
default: default:
@ -823,7 +826,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override @Override
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { public Map<String, Object> importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
String processMetaJson = FileUtils.file2String(file); String processMetaJson = FileUtils.file2String(file);
List<ProcessMeta> processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); List<ProcessMeta> processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class);
@ -992,7 +995,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
} }
//recursive sub-process parameter correction map key for old process id value for new process id //recursive sub-process parameter correction map key for old process id value for new process id
Map<Integer, Integer> subProcessIdMap = new HashMap<>(20); Map<Integer, Integer> subProcessIdMap = new HashMap<>();
List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements()) List<Object> subProcessList = StreamUtils.asStream(jsonArray.elements())
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText()))
@ -1283,7 +1286,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
@Override @Override
public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer projectId) { public Map<String, Object> queryProcessDefinitionAllByProjectId(Integer projectId) {
HashMap<String, Object> result = new HashMap<>(5); HashMap<String, Object> result = new HashMap<>();
List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(projectId); List<ProcessDefinition> resourceList = processDefineMapper.queryAllDefinitionList(projectId);
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);
@ -1494,7 +1497,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements
Integer processId, Integer processId,
Project targetProject) throws JsonProcessingException { Project targetProject) throws JsonProcessingException {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefineMapper.selectById(processId); ProcessDefinition processDefinition = processDefineMapper.selectById(processId);
if (processDefinition == null) { if (processDefinition == null) {

1313
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

File diff suppressed because it is too large Load Diff

600
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -0,0 +1,600 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.dto.ScheduleParam;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.SchedulerService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.ProcessScheduleJob;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* scheduler service impl
*/
@Service
public class SchedulerServiceImpl extends BaseService implements SchedulerService {
private static final Logger logger = LoggerFactory.getLogger(SchedulerServiceImpl.class);
@Autowired
private ProjectService projectService;
@Autowired
private ExecutorService executorService;
@Autowired
private MonitorService monitorService;
@Autowired
private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
@Autowired
private ProjectMapper projectMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/**
* save schedule
*
* @param loginUser login user
* @param projectName project name
* @param processDefineId process definition id
* @param schedule scheduler
* @param warningType warning type
* @param warningGroupId warning group id
* @param failureStrategy failure strategy
* @param processInstancePriority process instance priority
* @param workerGroup worker group
* @return create result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> insertSchedule(User loginUser, String projectName,
Integer processDefineId,
String schedule,
WarningType warningType,
int warningGroupId,
FailureStrategy failureStrategy,
Priority processInstancePriority,
String workerGroup) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check work flow define release state
ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
result = executorService.checkProcessDefinitionValid(processDefinition, processDefineId);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Schedule scheduleObj = new Schedule();
Date now = new Date();
scheduleObj.setProjectName(projectName);
scheduleObj.setProcessDefinitionId(processDefinition.getId());
scheduleObj.setProcessDefinitionName(processDefinition.getName());
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
scheduleObj.setStartTime(scheduleParam.getStartTime());
scheduleObj.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
logger.error("{} verify failure", scheduleParam.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleParam.getCrontab());
return result;
}
scheduleObj.setCrontab(scheduleParam.getCrontab());
scheduleObj.setWarningType(warningType);
scheduleObj.setWarningGroupId(warningGroupId);
scheduleObj.setFailureStrategy(failureStrategy);
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(processInstancePriority);
scheduleObj.setWorkerGroup(workerGroup);
scheduleMapper.insert(scheduleObj);
/**
* updateProcessInstance receivers and cc by process definition id
*/
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
// return scheduler object with ID
result.put(Constants.DATA_LIST, scheduleMapper.selectById(scheduleObj.getId()));
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
}
/**
* updateProcessInstance schedule
*
* @param loginUser login user
* @param projectName project name
* @param id scheduler id
* @param scheduleExpression scheduler
* @param warningType warning type
* @param warningGroupId warning group id
* @param failureStrategy failure strategy
* @param workerGroup worker group
* @param processInstancePriority process instance priority
* @param scheduleStatus schedule status
* @return update result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> updateSchedule(User loginUser,
String projectName,
Integer id,
String scheduleExpression,
WarningType warningType,
int warningGroupId,
FailureStrategy failureStrategy,
ReleaseState scheduleStatus,
Priority processInstancePriority,
String workerGroup) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check schedule exists
Schedule schedule = scheduleMapper.selectById(id);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(schedule.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionId());
return result;
}
/**
* scheduling on-line status forbid modification
*/
if (checkValid(result, schedule.getReleaseState() == ReleaseState.ONLINE, Status.SCHEDULE_CRON_ONLINE_FORBID_UPDATE)) {
return result;
}
Date now = new Date();
// updateProcessInstance param
if (StringUtils.isNotEmpty(scheduleExpression)) {
ScheduleParam scheduleParam = JSONUtils.parseObject(scheduleExpression, ScheduleParam.class);
if (DateUtils.differSec(scheduleParam.getStartTime(), scheduleParam.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
schedule.setStartTime(scheduleParam.getStartTime());
schedule.setEndTime(scheduleParam.getEndTime());
if (!org.quartz.CronExpression.isValidExpression(scheduleParam.getCrontab())) {
putMsg(result, Status.SCHEDULE_CRON_CHECK_FAILED, scheduleParam.getCrontab());
return result;
}
schedule.setCrontab(scheduleParam.getCrontab());
}
if (warningType != null) {
schedule.setWarningType(warningType);
}
schedule.setWarningGroupId(warningGroupId);
if (failureStrategy != null) {
schedule.setFailureStrategy(failureStrategy);
}
if (scheduleStatus != null) {
schedule.setReleaseState(scheduleStatus);
}
schedule.setWorkerGroup(workerGroup);
schedule.setUpdateTime(now);
schedule.setProcessInstancePriority(processInstancePriority);
scheduleMapper.updateById(schedule);
/**
* updateProcessInstance recipients and cc by process definition ID
*/
processDefinition.setWarningGroupId(warningGroupId);
processDefinitionMapper.updateById(processDefinition);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* set schedule online or offline
*
* @param loginUser login user
* @param projectName project name
* @param id scheduler id
* @param scheduleStatus schedule status
* @return publish result code
*/
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> setScheduleState(User loginUser,
String projectName,
Integer id,
ReleaseState scheduleStatus) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
// check schedule exists
Schedule scheduleObj = scheduleMapper.selectById(id);
if (scheduleObj == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
return result;
}
// check schedule release state
if (scheduleObj.getReleaseState() == scheduleStatus) {
logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(scheduleObj.getProcessDefinitionId());
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionId());
return result;
}
if (scheduleStatus == ReleaseState.ONLINE) {
// check process definition release state
if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
processDefinition.getId(), processDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
return result;
}
// check sub process definition release state
List<Integer> subProcessDefineIds = new ArrayList<>();
processService.recurseFindSubProcessId(scheduleObj.getProcessDefinitionId(), subProcessDefineIds);
Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]);
if (!subProcessDefineIds.isEmpty()) {
List<ProcessDefinition> subProcessDefinitionList =
processDefinitionMapper.queryDefinitionListByIdList(idArray);
if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
/**
* if there is no online process, exit directly
*/
if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
logger.info("not release process definition id: {} , name : {}",
subProcessDefinition.getId(), subProcessDefinition.getName());
putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, subProcessDefinition.getId());
return result;
}
}
}
}
}
// check master server exists
List<Server> masterServers = monitorService.getServerListFromZK(true);
if (masterServers.isEmpty()) {
putMsg(result, Status.MASTER_NOT_EXISTS);
return result;
}
// set status
scheduleObj.setReleaseState(scheduleStatus);
scheduleMapper.updateById(scheduleObj);
try {
switch (scheduleStatus) {
case ONLINE:
logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
setSchedule(project.getId(), scheduleObj);
break;
case OFFLINE:
logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
deleteSchedule(project.getId(), id);
break;
default:
putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
return result;
}
} catch (Exception e) {
result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
throw new ServiceException(result.get(Constants.MSG).toString());
}
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query schedule
*
* @param loginUser login user
* @param projectName project name
* @param processDefineId process definition id
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @return schedule list page
*/
public Map<String, Object> querySchedule(User loginUser, String projectName, Integer processDefineId, String searchVal, Integer pageNo, Integer pageSize) {
HashMap<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
ProcessDefinition processDefinition = processService.findProcessDefineById(processDefineId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineId);
return result;
}
Page<Schedule> page = new Page<>(pageNo, pageSize);
IPage<Schedule> scheduleIPage = scheduleMapper.queryByProcessDefineIdPaging(
page, processDefineId, searchVal
);
PageInfo<Schedule> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotalCount((int) scheduleIPage.getTotal());
pageInfo.setLists(scheduleIPage.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* query schedule list
*
* @param loginUser login user
* @param projectName project name
* @return schedule list
*/
public Map<String, Object> queryScheduleList(User loginUser, String projectName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check project auth
boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
if (!hasProjectAndPerm) {
return result;
}
List<Schedule> schedules = scheduleMapper.querySchedulerListByProjectName(projectName);
result.put(Constants.DATA_LIST, schedules);
putMsg(result, Status.SUCCESS);
return result;
}
public void setSchedule(int projectId, Schedule schedule) {
int scheduleId = schedule.getId();
logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);
Date startDate = schedule.getStartTime();
Date endDate = schedule.getEndTime();
String jobName = QuartzExecutors.buildJobName(scheduleId);
String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule);
QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,
schedule.getCrontab(), dataMap);
}
/**
* delete schedule
*
* @param projectId project id
* @param scheduleId schedule id
* @throws RuntimeException runtime exception
*/
public void deleteSchedule(int projectId, int scheduleId) {
logger.info("delete schedules of project id:{}, schedule id:{}", projectId, scheduleId);
String jobName = QuartzExecutors.buildJobName(scheduleId);
String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);
if (!QuartzExecutors.getInstance().deleteJob(jobName, jobGroupName)) {
logger.warn("set offline failure:projectId:{},scheduleId:{}", projectId, scheduleId);
throw new ServiceException("set offline failure");
}
}
/**
* check valid
*
* @param result result
* @param bool bool
* @param status status
* @return check result code
*/
private boolean checkValid(Map<String, Object> result, boolean bool, Status status) {
// timeout is valid
if (bool) {
putMsg(result, status);
return true;
}
return false;
}
/**
* delete schedule by id
*
* @param loginUser login user
* @param projectName project name
* @param scheduleId scheule id
* @return delete result code
*/
public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
Schedule schedule = scheduleMapper.selectById(scheduleId);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
return result;
}
// Determine if the login user is the owner of the schedule
if (loginUser.getId() != schedule.getUserId()
&& loginUser.getUserType() != UserType.ADMIN_USER) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
// check schedule is already online
if (schedule.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
return result;
}
int delete = scheduleMapper.deleteById(scheduleId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
return result;
}
/**
* preview schedule
*
* @param loginUser login user
* @param projectName project name
* @param schedule schedule expression
* @return the next five fire time
*/
public Map<String, Object> previewSchedule(User loginUser, String projectName, String schedule) {
Map<String, Object> result = new HashMap<>();
CronExpression cronExpression;
ScheduleParam scheduleParam = JSONUtils.parseObject(schedule, ScheduleParam.class);
Date now = new Date();
Date startTime = now.after(scheduleParam.getStartTime()) ? now : scheduleParam.getStartTime();
Date endTime = scheduleParam.getEndTime();
try {
cronExpression = CronUtils.parse2CronExpression(scheduleParam.getCrontab());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
putMsg(result, Status.PARSE_TO_CRON_EXPRESSION_ERROR);
return result;
}
List<Date> selfFireDateList = CronUtils.getSelfFireDateList(startTime, endTime, cronExpression, Constants.PREVIEW_SCHEDULE_EXECUTE_COUNT);
result.put(Constants.DATA_LIST, selfFireDateList.stream().map(DateUtils::dateToString));
putMsg(result, Status.SUCCESS);
return result;
}
}

208
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task instance service impl
*/
@Service
public class TaskInstanceServiceImpl extends BaseService implements TaskInstanceService {
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
ProcessService processService;
@Autowired
TaskInstanceMapper taskInstanceMapper;
@Autowired
ProcessInstanceService processInstanceService;
@Autowired
UsersService usersService;
/**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
*
* @param loginUser login user
* @param projectName project name
* @param processInstanceId process instance id
* @param searchVal search value
* @param taskName task name
* @param stateType state type
* @param host host
* @param startDate start time
* @param endDate end time
* @param pageNo page number
* @param pageSize page size
* @return task list page
*/
public Map<String, Object> queryTaskListPaging(User loginUser, String projectName,
Integer processInstanceId, String processInstanceName, String taskName, String executorName, String startDate,
String endDate, String searchVal, ExecutionStatus stateType, String host,
Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
int[] statusArray = null;
if (stateType != null) {
statusArray = new int[]{stateType.ordinal()};
}
Date start = null;
Date end = null;
if (StringUtils.isNotEmpty(startDate)) {
start = DateUtils.getScheduleDate(startDate);
if (start == null) {
return generateInvalidParamRes(result, "startDate");
}
}
if (StringUtils.isNotEmpty(endDate)) {
end = DateUtils.getScheduleDate(endDate);
if (end == null) {
return generateInvalidParamRes(result, "endDate");
}
}
Page<TaskInstance> page = new Page<>(pageNo, pageSize);
PageInfo<Map<String, Object>> pageInfo = new PageInfo<>(pageNo, pageSize);
int executorId = usersService.getUserIdByName(executorName);
IPage<TaskInstance> taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging(
page, project.getId(), processInstanceId, processInstanceName, searchVal, taskName, executorId, statusArray, host, start, end
);
Set<String> exclusionSet = new HashSet<>();
exclusionSet.add(Constants.CLASS);
exclusionSet.add("taskJson");
List<TaskInstance> taskInstanceList = taskInstanceIPage.getRecords();
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setDuration(DateUtils.format2Duration(taskInstance.getStartTime(), taskInstance.getEndTime()));
User executor = usersService.queryUser(taskInstance.getExecutorId());
if (null != executor) {
taskInstance.setExecutorName(executor.getUserName());
}
}
pageInfo.setTotalCount((int) taskInstanceIPage.getTotal());
pageInfo.setLists(CollectionUtils.getListByExclusion(taskInstanceIPage.getRecords(), exclusionSet));
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* change one task instance's state from failure to forced success
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}
// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}
// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}
return result;
}
/***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map
* @param params invalid params name
* @return update result map
*/
private Map<String, Object> generateInvalidParamRes(Map<String, Object> result, String params) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, MessageFormat.format(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getMsg(), params));
return result;
}
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -87,7 +87,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
int queueId, int queueId,
String desc) throws Exception { String desc) throws Exception {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
if (isNotAdmin(loginUser, result)) { if (isNotAdmin(loginUser, result)) {
return result; return result;
@ -140,7 +140,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/ */
public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { public Map<String, Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) { if (isNotAdmin(loginUser, result)) {
return result; return result;
} }
@ -171,7 +171,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
public Map<String, Object> updateTenant(User loginUser, int id, String tenantCode, int queueId, public Map<String, Object> updateTenant(User loginUser, int id, String tenantCode, int queueId,
String desc) throws Exception { String desc) throws Exception {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
if (isNotAdmin(loginUser, result)) { if (isNotAdmin(loginUser, result)) {
@ -233,7 +233,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception { public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) { if (isNotAdmin(loginUser, result)) {
return result; return result;
@ -291,7 +291,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/ */
public Map<String, Object> queryTenantList(String tenantCode) { public Map<String, Object> queryTenantList(String tenantCode) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode); List<Tenant> resourceList = tenantMapper.queryByTenantCode(tenantCode);
if (CollectionUtils.isNotEmpty(resourceList)) { if (CollectionUtils.isNotEmpty(resourceList)) {
@ -311,7 +311,7 @@ public class TenantServiceImpl extends BaseService implements TenantService {
*/ */
public Map<String, Object> queryTenantList(User loginUser) { public Map<String, Object> queryTenantList(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
List<Tenant> resourceList = tenantMapper.selectList(null); List<Tenant> resourceList = tenantMapper.selectList(null);
result.put(Constants.DATA_LIST, resourceList); result.put(Constants.DATA_LIST, resourceList);

25
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -132,8 +132,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
String phone, String phone,
String queue, String queue,
int state) throws IOException { int state) throws IOException {
Map<String, Object> result = new HashMap<>();
Map<String, Object> result = new HashMap<>(5);
//check all user params //check all user params
String msg = this.checkUserParams(userName, userPassword, email, phone); String msg = this.checkUserParams(userName, userPassword, email, phone);
@ -295,7 +294,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list page * @return user list page
*/ */
public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { public Map<String, Object> queryUserList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;
@ -337,7 +336,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
String phone, String phone,
String queue, String queue,
int state) throws IOException { int state) throws IOException {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) { if (check(result, !hasPerm(loginUser, userId), Status.USER_NO_OPERATION_PERM)) {
@ -461,7 +460,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @throws Exception exception when operate hdfs * @throws Exception exception when operate hdfs
*/ */
public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException { public Map<String, Object> deleteUserById(User loginUser, int id) throws IOException {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (!isAdmin(loginUser)) { if (!isAdmin(loginUser)) {
putMsg(result, Status.USER_NO_OPERATION_PERM, id); putMsg(result, Status.USER_NO_OPERATION_PERM, id);
@ -501,7 +500,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) { public Map<String, Object> grantProject(User loginUser, int userId, String projectIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
//only admin can operate //only admin can operate
@ -550,7 +549,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) { public Map<String, Object> grantResources(User loginUser, int userId, String resourceIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;
@ -645,7 +644,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) { public Map<String, Object> grantUDFFunction(User loginUser, int userId, String udfIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
@ -691,7 +690,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) { public Map<String, Object> grantDataSource(User loginUser, int userId, String datasourceIds) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false); result.put(Constants.STATUS, false);
//only admin can operate //only admin can operate
@ -771,7 +770,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list * @return user list
*/ */
public Map<String, Object> queryAllGeneralUsers(User loginUser) { public Map<String, Object> queryAllGeneralUsers(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;
@ -791,7 +790,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return user list * @return user list
*/ */
public Map<String, Object> queryUserList(User loginUser) { public Map<String, Object> queryUserList(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;
@ -832,7 +831,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
*/ */
public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) { public Map<String, Object> unauthorizedUser(User loginUser, Integer alertgroupId) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;
@ -867,7 +866,7 @@ public class UsersServiceImpl extends BaseService implements UsersService {
* @return authorized result code * @return authorized result code
*/ */
public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) { public Map<String, Object> authorizedUser(User loginUser, Integer alertgroupId) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
//only admin can operate //only admin can operate
if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) { if (check(result, !isAdmin(loginUser), Status.USER_NO_OPERATION_PERM)) {
return result; return result;

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java

@ -44,4 +44,13 @@ public class RegexUtils {
Matcher isNum = pattern.matcher(str); Matcher isNum = pattern.matcher(str);
return isNum.matches(); return isNum.matches();
} }
public static String escapeNRT(String str) {
// Logging should not be vulnerable to injection attacks: Replace pattern-breaking characters
if (str != null && !str.isEmpty()) {
return str.replaceAll("[\n|\r|\t]", "_");
}
return null;
}
} }

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
@ -68,7 +69,7 @@ public class ResourcesServiceTest {
private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class); private static final Logger logger = LoggerFactory.getLogger(ResourcesServiceTest.class);
@InjectMocks @InjectMocks
private ResourcesService resourcesService; private ResourcesServiceImpl resourcesService;
@Mock @Mock
private ResourceMapper resourcesMapper; private ResourceMapper resourcesMapper;
@Mock @Mock

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.SchedulerServiceImpl;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
@ -53,7 +54,7 @@ public class SchedulerServiceTest {
@InjectMocks @InjectMocks
private SchedulerService schedulerService; private SchedulerServiceImpl schedulerService;
@Mock @Mock
private MonitorService monitorService; private MonitorService monitorService;

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.ApiApplicationServer; import org.apache.dolphinscheduler.api.ApiApplicationServer;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskInstanceServiceImpl;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
@ -62,7 +63,7 @@ public class TaskInstanceServiceTest {
private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class); private static final Logger logger = LoggerFactory.getLogger(TaskInstanceServiceTest.class);
@InjectMocks @InjectMocks
private TaskInstanceService taskInstanceService; private TaskInstanceServiceImpl taskInstanceService;
@Mock @Mock
ProjectMapper projectMapper; ProjectMapper projectMapper;

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -50,12 +50,8 @@ public class StringUtils {
return !isBlank(s); return !isBlank(s);
} }
public static String replaceNRTtoUnderline(String src) { public static String replaceNRTtoUnderline(String str) {
if (isBlank(src)) { return isBlank(str) ? str : str.replaceAll("[\n|\r|\t]", "_");
return src;
} else {
return src.replaceAll("[\n|\r|\t]", "_");
}
} }
public static String trim(String str) { public static String trim(String str) {

Loading…
Cancel
Save