Browse Source

[Feature][dev-2.0.2-prepare] cherry-pick pull requests from dev to 2.0.2-prepare #7357 (#7358)

* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 05aef27 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 8ebe060 and handle conflicts

* cherry-pick 1f18444 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>

* add default constructor (#6780)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* to #7108 (#7109)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>
2.0.7-release
zwZjut 3 years ago committed by GitHub
parent
commit
255670fda9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
  2. 2
      docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
  3. 2
      docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
  4. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java
  5. 58
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
  6. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  7. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  8. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  9. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  10. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  11. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
  12. 9
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  13. 173
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
  14. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  15. 79
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  16. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
  17. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  18. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  19. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java
  20. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
  21. 25
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
  22. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  23. 98
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
  24. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  25. 13
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  26. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
  27. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  28. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
  29. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  30. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  31. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  32. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  33. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  34. 3
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  35. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
  36. 2
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  37. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

2
docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml

@ -68,7 +68,7 @@ spec:
- name: TZ
value: {{ .Values.timezone }}
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:

2
docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml

@ -65,7 +65,7 @@ spec:
- name: TZ
value: {{ .Values.timezone }}
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:

2
docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml

@ -69,7 +69,7 @@ spec:
- name: ALERT_LISTEN_HOST
value: {{ include "dolphinscheduler.fullname" . }}-alert
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java

@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.aspect;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@ -46,6 +46,8 @@ import org.springframework.web.context.request.ServletRequestAttributes;
public class AccessLogAspect {
private static final Logger logger = LoggerFactory.getLogger(AccessLogAspect.class);
private static final String TRACE_ID = "traceId";
@Pointcut("@annotation(org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation)")
public void logPointCut(){
// Do nothing because of it's a pointcut
@ -60,21 +62,24 @@ public class AccessLogAspect {
Method method = sign.getMethod();
AccessLogAnnotation annotation = method.getAnnotation(AccessLogAnnotation.class);
String tranceId = UUID.randomUUID().toString();
String traceId = UUID.randomUUID().toString();
// log request
if (!annotation.ignoreRequest()) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
String traceIdFromHeader = request.getHeader(TRACE_ID);
if (!StringUtils.isEmpty(traceIdFromHeader)) {
traceId = traceIdFromHeader;
}
// handle login info
String userName = parseLoginInfo(request);
// handle args
String argsString = parseArgs(proceedingJoinPoint, annotation);
logger.info("REQUEST TRANCE_ID:{}, LOGIN_USER:{}, URI:{}, METHOD:{}, HANDLER:{}, ARGS:{}",
tranceId,
logger.info("REQUEST TRACE_ID:{}, LOGIN_USER:{}, URI:{}, METHOD:{}, HANDLER:{}, ARGS:{}",
traceId,
userName,
request.getRequestURI(),
request.getMethod(),
@ -88,7 +93,7 @@ public class AccessLogAspect {
// log response
if (!annotation.ignoreResponse()) {
logger.info("RESPONSE TRANCE_ID:{}, BODY:{}, REQUEST DURATION:{} milliseconds", tranceId, ob, (System.currentTimeMillis() - startTime));
logger.info("RESPONSE TRACE_ID:{}, BODY:{}, REQUEST DURATION:{} milliseconds", traceId, ob, (System.currentTimeMillis() - startTime));
}
return ob;

58
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java

@ -32,6 +32,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@ -43,6 +44,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
/**
@ -82,7 +84,6 @@ public class LoggerController extends BaseController {
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
/**
* download log file
*
@ -107,4 +108,59 @@ public class LoggerController extends BaseController {
.body(logBytes);
}
/**
* query task log in specified project
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @param skipNum skip number
* @param limit limit
* @return task log content
*/
@ApiOperation(value = "queryLogInSpecifiedProject", notes = "QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit));
}
/**
* download log file
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @return log file content
*/
@ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes = "DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/download-log")
@ResponseBody
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode, taskInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"")
.body(logBytes);
}
}

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -515,7 +515,7 @@ public class ProcessDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("code") long code,
@RequestParam("limit") Integer limit) {
Map<String, Object> result = processDefinitionService.viewTree(code, limit);
Map<String, Object> result = processDefinitionService.viewTree(projectCode, code, limit);
return returnDataList(result);
}

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -344,8 +344,9 @@ public class ProcessInstanceController extends BaseController {
@ApiException(QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR)
@AccessLogAnnotation
public Result viewVariables(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
Map<String, Object> result = processInstanceService.viewVariables(id);
Map<String, Object> result = processInstanceService.viewVariables(projectCode, id);
return returnDataList(result);
}
@ -368,7 +369,7 @@ public class ProcessInstanceController extends BaseController {
public Result viewTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) throws Exception {
Map<String, Object> result = processInstanceService.viewGantt(id);
Map<String, Object> result = processInstanceService.viewGantt(projectCode, id);
return returnDataList(result);
}

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -68,11 +68,12 @@ public interface ExecutorService {
/**
* check whether the process definition can be executed
*
* @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode);
Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode);
/**
* do action to process instancepause, stop, repeat, recover from pause, recover from stop

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
/**
* logger service
@ -43,4 +46,25 @@ public interface LoggerService {
*/
byte[] getLogBytes(int taskInstId);
/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);
/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
byte[] getLogBytes(User loginUser, long projectCode, int taskInstId);
}

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -282,11 +282,12 @@ public interface ProcessDefinitionService {
/**
* Encapsulates the TreeView structure
*
* @param projectCode project code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
Map<String, Object> viewTree(long code, Integer limit);
Map<String, Object> viewTree(long projectCode, long code, Integer limit);
/**
* switch the defined process definition version

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java

@ -21,7 +21,6 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
@ -165,19 +164,21 @@ public interface ProcessInstanceService {
/**
* view process instance variables
*
* @param projectCode project code
* @param processInstanceId process instance id
* @return variables data
*/
Map<String, Object> viewVariables(Integer processInstanceId);
Map<String, Object> viewVariables(long projectCode, Integer processInstanceId);
/**
* encapsulation gantt structure
*
* @param projectCode project code
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
*/
Map<String, Object> viewGantt(Integer processInstanceId) throws Exception;
Map<String, Object> viewGantt(long projectCode, Integer processInstanceId) throws Exception;
/**
* query process instance by processDefinitionCode and stateArray

9
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -124,7 +124,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
// check process define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
result = checkProcessDefinitionValid(processDefinition, processDefinitionCode);
result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -179,14 +179,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
/**
* check whether the process definition can be executed
*
* @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
@Override
public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode) {
public Map<String, Object> checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode) {
Map<String, Object> result = new HashMap<>();
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
@ -230,7 +231,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) {
result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionCode());
result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}

173
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java

@ -17,32 +17,43 @@
package org.apache.dolphinscheduler.api.service.impl;
import com.google.common.primitives.Bytes;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import com.google.common.primitives.Bytes;
/**
* logger service impl
*/
@Service
public class LoggerServiceImpl implements LoggerService {
public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService {
private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class);
@ -53,6 +64,15 @@ public class LoggerServiceImpl implements LoggerService {
private LogClientService logClient;
@Autowired
ProjectMapper projectMapper;
@Autowired
ProjectService projectService;
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
@ -84,10 +104,117 @@ public class LoggerServiceImpl implements LoggerService {
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance,skipLineNum,limit);
result.setData(log);
return result;
}
String host = getHost(taskInstance.getHost());
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
/**
* get log size
*
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
return getLogBytes(taskInstance);
}
/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
return result;
}
String log = queryLog(task, skipLineNum, limit);
result.put(Constants.DATA_LIST, log);
return result;
}
/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
throw new ServiceException("user has no permission");
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
throw new ServiceException("task instance does not exist in project");
}
return getLogBytes(task);
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
/**
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
String host = getHost(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
Constants.RPC_PORT);
@ -104,23 +231,16 @@ public class LoggerServiceImpl implements LoggerService {
log.append(logClient
.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit));
result.setData(log.toString());
return result;
return log.toString();
}
/**
* get log size
* get log bytes
*
* @param taskInstId task instance id
* @param taskInstance task instance
* @return log byte array
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
private byte[] getLogBytes(TaskInstance taskInstance) {
String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
@ -129,17 +249,4 @@ public class LoggerServiceImpl implements LoggerService {
return Bytes.concat(head,
logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()));
}
/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
}

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -445,7 +445,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId());
@ -538,7 +538,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@ -640,7 +640,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@ -718,7 +718,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@ -773,7 +773,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
Set<Long> defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet);
List<DagDataSchedule> dagDataSchedules = processDefinitionList.stream().map(this::exportProcessDagData).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionList)) {
return;
}
// check processDefinition exist in project
List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream().filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList());
List<DagDataSchedule> dagDataSchedules = processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
downloadProcessDefinitionFile(response, dagDataSchedules);
}
@ -1055,7 +1060,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
@ -1091,8 +1096,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
// check processDefinition exist in project
List<ProcessDefinition> processDefinitionListInProject = processDefinitionList.stream().
filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
Map<Long, List<TaskDefinition>> taskNodeMap = new HashMap<>();
for (ProcessDefinition processDefinition : processDefinitionList) {
for (ProcessDefinition processDefinition : processDefinitionListInProject) {
DagData dagData = processService.genDagData(processDefinition);
taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList());
}
@ -1129,15 +1141,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* Encapsulates the TreeView structure
*
* @param projectCode project code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
@Override
public Map<String, Object> viewTree(long code, Integer limit) {
public Map<String, Object> viewTree(long projectCode, long code, Integer limit) {
Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (null == processDefinition) {
if (null == processDefinition || projectCode != processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
@ -1414,7 +1427,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (Objects.isNull(processDefinition)) {
if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code);
return result;
}
@ -1478,7 +1491,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code);
IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code, projectCode);
List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(processDefinitionLogs);
@ -1509,7 +1522,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
if (processDefinition.getVersion() == version) {
@ -1693,7 +1706,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
// check process definition exists
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}

79
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -17,6 +17,14 @@
package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import static org.apache.dolphinscheduler.common.Constants.WARNING_GROUP_NAME;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
@ -43,8 +51,25 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.*;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections.CollectionUtils;
@ -72,8 +97,6 @@ import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* process instance service impl
*/
@ -125,6 +148,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
@Autowired
private TenantMapper tenantMapper;
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
/**
* return top n SUCCESS process instance order by running time which started between startTime and endTime
*/
@ -160,7 +186,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS, projectCode);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
@ -187,7 +213,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
@ -295,12 +321,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId);
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
return result;
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (null != processDefinition && processDefinition.getWarningGroupId() != 0) {
//check if exist
@ -386,6 +416,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
if (!taskInstance.isSubProcess()) {
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
return result;
@ -437,6 +474,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance exists in project
ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
@ -590,6 +633,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result;
}
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
try {
processService.removeTaskLogFile(processInstanceId);
} catch (Exception e) {
@ -614,11 +663,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* view process instance variables
*
* @param projectCode project code
* @param processInstanceId process instance id
* @return variables data
*/
@Override
public Map<String, Object> viewVariables(Integer processInstanceId) {
public Map<String, Object> viewVariables(long projectCode, Integer processInstanceId) {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
@ -627,6 +677,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
throw new RuntimeException("workflow instance is null");
}
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
@ -686,12 +742,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
/**
* encapsulation gantt structure
*
* @param projectCode project code
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
*/
@Override
public Map<String, Object> viewGantt(Integer processInstanceId) throws Exception {
public Map<String, Object> viewGantt(long projectCode, Integer processInstanceId) throws Exception {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId);
@ -704,6 +761,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
);
if (processDefinition != null && projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag = processService.genDagGraph(processDefinition);
//topological sort

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java

@ -146,7 +146,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
// check work flow define release state
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
result = executorService.checkProcessDefinitionValid(processDefinition, processDefineCode);
result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@ -247,7 +247,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode());
return result;
}
@ -296,7 +296,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode());
return result;
}
@ -396,7 +396,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
return result;
}
@ -606,7 +606,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -191,6 +191,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
if (taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
@ -342,7 +346,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
@ -377,7 +381,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
PageInfo<TaskDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<TaskDefinitionLog> page = new Page<>(pageNo, pageSize);
IPage<TaskDefinitionLog> taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode);
IPage<TaskDefinitionLog> taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode);
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(taskDefinitionLogs);
@ -424,7 +428,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
} else {
result.put(Constants.DATA_LIST, taskDefinition);
@ -520,7 +524,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code);
if (taskDefinition == null) {
if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
return result;
}

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -29,9 +29,11 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -71,6 +73,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
@Autowired
UsersService usersService;
@Autowired
TaskDefinitionMapper taskDefinitionMapper;
/**
* query task list by project, process instance, task name, task start time, task end time, task status, keyword paging
*
@ -171,6 +176,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId);
return result;
}
// check whether the task instance state type is failure or cancel
if (!task.getState().typeIsFailure() && !task.getState().typeIsCancel()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java

@ -51,6 +51,10 @@ public class PageInfo<T> {
*/
private Integer pageNo;
public PageInfo() {
}
public PageInfo(Integer currentPage, Integer pageSize) {
if (currentPage == null) {
currentPage = 1;

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java

@ -339,7 +339,7 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
Mockito.when(processDefinitionService.viewTree(processId, limit)).thenReturn(result);
Mockito.when(processDefinitionService.viewTree(projectCode, processId, limit)).thenReturn(result);
Result response = processDefinitionController.viewTree(user, projectCode, processId, limit);
Assert.assertTrue(response != null && response.isSuccess());

25
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java

@ -23,12 +23,19 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
@ -39,6 +46,9 @@ import org.springframework.util.MultiValueMap;
*/
public class ProcessInstanceControllerTest extends AbstractControllerTest {
@MockBean(name = "processInstanceService")
private ProcessInstanceService processInstanceService;
@Test
public void testQueryProcessInstanceList() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
@ -146,13 +156,14 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest {
@Test
public void testViewVariables() throws Exception {
MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/instance/view-variables", "cxc_1113")
.header(SESSION_ID, sessionId)
.param("processInstanceId", "1204"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Map<String, Object> mockResult = new HashMap<>();
mockResult.put(Constants.STATUS, Status.SUCCESS);
PowerMockito.when(processInstanceService.viewVariables(1113L,123)).thenReturn(mockResult);
MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/process-instances/{id}/view-variables", "1113", "123")
.header(SESSION_ID, sessionId))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertNotNull(result);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -122,6 +122,7 @@ public class ExecutorServiceTest {
processDefinition.setUserId(userId);
processDefinition.setVersion(1);
processDefinition.setCode(1L);
processDefinition.setProjectCode(projectCode);
// processInstance
processInstance.setId(processInstanceId);

98
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java

@ -20,9 +20,20 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -51,6 +62,15 @@ public class LoggerServiceTest {
@Mock
private ProcessService processService;
@Mock
private ProjectMapper projectMapper;
@Mock
private ProjectService projectService;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Before
public void init() {
this.loggerService.init();
@ -113,9 +133,87 @@ public class LoggerServiceTest {
}
@Test
public void testQueryLogInSpecifiedProject() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void testGetLogBytesInSpecifiedProject() {
long projectCode = 1L;
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
Project project = getProject(projectCode);
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
TaskInstance taskInstance = new TaskInstance();
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
taskDefinition.setCode(1L);
//SUCCESS
taskInstance.setTaskCode(1L);
taskInstance.setId(1);
taskInstance.setHost("127.0.0.1:8080");
taskInstance.setLogPath("/temp/log");
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
loggerService.getLogBytes(loginUser, projectCode, 1);
}
@After
public void close() {
this.loggerService.close();
}
/**
* get mock Project
*
* @param projectCode projectCode
* @return Project
*/
private Project getProject(long projectCode) {
Project project = new Project();
project.setCode(projectCode);
project.setId(1);
project.setName("test");
project.setUserId(1);
return project;
}
private void putMsg(Map<String, Object> result, Status status, Object... statusParams) {
result.put(Constants.STATUS, status);
if (statusParams != null && statusParams.length > 0) {
result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams));
} else {
result.put(Constants.MSG, status.getMsg());
}
}
}

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -573,17 +573,17 @@ public class ProcessDefinitionServiceTest {
public void testViewTree() {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(46, 10);
Map<String, Object> processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
//task instance not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes = processDefinitionService.viewTree(46, 10);
Map<String, Object> taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}
@ -592,7 +592,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(46, 10);
Map<String, Object> taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS));
}

13
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -111,6 +113,8 @@ public class ProcessInstanceServiceTest {
@Mock
TenantMapper tenantMapper;
@Mock
TaskDefinitionMapper taskDefinitionMapper;
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
@ -255,6 +259,7 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance();
putMsg(result, Status.SUCCESS, projectCode);
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setProjectCode(projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
@ -347,6 +352,9 @@ public class ProcessInstanceServiceTest {
taskInstance.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Map<String, Object> notSubprocessRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, notSubprocessRes.get(Constants.STATUS));
@ -410,6 +418,7 @@ public class ProcessInstanceServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setId(1);
processDefinition.setUserId(1);
processDefinition.setProjectCode(projectCode);
Tenant tenant = getTenant();
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
@ -497,7 +506,7 @@ public class ProcessInstanceServiceTest {
processInstance.setScheduleTime(new Date());
processInstance.setGlobalParams("");
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
Map<String, Object> successRes = processInstanceService.viewVariables(1);
Map<String, Object> successRes = processInstanceService.viewVariables(1L,1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@ -522,7 +531,7 @@ public class ProcessInstanceServiceTest {
when(processService.genDagGraph(Mockito.any(ProcessDefinition.class)))
.thenReturn(graph);
Map<String, Object> successRes = processInstanceService.viewGantt(1);
Map<String, Object> successRes = processInstanceService.viewGantt(0L, 1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java

@ -103,6 +103,7 @@ public class SchedulerServiceTest {
Project project = getProject(projectName, projectCode);
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setProjectCode(projectCode);
Schedule schedule = new Schedule();
schedule.setId(1);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -317,6 +317,7 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinition.getVersion())).thenReturn(taskDefinitionLog);
Map<String, Object> offlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS));

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java

@ -32,9 +32,11 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -81,6 +83,9 @@ public class TaskInstanceServiceTest {
@Mock
UsersService usersService;
@Mock
TaskDefinitionMapper taskDefinitionMapper;
@Test
public void queryTaskListPaging() {
long projectCode = 1L;
@ -249,6 +254,9 @@ public class TaskInstanceServiceTest {
// test task not found
when(projectService.checkProjectAndAuth(user, project, projectCode)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setProjectCode(projectCode);
when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition);
Map<String, Object> taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -73,9 +73,10 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
*
* @param page pagination info
* @param code process definition code
* @param projectCode project code
* @return the paging process definition version list
*/
IPage<ProcessDefinitionLog> queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page, @Param("code") long code);
IPage<ProcessDefinitionLog> queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page, @Param("code") long code, @Param("projectCode") long projectCode);
/**
* delete the certain process definition version by process definition id and version number

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -213,14 +213,19 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
/**
* query top n process instance order by running duration
*
* @param size size
* @param startTime start time
* @param startTime end time
* @param status process instance status
* @param projectCode project code
* @return ProcessInstance list
*/
List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("status") ExecutionStatus status);
@Param("status") ExecutionStatus status,
@Param("projectCode") long projectCode);
/**
* query process instance by processDefinitionCode and stateArray

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -78,8 +78,9 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* query the paging task definition version list by pagination info
*
* @param page pagination info
* @param projectCode project code
* @param code process definition code
* @return the paging task definition version list
*/
IPage<TaskDefinitionLog> queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code") long code);
IPage<TaskDefinitionLog> queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code") long code, @Param("projectCode") long projectCode);
}

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -70,6 +70,9 @@
<include refid="baseSql"/>
from t_ds_process_definition_log
where code = #{code}
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
order by version desc
</select>

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -51,6 +51,9 @@
<include refid="baseSql"/>
from t_ds_process_instance
where state = #{status}
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
and start_time between
#{startTime} and #{endTime}
order by end_time-start_time desc

3
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -73,6 +73,9 @@
<include refid="baseSql"/>
from t_ds_task_definition_log
where code = #{code}
<if test="projectCode != 0">
and project_code = #{projectCode}
</if>
order by version desc
</select>
</mapper>

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java

@ -159,7 +159,7 @@ public class ProcessDefinitionLogMapperTest {
public void testQueryProcessDefinitionVersionsPaging() {
insertOne();
Page<ProcessDefinitionLog> page = new Page(1, 3);
IPage<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L);
IPage<ProcessDefinitionLog> processDefinitionLogs = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L,1L);
Assert.assertNotEquals(processDefinitionLogs.getTotal(), 0);
}

2
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -393,7 +393,7 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance3 = insertOne(startTime3, endTime3);
Date start = new Date(2020, 1, 1, 1, 1, 1);
Date end = new Date(2021, 1, 1, 1, 1, 1);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS);
List<ProcessInstance> processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS,0L);
Assert.assertEquals(2, processInstances.size());
Assert.assertTrue(isSortedByDuration(processInstances));
for (ProcessInstance processInstance : processInstances) {

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -241,7 +241,7 @@ public class WorkflowExecuteThread implements Runnable {
}
private void handleEvents() {
while (this.stateEvents.size() > 0) {
while (!this.stateEvents.isEmpty()) {
try {
StateEvent stateEvent = this.stateEvents.peek();

Loading…
Cancel
Save