diff --git a/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml b/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml index 65b5dea810..b2838dff47 100644 --- a/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml +++ b/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml @@ -68,7 +68,7 @@ spec: - name: TZ value: {{ .Values.timezone }} {{- include "dolphinscheduler.database.env_vars" . | nindent 12 }} - {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }} + {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }} {{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }} envFrom: - configMapRef: diff --git a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml index 735d3cd08c..686d260958 100644 --- a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml +++ b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml @@ -65,7 +65,7 @@ spec: - name: TZ value: {{ .Values.timezone }} {{- include "dolphinscheduler.database.env_vars" . | nindent 12 }} - {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }} + {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }} {{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }} envFrom: - configMapRef: diff --git a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml index 60b67538be..392ae0773b 100644 --- a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml +++ b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml @@ -69,7 +69,7 @@ spec: - name: ALERT_LISTEN_HOST value: {{ include "dolphinscheduler.fullname" . }}-alert {{- include "dolphinscheduler.database.env_vars" . | nindent 12 }} - {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }} + {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }} {{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }} envFrom: - configMapRef: diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java index a34d041242..14d75c7f74 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java @@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.aspect; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.lang.reflect.Method; import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -46,6 +46,8 @@ import org.springframework.web.context.request.ServletRequestAttributes; public class AccessLogAspect { private static final Logger logger = LoggerFactory.getLogger(AccessLogAspect.class); + private static final String TRACE_ID = "traceId"; + @Pointcut("@annotation(org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation)") public void logPointCut(){ // Do nothing because of it's a pointcut @@ -60,21 +62,24 @@ public class AccessLogAspect { Method method = sign.getMethod(); AccessLogAnnotation annotation = method.getAnnotation(AccessLogAnnotation.class); - String tranceId = UUID.randomUUID().toString(); + String traceId = UUID.randomUUID().toString(); // log request if (!annotation.ignoreRequest()) { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if (attributes != null) { HttpServletRequest request = attributes.getRequest(); - + String traceIdFromHeader = request.getHeader(TRACE_ID); + if (!StringUtils.isEmpty(traceIdFromHeader)) { + traceId = traceIdFromHeader; + } // handle login info String userName = parseLoginInfo(request); // handle args String argsString = parseArgs(proceedingJoinPoint, annotation); - logger.info("REQUEST TRANCE_ID:{}, LOGIN_USER:{}, URI:{}, METHOD:{}, HANDLER:{}, ARGS:{}", - tranceId, + logger.info("REQUEST TRACE_ID:{}, LOGIN_USER:{}, URI:{}, METHOD:{}, HANDLER:{}, ARGS:{}", + traceId, userName, request.getRequestURI(), request.getMethod(), @@ -88,7 +93,7 @@ public class AccessLogAspect { // log response if (!annotation.ignoreResponse()) { - logger.info("RESPONSE TRANCE_ID:{}, BODY:{}, REQUEST DURATION:{} milliseconds", tranceId, ob, (System.currentTimeMillis() - startTime)); + logger.info("RESPONSE TRACE_ID:{}, BODY:{}, REQUEST DURATION:{} milliseconds", traceId, ob, (System.currentTimeMillis() - startTime)); } return ob; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java index 3800264f6b..88c715fab6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java @@ -32,6 +32,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -43,6 +44,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; /** @@ -82,7 +84,6 @@ public class LoggerController extends BaseController { return loggerService.queryLog(taskInstanceId, skipNum, limit); } - /** * download log file * @@ -107,4 +108,59 @@ public class LoggerController extends BaseController { .body(logBytes); } + /** + * query task log in specified project + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId task instance id + * @param skipNum skip number + * @param limit limit + * @return task log content + */ + @ApiOperation(value = "queryLogInSpecifiedProject", notes = "QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/{projectCode}/detail") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit) { + return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit)); + } + + /** + * download log file + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstanceId task instance id + * @return log file content + */ + @ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes = "DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"), + @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100") + }) + @GetMapping(value = "/{projectCode}/download-log") + @ResponseBody + @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @RequestParam(value = "taskInstanceId") int taskInstanceId) { + byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode, taskInstanceId); + return ResponseEntity + .ok() + .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"") + .body(logBytes); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 3134a48688..1d7b485340 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -515,7 +515,7 @@ public class ProcessDefinitionController extends BaseController { @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @PathVariable("code") long code, @RequestParam("limit") Integer limit) { - Map result = processDefinitionService.viewTree(code, limit); + Map result = processDefinitionService.viewTree(projectCode, code, limit); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java index d7ec4056dd..b69320c227 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java @@ -344,8 +344,9 @@ public class ProcessInstanceController extends BaseController { @ApiException(QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR) @AccessLogAnnotation public Result viewVariables(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @PathVariable("id") Integer id) { - Map result = processInstanceService.viewVariables(id); + Map result = processInstanceService.viewVariables(projectCode, id); return returnDataList(result); } @@ -368,7 +369,7 @@ public class ProcessInstanceController extends BaseController { public Result viewTree(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @PathVariable("id") Integer id) throws Exception { - Map result = processInstanceService.viewGantt(id); + Map result = processInstanceService.viewGantt(projectCode, id); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 72d1892371..2fa065bca2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -68,11 +68,12 @@ public interface ExecutorService { /** * check whether the process definition can be executed * + * @param projectCode project code * @param processDefinition process definition * @param processDefineCode process definition code * @return check result code */ - Map checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode); + Map checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode); /** * do action to process instanceļ¼špause, stop, repeat, recover from pause, recover from stop diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index ef30a40303..b252522c7c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -18,6 +18,9 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.User; + +import java.util.Map; /** * logger service @@ -43,4 +46,25 @@ public interface LoggerService { */ byte[] getLogBytes(int taskInstId); + /** + * query log + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + Map queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit); + + /** + * get log bytes + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @return log byte array + */ + byte[] getLogBytes(User loginUser, long projectCode, int taskInstId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index c29e65d8c9..755d697026 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -282,11 +282,12 @@ public interface ProcessDefinitionService { /** * Encapsulates the TreeView structure * + * @param projectCode project code * @param code process definition code * @param limit limit * @return tree view json data */ - Map viewTree(long code, Integer limit); + Map viewTree(long projectCode, long code, Integer limit); /** * switch the defined process definition version diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 3dbf46d8b8..073e1f3818 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -21,7 +21,6 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; @@ -165,19 +164,21 @@ public interface ProcessInstanceService { /** * view process instance variables * + * @param projectCode project code * @param processInstanceId process instance id * @return variables data */ - Map viewVariables(Integer processInstanceId); + Map viewVariables(long projectCode, Integer processInstanceId); /** * encapsulation gantt structure * + * @param projectCode project code * @param processInstanceId process instance id * @return gantt tree data * @throws Exception exception when json parse */ - Map viewGantt(Integer processInstanceId) throws Exception; + Map viewGantt(long projectCode, Integer processInstanceId) throws Exception; /** * query process instance by processDefinitionCode and stateArray diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 6a4f179096..18eba02dc7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -124,7 +124,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ // check process define release state ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); - result = checkProcessDefinitionValid(processDefinition, processDefinitionCode); + result = checkProcessDefinitionValid(projectCode, processDefinition, processDefinitionCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -179,14 +179,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ /** * check whether the process definition can be executed * + * @param projectCode project code * @param processDefinition process definition * @param processDefineCode process definition code * @return check result code */ @Override - public Map checkProcessDefinitionValid(ProcessDefinition processDefinition, long processDefineCode) { + public Map checkProcessDefinitionValid(long projectCode, ProcessDefinition processDefinition, long processDefineCode) { Map result = new HashMap<>(); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { // check process definition exists putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode); } else if (processDefinition.getReleaseState() != ReleaseState.ONLINE) { @@ -230,7 +231,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); if (executeType != ExecuteType.STOP && executeType != ExecuteType.PAUSE) { - result = checkProcessDefinitionValid(processDefinition, processInstance.getProcessDefinitionCode()); + result = checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode()); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index c9965fec02..88c5a94133 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -17,32 +17,43 @@ package org.apache.dolphinscheduler.api.service.impl; -import com.google.common.primitives.Bytes; -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.LoggerService; +import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.apache.commons.lang.StringUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.nio.charset.StandardCharsets; -import java.util.Objects; +import com.google.common.primitives.Bytes; /** * logger service impl */ @Service -public class LoggerServiceImpl implements LoggerService { +public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService { private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class); @@ -53,6 +64,15 @@ public class LoggerServiceImpl implements LoggerService { private LogClientService logClient; + @Autowired + ProjectMapper projectMapper; + + @Autowired + ProjectService projectService; + + @Autowired + TaskDefinitionMapper taskDefinitionMapper; + @PostConstruct public void init() { if (Objects.isNull(this.logClient)) { @@ -84,10 +104,117 @@ public class LoggerServiceImpl implements LoggerService { if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { return Result.error(Status.TASK_INSTANCE_NOT_FOUND); } + Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); + String log = queryLog(taskInstance,skipLineNum,limit); + result.setData(log); + return result; + } - String host = getHost(taskInstance.getHost()); - Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); + /** + * get log size + * + * @param taskInstId task instance id + * @return log byte array + */ + @Override + public byte[] getLogBytes(int taskInstId) { + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); + if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { + throw new ServiceException("task instance is null or host is null"); + } + return getLogBytes(taskInstance); + } + + /** + * query log + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + @Override + @SuppressWarnings("unchecked") + public Map queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + // check whether the task instance can be found + TaskInstance task = processService.findTaskInstanceById(taskInstId); + if (task == null || StringUtils.isBlank(task.getHost())) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND); + return result; + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId); + return result; + } + String log = queryLog(task, skipLineNum, limit); + result.put(Constants.DATA_LIST, log); + return result; + } + + /** + * get log bytes + * + * @param loginUser login user + * @param projectCode project code + * @param taskInstId task instance id + * @return log byte array + */ + @Override + public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { + Project project = projectMapper.queryByCode(projectCode); + //check user access for project + Map result = projectService.checkProjectAndAuth(loginUser, project, projectCode); + if (result.get(Constants.STATUS) != Status.SUCCESS) { + throw new ServiceException("user has no permission"); + } + // check whether the task instance can be found + TaskInstance task = processService.findTaskInstanceById(taskInstId); + if (task == null || StringUtils.isBlank(task.getHost())) { + throw new ServiceException("task instance is null or host is null"); + } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + throw new ServiceException("task instance does not exist in project"); + } + return getLogBytes(task); + } + + /** + * get host + * + * @param address address + * @return old version return true ,otherwise return false + */ + private String getHost(String address) { + if (Boolean.TRUE.equals(Host.isOldVersion(address))) { + return address; + } + return Host.of(address).getIp(); + } + + /** + * query log + * + * @param taskInstance task instance + * @param skipLineNum skip line number + * @param limit limit + * @return log string data + */ + private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { + + String host = getHost(taskInstance.getHost()); logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(), Constants.RPC_PORT); @@ -104,23 +231,16 @@ public class LoggerServiceImpl implements LoggerService { log.append(logClient .rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit)); - result.setData(log.toString()); - return result; + return log.toString(); } - /** - * get log size + * get log bytes * - * @param taskInstId task instance id + * @param taskInstance task instance * @return log byte array */ - @Override - public byte[] getLogBytes(int taskInstId) { - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId); - if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { - throw new ServiceException("task instance is null or host is null"); - } + private byte[] getLogBytes(TaskInstance taskInstance) { String host = getHost(taskInstance.getHost()); byte[] head = String.format(LOG_HEAD_FORMAT, taskInstance.getLogPath(), @@ -129,17 +249,4 @@ public class LoggerServiceImpl implements LoggerService { return Bytes.concat(head, logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath())); } - - /** - * get host - * - * @param address address - * @return old version return true ,otherwise return false - */ - private String getHost(String address) { - if (Boolean.TRUE.equals(Host.isOldVersion(address))) { - return address; - } - return Host.of(address).getIp(); - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index b5e8453c40..9dff036c69 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -445,7 +445,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); } else { Tenant tenant = tenantMapper.queryById(processDefinition.getTenantId()); @@ -538,7 +538,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); // check process definition exists - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } @@ -640,7 +640,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } @@ -718,7 +718,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } @@ -773,7 +773,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } Set defineCodeSet = Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet()); List processDefinitionList = processDefinitionMapper.queryByCodes(defineCodeSet); - List dagDataSchedules = processDefinitionList.stream().map(this::exportProcessDagData).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(processDefinitionList)) { + return; + } + // check processDefinition exist in project + List processDefinitionListInProject = processDefinitionList.stream().filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList()); + List dagDataSchedules = processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dagDataSchedules)) { downloadProcessDefinitionFile(response, dagDataSchedules); } @@ -1055,7 +1060,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { logger.info("process define not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; @@ -1091,8 +1096,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); return result; } + // check processDefinition exist in project + List processDefinitionListInProject = processDefinitionList.stream(). + filter(o -> projectCode == o.getProjectCode()).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(processDefinitionListInProject)) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes); + return result; + } Map> taskNodeMap = new HashMap<>(); - for (ProcessDefinition processDefinition : processDefinitionList) { + for (ProcessDefinition processDefinition : processDefinitionListInProject) { DagData dagData = processService.genDagData(processDefinition); taskNodeMap.put(processDefinition.getCode(), dagData.getTaskDefinitionList()); } @@ -1129,15 +1141,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * Encapsulates the TreeView structure * + * @param projectCode project code * @param code process definition code * @param limit limit * @return tree view json data */ @Override - public Map viewTree(long code, Integer limit) { + public Map viewTree(long projectCode, long code, Integer limit) { Map result = new HashMap<>(); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (null == processDefinition) { + if (null == processDefinition || projectCode != processDefinition.getProjectCode()) { logger.info("process define not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; @@ -1414,7 +1427,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (Objects.isNull(processDefinition)) { + if (Objects.isNull(processDefinition) || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR, code); return result; } @@ -1478,7 +1491,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); - IPage processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code); + IPage processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code, projectCode); List processDefinitionLogs = processDefinitionVersionsPaging.getRecords(); pageInfo.setTotalList(processDefinitionLogs); @@ -1509,7 +1522,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); } else { if (processDefinition.getVersion() == version) { @@ -1693,7 +1706,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); // check process definition exists - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 4c97374fa2..2b98e14578 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -17,6 +17,14 @@ package org.apache.dolphinscheduler.api.service.impl; +import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; +import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; +import static org.apache.dolphinscheduler.common.Constants.WARNING_GROUP_NAME; + import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; @@ -43,8 +51,25 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.entity.AlertGroup; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections.CollectionUtils; @@ -72,8 +97,6 @@ import org.springframework.transaction.annotation.Transactional; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import static org.apache.dolphinscheduler.common.Constants.*; - /** * process instance service impl */ @@ -125,6 +148,9 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce @Autowired private TenantMapper tenantMapper; + @Autowired + TaskDefinitionMapper taskDefinitionMapper; + /** * return top n SUCCESS process instance order by running time which started between startTime and endTime */ @@ -160,7 +186,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } - List processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS); + List processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS, projectCode); result.put(DATA_LIST, processInstances); putMsg(result, Status.SUCCESS); return result; @@ -187,7 +213,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce ProcessDefinition processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); } else { processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); @@ -295,12 +321,16 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); + return result; + } List taskInstanceList = processService.findValidTaskListByProcessId(processId); addDependResultForTaskList(taskInstanceList); Map resultMap = new HashMap<>(); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); - ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); if (null != processDefinition && processDefinition.getWarningGroupId() != 0) { //check if exist @@ -386,6 +416,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); return result; } + + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskInstance.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); + return result; + } + if (!taskInstance.isSubProcess()) { putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName()); return result; @@ -437,6 +474,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); return result; } + //check process instance exists in project + ProcessDefinition processDefinition0 = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + if (processDefinition0 != null && projectCode != processDefinition0.getProjectCode()) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } //check process instance status if (!processInstance.getState().typeIsFinished()) { putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, @@ -590,6 +633,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + try { processService.removeTaskLogFile(processInstanceId); } catch (Exception e) { @@ -614,11 +663,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * view process instance variables * + * @param projectCode project code * @param processInstanceId process instance id * @return variables data */ @Override - public Map viewVariables(Integer processInstanceId) { + public Map viewVariables(long projectCode, Integer processInstanceId) { Map result = new HashMap<>(); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); @@ -627,6 +677,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new RuntimeException("workflow instance is null"); } + ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); + if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + Map timeParams = BusinessTimeUtils .getBusinessTime(processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); @@ -686,12 +742,13 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce /** * encapsulation gantt structure * + * @param projectCode project code * @param processInstanceId process instance id * @return gantt tree data * @throws Exception exception when json parse */ @Override - public Map viewGantt(Integer processInstanceId) throws Exception { + public Map viewGantt(long projectCode, Integer processInstanceId) throws Exception { Map result = new HashMap<>(); ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); @@ -704,6 +761,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion() ); + if (processDefinition != null && projectCode != processDefinition.getProjectCode()) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } GanttDto ganttDto = new GanttDto(); DAG dag = processService.genDagGraph(processDefinition); //topological sort diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index 4954fb905b..c9af32d52f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -146,7 +146,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe // check work flow define release state ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode); - result = executorService.checkProcessDefinitionValid(processDefinition, processDefineCode); + result = executorService.checkProcessDefinitionValid(projectCode,processDefinition, processDefineCode); if (result.get(Constants.STATUS) != Status.SUCCESS) { return result; } @@ -247,7 +247,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, schedule.getProcessDefinitionCode()); return result; } @@ -296,7 +296,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode()); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, scheduleObj.getProcessDefinitionCode()); return result; } @@ -396,7 +396,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefineCode); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode); return result; } @@ -606,7 +606,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe } ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode); - if (processDefinition == null) { + if (processDefinition == null || projectCode != processDefinition.getProjectCode()) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 238b1be5f5..97ecb6ce39 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -191,6 +191,10 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { + putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); + return result; + } if (taskDefinition.getFlag() == Flag.YES) { putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode); return result; @@ -342,7 +346,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (taskDefinition == null) { + if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); return result; } @@ -377,7 +381,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); Page page = new Page<>(pageNo, pageSize); - IPage taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode); + IPage taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode, projectCode); List taskDefinitionLogs = taskDefinitionVersionsPaging.getRecords(); pageInfo.setTotalList(taskDefinitionLogs); @@ -424,7 +428,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); - if (taskDefinition == null) { + if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode); } else { result.put(Constants.DATA_LIST, taskDefinition); @@ -520,7 +524,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code); - if (taskDefinition == null) { + if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index 9f4b8cb8e5..64ba304d44 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -29,9 +29,11 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -71,6 +73,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst @Autowired UsersService usersService; + @Autowired + TaskDefinitionMapper taskDefinitionMapper; + /** * query task list by project, process instance, task name, task start time, task end time, task status, keyword paging * @@ -171,6 +176,12 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst return result; } + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); + if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { + putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId); + return result; + } + // check whether the task instance state type is failure or cancel if (!task.getState().typeIsFailure() && !task.getState().typeIsCancel()) { putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java index 95405a4f06..5b443dc3ed 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java @@ -51,6 +51,10 @@ public class PageInfo { */ private Integer pageNo; + public PageInfo() { + + } + public PageInfo(Integer currentPage, Integer pageSize) { if (currentPage == null) { currentPage = 1; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 4737c2f8f8..d141a56d65 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -339,7 +339,7 @@ public class ProcessDefinitionControllerTest { Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); - Mockito.when(processDefinitionService.viewTree(processId, limit)).thenReturn(result); + Mockito.when(processDefinitionService.viewTree(projectCode, processId, limit)).thenReturn(result); Result response = processDefinitionController.viewTree(user, projectCode, processId, limit); Assert.assertTrue(response != null && response.isSuccess()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java index 7d6fabc127..27665ae4b2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java @@ -23,12 +23,19 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import java.util.HashMap; +import java.util.Map; + import org.junit.Assert; import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.http.MediaType; import org.springframework.test.web.servlet.MvcResult; import org.springframework.util.LinkedMultiValueMap; @@ -39,6 +46,9 @@ import org.springframework.util.MultiValueMap; */ public class ProcessInstanceControllerTest extends AbstractControllerTest { + @MockBean(name = "processInstanceService") + private ProcessInstanceService processInstanceService; + @Test public void testQueryProcessInstanceList() throws Exception { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); @@ -146,13 +156,14 @@ public class ProcessInstanceControllerTest extends AbstractControllerTest { @Test public void testViewVariables() throws Exception { - MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/instance/view-variables", "cxc_1113") - .header(SESSION_ID, sessionId) - .param("processInstanceId", "1204")) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) - .andReturn(); - + Map mockResult = new HashMap<>(); + mockResult.put(Constants.STATUS, Status.SUCCESS); + PowerMockito.when(processInstanceService.viewVariables(1113L,123)).thenReturn(mockResult); + MvcResult mvcResult = mockMvc.perform(get("/projects/{projectCode}/process-instances/{id}/view-variables", "1113", "123") + .header(SESSION_ID, sessionId)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON)) + .andReturn(); Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); Assert.assertNotNull(result); Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index bb279eabab..da942fbedf 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -122,6 +122,7 @@ public class ExecutorServiceTest { processDefinition.setUserId(userId); processDefinition.setVersion(1); processDefinition.setCode(1L); + processDefinition.setProjectCode(projectCode); // processInstance processInstance.setId(processInstanceId); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index bd8aa72fef..bc46d248be 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -20,9 +20,20 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -51,6 +62,15 @@ public class LoggerServiceTest { @Mock private ProcessService processService; + @Mock + private ProjectMapper projectMapper; + + @Mock + private ProjectService projectService; + + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + @Before public void init() { this.loggerService.init(); @@ -113,9 +133,87 @@ public class LoggerServiceTest { } + @Test + public void testQueryLogInSpecifiedProject() { + long projectCode = 1L; + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + Project project = getProject(projectCode); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + TaskInstance taskInstance = new TaskInstance(); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setCode(1L); + //SUCCESS + taskInstance.setTaskCode(1L); + taskInstance.setId(1); + taskInstance.setHost("127.0.0.1:8080"); + taskInstance.setLogPath("/temp/log"); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); + result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1); + Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void testGetLogBytesInSpecifiedProject() { + long projectCode = 1L; + Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); + Project project = getProject(projectCode); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + Map result = new HashMap<>(); + putMsg(result, Status.SUCCESS, projectCode); + TaskInstance taskInstance = new TaskInstance(); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + taskDefinition.setCode(1L); + //SUCCESS + taskInstance.setTaskCode(1L); + taskInstance.setId(1); + taskInstance.setHost("127.0.0.1:8080"); + taskInstance.setLogPath("/temp/log"); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); + Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); + loggerService.getLogBytes(loginUser, projectCode, 1); + } + + @After public void close() { this.loggerService.close(); } + /** + * get mock Project + * + * @param projectCode projectCode + * @return Project + */ + private Project getProject(long projectCode) { + Project project = new Project(); + project.setCode(projectCode); + project.setId(1); + project.setName("test"); + project.setUserId(1); + return project; + } + + private void putMsg(Map result, Status status, Object... statusParams) { + result.put(Constants.STATUS, status); + if (statusParams != null && statusParams.length > 0) { + result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); + } else { + result.put(Constants.MSG, status.getMsg()); + } + } } \ No newline at end of file diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 2229eafbaa..68fc815fae 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -573,17 +573,17 @@ public class ProcessDefinitionServiceTest { public void testViewTree() { //process definition not exist ProcessDefinition processDefinition = getProcessDefinition(); - Map processDefinitionNullRes = processDefinitionService.viewTree(46, 10); + Map processDefinitionNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS)); //task instance not exist Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Map taskNullRes = processDefinitionService.viewTree(46, 10); + Map taskNullRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS)); //task instance exist - Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); + Map taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); } @@ -592,7 +592,7 @@ public class ProcessDefinitionServiceTest { ProcessDefinition processDefinition = getProcessDefinition(); Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>()); - Map taskNotNuLLRes = processDefinitionService.viewTree(46, 10); + Map taskNotNuLLRes = processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10); Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index ccce682e55..eba085bf87 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; @@ -48,6 +49,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -111,6 +113,8 @@ public class ProcessInstanceServiceTest { @Mock TenantMapper tenantMapper; + @Mock + TaskDefinitionMapper taskDefinitionMapper; private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789," + "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789," @@ -255,6 +259,7 @@ public class ProcessInstanceServiceTest { ProcessInstance processInstance = getProcessInstance(); putMsg(result, Status.SUCCESS, projectCode); ProcessDefinition processDefinition = getProcessDefinition(); + processDefinition.setProjectCode(projectCode); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance); @@ -347,6 +352,9 @@ public class ProcessInstanceServiceTest { taskInstance.setProcessInstanceId(1); putMsg(result, Status.SUCCESS, projectCode); when(processService.findTaskInstanceById(1)).thenReturn(taskInstance); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); Map notSubprocessRes = processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode, 1); Assert.assertEquals(Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, notSubprocessRes.get(Constants.STATUS)); @@ -410,6 +418,7 @@ public class ProcessInstanceServiceTest { ProcessDefinition processDefinition = getProcessDefinition(); processDefinition.setId(1); processDefinition.setUserId(1); + processDefinition.setProjectCode(projectCode); Tenant tenant = getTenant(); when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); @@ -497,7 +506,7 @@ public class ProcessInstanceServiceTest { processInstance.setScheduleTime(new Date()); processInstance.setGlobalParams(""); when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance); - Map successRes = processInstanceService.viewVariables(1); + Map successRes = processInstanceService.viewVariables(1L,1); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } @@ -522,7 +531,7 @@ public class ProcessInstanceServiceTest { when(processService.genDagGraph(Mockito.any(ProcessDefinition.class))) .thenReturn(graph); - Map successRes = processInstanceService.viewGantt(1); + Map successRes = processInstanceService.viewGantt(0L, 1); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java index 238d1d08fc..2f8079c558 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java @@ -103,6 +103,7 @@ public class SchedulerServiceTest { Project project = getProject(projectName, projectCode); ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setProjectCode(projectCode); Schedule schedule = new Schedule(); schedule.setId(1); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index d8852b4ce0..92f1e5fcec 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -317,6 +317,7 @@ public class TaskDefinitionServiceImplTest { Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition); TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, taskDefinition.getVersion())).thenReturn(taskDefinitionLog); + Map offlineTaskResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.OFFLINE); Assert.assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java index 08fdf5fe51..5544a18ee0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java @@ -32,9 +32,11 @@ import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -81,6 +83,9 @@ public class TaskInstanceServiceTest { @Mock UsersService usersService; + @Mock + TaskDefinitionMapper taskDefinitionMapper; + @Test public void queryTaskListPaging() { long projectCode = 1L; @@ -249,6 +254,9 @@ public class TaskInstanceServiceTest { // test task not found when(projectService.checkProjectAndAuth(user, project, projectCode)).thenReturn(mockSuccess); when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null); + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setProjectCode(projectCode); + when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition); Map taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectCode, taskId); Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index 038ed5d2f6..51ccb73813 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -73,9 +73,10 @@ public interface ProcessDefinitionLogMapper extends BaseMapper queryProcessDefinitionVersionsPaging(Page page, @Param("code") long code); + IPage queryProcessDefinitionVersionsPaging(Page page, @Param("code") long code, @Param("projectCode") long projectCode); /** * delete the certain process definition version by process definition id and version number diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 7be58a7422..51f60295f4 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -213,14 +213,19 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query top n process instance order by running duration * + * @param size size + * @param startTime start time + * @param startTime end time * @param status process instance status + * @param projectCode project code * @return ProcessInstance list */ List queryTopNProcessInstance(@Param("size") int size, @Param("startTime") Date startTime, @Param("endTime") Date endTime, - @Param("status") ExecutionStatus status); + @Param("status") ExecutionStatus status, + @Param("projectCode") long projectCode); /** * query process instance by processDefinitionCode and stateArray diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 70ca9f70c3..ab2620fb51 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -78,8 +78,9 @@ public interface TaskDefinitionLogMapper extends BaseMapper { * query the paging task definition version list by pagination info * * @param page pagination info + * @param projectCode project code * @param code process definition code * @return the paging task definition version list */ - IPage queryTaskDefinitionVersionsPaging(Page page, @Param("code") long code); + IPage queryTaskDefinitionVersionsPaging(Page page, @Param("code") long code, @Param("projectCode") long projectCode); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml index 40afa04628..3abff6c9fd 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml @@ -70,6 +70,9 @@ from t_ds_process_definition_log where code = #{code} + + and project_code = #{projectCode} + order by version desc diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 08db3af58c..77d96b5b26 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -51,6 +51,9 @@ from t_ds_process_instance where state = #{status} + + and project_code = #{projectCode} + and start_time between #{startTime} and #{endTime} order by end_time-start_time desc diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml index da6c0ea2b1..0615167a89 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml @@ -73,6 +73,9 @@ from t_ds_task_definition_log where code = #{code} + + and project_code = #{projectCode} + order by version desc diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java index f0981b0c8d..5ce86b13d1 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java @@ -159,7 +159,7 @@ public class ProcessDefinitionLogMapperTest { public void testQueryProcessDefinitionVersionsPaging() { insertOne(); Page page = new Page(1, 3); - IPage processDefinitionLogs = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L); + IPage processDefinitionLogs = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L,1L); Assert.assertNotEquals(processDefinitionLogs.getTotal(), 0); } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java index a492beba92..613bb8bcbc 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java @@ -393,7 +393,7 @@ public class ProcessInstanceMapperTest { ProcessInstance processInstance3 = insertOne(startTime3, endTime3); Date start = new Date(2020, 1, 1, 1, 1, 1); Date end = new Date(2021, 1, 1, 1, 1, 1); - List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS); + List processInstances = processInstanceMapper.queryTopNProcessInstance(2, start, end, ExecutionStatus.SUCCESS,0L); Assert.assertEquals(2, processInstances.size()); Assert.assertTrue(isSortedByDuration(processInstances)); for (ProcessInstance processInstance : processInstances) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 4e3349a9eb..640abb74af 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -241,7 +241,7 @@ public class WorkflowExecuteThread implements Runnable { } private void handleEvents() { - while (this.stateEvents.size() > 0) { + while (!this.stateEvents.isEmpty()) { try { StateEvent stateEvent = this.stateEvents.peek();