From dc55b5ba6faffc3587ed0a2b78b2ceaf891844f5 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Thu, 18 Feb 2021 19:27:13 +0800 Subject: [PATCH] [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code (#4765) * [Improvement-3369][api] Introduce monitor, processinstance and queue service interface for clear code --- .../api/service/MonitorService.java | 168 +--- .../api/service/ProcessInstanceService.java | 610 +------------- .../api/service/QueueService.java | 229 +----- .../api/service/impl/MonitorServiceImpl.java | 161 ++++ .../impl/ProcessInstanceServiceImpl.java | 741 ++++++++++++++++++ .../api/service/impl/QueueServiceImpl.java | 295 +++++++ .../api/service/MonitorServiceTest.java | 4 +- .../service/ProcessInstanceServiceTest.java | 3 +- .../api/service/QueueServiceTest.java | 3 +- .../dolphinscheduler/common/Constants.java | 12 +- .../server/registry/HeartBeatTask.java | 9 +- 11 files changed, 1284 insertions(+), 951 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index e46ca6fcf2..51cba2ccdc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -14,143 +14,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.dao.entity.User; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.model.WorkerServerModel; -import org.apache.dolphinscheduler.dao.MonitorDBDao; -import org.apache.dolphinscheduler.dao.entity.MonitorRecord; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import com.google.common.collect.Sets; /** * monitor service */ -@Service -public class MonitorService extends BaseService { - - @Autowired - private ZookeeperMonitor zookeeperMonitor; - - @Autowired - private MonitorDBDao monitorDBDao; - /** - * query database state - * - * @param loginUser login user - * @return data base state - */ - public Map queryDatabaseState(User loginUser) { - Map result = new HashMap<>(); - - List monitorRecordList = monitorDBDao.queryDatabaseState(); - - result.put(Constants.DATA_LIST, monitorRecordList); - putMsg(result, Status.SUCCESS); - - return result; - - } - - /** - * query master list - * - * @param loginUser login user - * @return master information list - */ - public Map queryMaster(User loginUser) { - - Map result = new HashMap<>(); - - List masterServers = getServerListFromZK(true); - result.put(Constants.DATA_LIST, masterServers); - putMsg(result,Status.SUCCESS); - - return result; - } - - /** - * query zookeeper state - * - * @param loginUser login user - * @return zookeeper information list - */ - public Map queryZookeeperState(User loginUser) { - Map result = new HashMap<>(); - - List zookeeperRecordList = zookeeperMonitor.zookeeperInfoList(); - - result.put(Constants.DATA_LIST, zookeeperRecordList); - putMsg(result, Status.SUCCESS); - - return result; - - } - - - /** - * query worker list - * - * @param loginUser login user - * @return worker information list - */ - public Map queryWorker(User loginUser) { - - Map result = new HashMap<>(); - List workerServers = getServerListFromZK(false) - .stream() - .map((Server server) -> { - WorkerServerModel model = new WorkerServerModel(); - model.setId(server.getId()); - model.setHost(server.getHost()); - model.setPort(server.getPort()); - model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); - model.setResInfo(server.getResInfo()); - model.setCreateTime(server.getCreateTime()); - model.setLastHeartbeatTime(server.getLastHeartbeatTime()); - return model; - }) - .collect(Collectors.toList()); - - Map workerHostPortServerMapping = workerServers - .stream() - .collect(Collectors.toMap( - (WorkerServerModel worker) -> { - String[] s = worker.getZkDirectories().iterator().next().split("/"); - return s[s.length - 1]; - } - , Function.identity() - , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { - oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); - return oldOne; - })); - - result.put(Constants.DATA_LIST, workerHostPortServerMapping.values()); - putMsg(result,Status.SUCCESS); - - return result; - } - - public List getServerListFromZK(boolean isMaster) { - - checkNotNull(zookeeperMonitor); - ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; - return zookeeperMonitor.getServersList(zkNodeType); - } - +public interface MonitorService { + + /** + * query database state + * + * @param loginUser login user + * @return data base state + */ + Map queryDatabaseState(User loginUser); + + /** + * query master list + * + * @param loginUser login user + * @return master information list + */ + Map queryMaster(User loginUser); + + /** + * query zookeeper state + * + * @param loginUser login user + * @return zookeeper information list + */ + Map queryZookeeperState(User loginUser); + + /** + * query worker list + * + * @param loginUser login user + * @return worker information list + */ + Map queryWorker(User loginUser); + + List getServerListFromZK(boolean isMaster); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index 6458a768d8..914eb2dfee 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -17,157 +17,26 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; -import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; -import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; -import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; - -import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; -import org.apache.dolphinscheduler.api.dto.gantt.Task; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DependResult; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.TaskType; -import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.process.ProcessDag; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.ParameterUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessData; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.dao.utils.DagHelper; -import org.apache.dolphinscheduler.service.process.ProcessService; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.text.ParseException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * process instance service */ -@Service -public class ProcessInstanceService extends BaseService { - - - private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); - - @Autowired - ProjectMapper projectMapper; - - @Autowired - ProjectService projectService; - - @Autowired - ProcessService processService; - - @Autowired - ProcessInstanceMapper processInstanceMapper; - - @Autowired - ProcessDefinitionMapper processDefineMapper; - - @Autowired - ProcessDefinitionService processDefinitionService; - - @Autowired - ProcessDefinitionVersionService processDefinitionVersionService; - - @Autowired - ExecutorService execService; - - @Autowired - TaskInstanceMapper taskInstanceMapper; - - @Autowired - LoggerService loggerService; - - - @Autowired - UsersService usersService; +public interface ProcessInstanceService { /** * return top n SUCCESS process instance order by running time which started between startTime and endTime */ - public Map queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) { - Map result = new HashMap<>(); - - Project project = projectMapper.queryByName(projectName); - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - if (0 > size) { - putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size); - return result; - } - if (Objects.isNull(startTime)) { - putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME); - return result; - } - Date start = DateUtils.stringToDate(startTime); - if (Objects.isNull(endTime)) { - putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME); - return result; - } - Date end = DateUtils.stringToDate(endTime); - if (start == null || end == null) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate"); - return result; - } - if (start.getTime() > end.getTime()) { - putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime); - return result; - } - - List processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS); - result.put(DATA_LIST, processInstances); - putMsg(result, Status.SUCCESS); - return result; - } + Map queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime); /** * query process instance by id @@ -177,24 +46,7 @@ public class ProcessInstanceService extends BaseService { * @param processId process instance id * @return process instance detail */ - public Map queryProcessInstanceById(User loginUser, String projectName, Integer processId) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); - - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); - processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); - result.put(DATA_LIST, processInstance); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryProcessInstanceById(User loginUser, String projectName, Integer processId); /** * paging query process instance list, filtering according to project, process definition, time range, keyword, process status @@ -211,64 +63,10 @@ public class ProcessInstanceService extends BaseService { * @param endDate end time * @return process instance list */ - public Map queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId, - String startDate, String endDate, - String searchVal, String executorName, ExecutionStatus stateType, String host, - Integer pageNo, Integer pageSize) { - - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - int[] statusArray = null; - // filter by state - if (stateType != null) { - statusArray = new int[]{stateType.ordinal()}; - } - - Date start = null; - Date end = null; - try { - if (StringUtils.isNotEmpty(startDate)) { - start = DateUtils.getScheduleDate(startDate); - } - if (StringUtils.isNotEmpty(endDate)) { - end = DateUtils.getScheduleDate(endDate); - } - } catch (Exception e) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "startDate,endDate"); - return result; - } - - Page page = new Page<>(pageNo, pageSize); - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - int executorId = usersService.getUserIdByName(executorName); - - IPage processInstanceList = - processInstanceMapper.queryProcessInstanceListPaging(page, - project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end); - - List processInstances = processInstanceList.getRecords(); - - for (ProcessInstance processInstance : processInstances) { - processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); - User executor = usersService.queryUser(processInstance.getExecutorId()); - if (null != executor) { - processInstance.setExecutorName(executor.getUserName()); - } - } - - pageInfo.setTotalCount((int) processInstanceList.getTotal()); - pageInfo.setLists(processInstances); - result.put(DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - return result; - } + Map queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId, + String startDate, String endDate, + String searchVal, String executorName, ExecutionStatus stateType, String host, + Integer pageNo, Integer pageSize); /** * query task list by process instance id @@ -279,71 +77,9 @@ public class ProcessInstanceService extends BaseService { * @return task list for the process instance * @throws IOException io exception */ - public Map queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); - List taskInstanceList = processService.findValidTaskListByProcessId(processId); - addDependResultForTaskList(taskInstanceList); - Map resultMap = new HashMap<>(); - resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); - resultMap.put(TASK_LIST, taskInstanceList); - result.put(DATA_LIST, resultMap); - - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * add dependent result for dependent task - */ - private void addDependResultForTaskList(List taskInstanceList) throws IOException { - for (TaskInstance taskInstance : taskInstanceList) { - if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) { - Result logResult = loggerService.queryLog( - taskInstance.getId(), 0, 4098); - if (logResult.getCode() == Status.SUCCESS.ordinal()) { - String log = logResult.getData(); - Map resultMap = parseLogForDependentResult(log); - taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap)); - } - } - } - } + Map queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException; - public Map parseLogForDependentResult(String log) throws IOException { - Map resultMap = new HashMap<>(); - if (StringUtils.isEmpty(log)) { - return resultMap; - } - - BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(log.getBytes( - StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); - String line; - while ((line = br.readLine()) != null) { - if (line.contains(DEPENDENT_SPLIT)) { - String[] tmpStringArray = line.split(":\\|\\|"); - if (tmpStringArray.length != 2) { - continue; - } - String dependResultString = tmpStringArray[1]; - String[] dependStringArray = dependResultString.split(","); - if (dependStringArray.length != 2) { - continue; - } - String key = dependStringArray[0].trim(); - DependResult dependResult = DependResult.valueOf(dependStringArray[1].trim()); - resultMap.put(key, dependResult); - } - } - return resultMap; - } + Map parseLogForDependentResult(String log) throws IOException; /** * query sub process instance detail info by task id @@ -353,38 +89,7 @@ public class ProcessInstanceService extends BaseService { * @param taskId task id * @return sub process instance detail */ - public Map querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - TaskInstance taskInstance = processService.findTaskInstanceById(taskId); - if (taskInstance == null) { - putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); - return result; - } - if (!taskInstance.isSubProcess()) { - putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName()); - return result; - } - - ProcessInstance subWorkflowInstance = processService.findSubProcessInstance( - taskInstance.getProcessInstanceId(), taskInstance.getId()); - if (subWorkflowInstance == null) { - putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); - return result; - } - Map dataMap = new HashMap<>(); - dataMap.put("subProcessInstanceId", subWorkflowInstance.getId()); - result.put(DATA_LIST, dataMap); - putMsg(result, Status.SUCCESS); - return result; - } + Map querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId); /** * update process instance @@ -401,97 +106,9 @@ public class ProcessInstanceService extends BaseService { * @return update result code * @throws ParseException parse exception for json parse */ - public Map updateProcessInstance(User loginUser, String projectName, Integer processInstanceId, - String processInstanceJson, String scheduleTime, Boolean syncDefine, - Flag flag, String locations, String connects) throws ParseException { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - //check project permission - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - //check process instance exists - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (processInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); - return result; - } - - //check process instance status - if (!processInstance.getState().typeIsFinished()) { - putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, - processInstance.getName(), processInstance.getState().toString(), "update"); - return result; - } - Date schedule = null; - schedule = processInstance.getScheduleTime(); - if (scheduleTime != null) { - schedule = DateUtils.getScheduleDate(scheduleTime); - } - processInstance.setScheduleTime(schedule); - processInstance.setLocations(locations); - processInstance.setConnects(connects); - String globalParams = null; - String originDefParams = null; - int timeout = processInstance.getTimeout(); - ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); - if (StringUtils.isNotEmpty(processInstanceJson)) { - ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); - //check workflow json is valid - Map checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson); - if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) { - return result; - } - - originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); - List globalParamList = processData.getGlobalParams(); - Map globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, - processInstance.getCmdTypeIfComplement(), schedule); - timeout = processData.getTimeout(); - processInstance.setTimeout(timeout); - Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), - processDefinition.getUserId()); - if (tenant != null) { - processInstance.setTenantCode(tenant.getTenantCode()); - } - // get the processinstancejson before saving,and then save the name and taskid - String oldJson = processInstance.getProcessInstanceJson(); - if (StringUtils.isNotEmpty(oldJson)) { - processInstanceJson = processService.changeJson(processData,oldJson); - } - processInstance.setProcessInstanceJson(processInstanceJson); - processInstance.setGlobalParams(globalParams); - } - - int update = processService.updateProcessInstance(processInstance); - int updateDefine = 1; - if (Boolean.TRUE.equals(syncDefine)) { - processDefinition.setProcessDefinitionJson(processInstanceJson); - processDefinition.setGlobalParams(originDefParams); - processDefinition.setLocations(locations); - processDefinition.setConnects(connects); - processDefinition.setTimeout(timeout); - processDefinition.setUpdateTime(new Date()); - - // add process definition version - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); - processDefinition.setVersion(version); - updateDefine = processDefineMapper.updateById(processDefinition); - } - if (update > 0 && updateDefine > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); - } - - return result; - - } + Map updateProcessInstance(User loginUser, String projectName, Integer processInstanceId, + String processInstanceJson, String scheduleTime, Boolean syncDefine, + Flag flag, String locations, String connects) throws ParseException; /** * query parent process instance detail info by sub process instance id @@ -501,37 +118,7 @@ public class ProcessInstanceService extends BaseService { * @param subId sub process id * @return parent instance detail */ - public Map queryParentInstanceBySubId(User loginUser, String projectName, Integer subId) { - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId); - if (subInstance == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId); - return result; - } - if (subInstance.getIsSubProcess() == Flag.NO) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName()); - return result; - } - - ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId); - if (parentWorkflowInstance == null) { - putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST); - return result; - } - Map dataMap = new HashMap<>(); - dataMap.put("parentWorkflowInstance", parentWorkflowInstance.getId()); - result.put(DATA_LIST, dataMap); - putMsg(result, Status.SUCCESS); - return result; - } + Map queryParentInstanceBySubId(User loginUser, String projectName, Integer subId); /** * delete process instance by id, at the same time,delete task instance and their mapping relation data @@ -541,38 +128,7 @@ public class ProcessInstanceService extends BaseService { * @param processInstanceId process instance id * @return delete result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) { - - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); - if (null == processInstance) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); - return result; - } - - processService.removeTaskLogFile(processInstanceId); - // delete database cascade - int delete = processService.deleteWorkProcessInstanceById(processInstanceId); - - processService.deleteAllSubWorkProcessByParentId(processInstanceId); - processService.deleteWorkProcessMapByParentId(processInstanceId); - - if (delete > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); - } - - return result; - } + Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId); /** * view process instance variables @@ -580,71 +136,7 @@ public class ProcessInstanceService extends BaseService { * @param processInstanceId process instance id * @return variables data */ - public Map viewVariables(Integer processInstanceId) { - Map result = new HashMap<>(); - - ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); - - if (processInstance == null) { - throw new RuntimeException("workflow instance is null"); - } - - Map timeParams = BusinessTimeUtils - .getBusinessTime(processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); - - String workflowInstanceJson = processInstance.getProcessInstanceJson(); - - ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class); - - String userDefinedParams = processInstance.getGlobalParams(); - - // global params - List globalParams = new ArrayList<>(); - - if (userDefinedParams != null && userDefinedParams.length() > 0) { - globalParams = JSONUtils.toList(userDefinedParams, Property.class); - } - - List taskNodeList = workflowData.getTasks(); - - // global param string - String globalParamStr = JSONUtils.toJsonString(globalParams); - globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); - globalParams = JSONUtils.toList(globalParamStr, Property.class); - for (Property property : globalParams) { - timeParams.put(property.getProp(), property.getValue()); - } - - // local params - Map> localUserDefParams = new HashMap<>(); - for (TaskNode taskNode : taskNodeList) { - String parameter = taskNode.getParams(); - Map map = JSONUtils.toMap(parameter); - String localParams = map.get(LOCAL_PARAMS); - if (localParams != null && !localParams.isEmpty()) { - localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams); - List localParamsList = JSONUtils.toList(localParams, Property.class); - - Map localParamsMap = new HashMap<>(); - localParamsMap.put("taskType", taskNode.getType()); - localParamsMap.put("localParamsList", localParamsList); - if (CollectionUtils.isNotEmpty(localParamsList)) { - localUserDefParams.put(taskNode.getName(), localParamsMap); - } - } - - } - - Map resultMap = new HashMap<>(); - - resultMap.put(GLOBAL_PARAMS, globalParams); - resultMap.put(LOCAL_PARAMS, localUserDefParams); - - result.put(DATA_LIST, resultMap); - putMsg(result, Status.SUCCESS); - return result; - } + Map viewVariables(Integer processInstanceId); /** * encapsulation gantt structure @@ -653,67 +145,7 @@ public class ProcessInstanceService extends BaseService { * @return gantt tree data * @throws Exception exception when json parse */ - public Map viewGantt(Integer processInstanceId) throws Exception { - Map result = new HashMap<>(); - - ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); - - if (processInstance == null) { - throw new RuntimeException("workflow instance is null"); - } - - GanttDto ganttDto = new GanttDto(); - - DAG dag = processInstance2DAG(processInstance); - //topological sort - List nodeList = dag.topologicalSort(); - - ganttDto.setTaskNames(nodeList); - - List taskList = new ArrayList<>(); - for (String node : nodeList) { - TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node); - if (taskInstance == null) { - continue; - } - Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); - Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); - Task task = new Task(); - task.setTaskName(taskInstance.getName()); - task.getStartDate().add(startTime.getTime()); - task.getEndDate().add(endTime.getTime()); - task.setIsoStart(startTime); - task.setIsoEnd(endTime); - task.setStatus(taskInstance.getState().toString()); - task.setExecutionDate(taskInstance.getStartTime()); - task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime())); - taskList.add(task); - } - ganttDto.setTasks(taskList); - - result.put(DATA_LIST, ganttDto); - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * process instance to DAG - * - * @param processInstance input process instance - * @return process instance dag. - */ - private static DAG processInstance2DAG(ProcessInstance processInstance) { - - String processDefinitionJson = processInstance.getProcessInstanceJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - List taskNodeList = processData.getTasks(); - - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - - return DagHelper.buildDagGraph(processDag); - } + Map viewGantt(Integer processInstanceId) throws Exception; /** * query process instance by processDefinitionId and stateArray @@ -721,9 +153,7 @@ public class ProcessInstanceService extends BaseService { * @param states states array * @return process instance list */ - public List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) { - return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states); - } + List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states); /** * query process instance by processDefinitionId @@ -731,8 +161,6 @@ public class ProcessInstanceService extends BaseService { * @param size size * @return process instance list */ - public List queryByProcessDefineId(int processDefinitionId,int size) { - return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); - } + List queryByProcessDefineId(int processDefinitionId,int size); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java index 23de453e11..24f6189afc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java @@ -14,43 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.QueueMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.apache.commons.lang.StringUtils; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import java.util.Date; -import java.util.HashMap; -import java.util.List; import java.util.Map; /** * queue service */ -@Service -public class QueueService extends BaseService { - - private static final Logger logger = LoggerFactory.getLogger(QueueService.class); - - @Autowired - private QueueMapper queueMapper; - - @Autowired - private UserMapper userMapper; +public interface QueueService { /** * query queue list @@ -58,18 +33,7 @@ public class QueueService extends BaseService { * @param loginUser login user * @return queue list */ - public Map queryList(User loginUser) { - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - - List queueList = queueMapper.selectList(null); - result.put(Constants.DATA_LIST, queueList); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryList(User loginUser); /** * query queue list paging @@ -80,26 +44,7 @@ public class QueueService extends BaseService { * @param pageSize page size * @return queue list */ - public Map queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - - Page page = new Page(pageNo, pageSize); - - - IPage queueList = queueMapper.queryQueuePaging(page, searchVal); - - Integer count = (int) queueList.getTotal(); - PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); - pageInfo.setTotalCount(count); - pageInfo.setLists(queueList.getRecords()); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize); /** * create queue @@ -109,45 +54,7 @@ public class QueueService extends BaseService { * @param queueName queue name * @return create result */ - public Map createQueue(User loginUser, String queue, String queueName) { - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - - if (StringUtils.isEmpty(queue)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue"); - return result; - } - - if (StringUtils.isEmpty(queueName)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName"); - return result; - } - - if (checkQueueNameExist(queueName)) { - putMsg(result, Status.QUEUE_NAME_EXIST, queueName); - return result; - } - - if (checkQueueExist(queue)) { - putMsg(result, Status.QUEUE_VALUE_EXIST, queue); - return result; - } - - Queue queueObj = new Queue(); - Date now = new Date(); - - queueObj.setQueue(queue); - queueObj.setQueueName(queueName); - queueObj.setCreateTime(now); - queueObj.setUpdateTime(now); - - queueMapper.insert(queueObj); - putMsg(result, Status.SUCCESS); - - return result; - } + Map createQueue(User loginUser, String queue, String queueName); /** * update queue @@ -158,66 +65,7 @@ public class QueueService extends BaseService { * @param queueName queue name * @return update result code */ - public Map updateQueue(User loginUser, int id, String queue, String queueName) { - Map result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - - if (StringUtils.isEmpty(queue)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue"); - return result; - } - - if (StringUtils.isEmpty(queueName)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName"); - return result; - } - - Queue queueObj = queueMapper.selectById(id); - if (queueObj == null) { - putMsg(result, Status.QUEUE_NOT_EXIST, id); - return result; - } - - // whether queue value or queueName is changed - if (queue.equals(queueObj.getQueue()) && queueName.equals(queueObj.getQueueName())) { - putMsg(result, Status.NEED_NOT_UPDATE_QUEUE); - return result; - } - - // check queue name is exist - if (!queueName.equals(queueObj.getQueueName()) - && checkQueueNameExist(queueName)) { - putMsg(result, Status.QUEUE_NAME_EXIST, queueName); - return result; - } - - // check queue value is exist - if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) { - putMsg(result, Status.QUEUE_VALUE_EXIST, queue); - return result; - } - - // check old queue using by any user - if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) { - //update user related old queue - Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName); - logger.info("old queue have related {} user, exec update user success.", relatedUserNums); - } - - // update queue - Date now = new Date(); - queueObj.setQueue(queue); - queueObj.setQueueName(queueName); - queueObj.setUpdateTime(now); - - queueMapper.updateById(queueObj); - - putMsg(result, Status.SUCCESS); - - return result; - } + Map updateQueue(User loginUser, int id, String queue, String queueName); /** * verify queue and queueName @@ -226,69 +74,6 @@ public class QueueService extends BaseService { * @param queueName queue name * @return true if the queue name not exists, otherwise return false */ - public Result verifyQueue(String queue, String queueName) { - Result result = new Result(); - - if (StringUtils.isEmpty(queue)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queue"); - return result; - } - - if (StringUtils.isEmpty(queueName)) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, "queueName"); - return result; - } - - - if (checkQueueNameExist(queueName)) { - logger.error("queue name {} has exist, can't create again.", queueName); - putMsg(result, Status.QUEUE_NAME_EXIST, queueName); - return result; - } - - if (checkQueueExist(queue)) { - logger.error("queue value {} has exist, can't create again.", queue); - putMsg(result, Status.QUEUE_VALUE_EXIST, queue); - return result; - } - - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * check queue exist - * if exists return true,not exists return false - * check queue exist - * - * @param queue queue - * @return true if the queue not exists, otherwise return false - */ - private boolean checkQueueExist(String queue) { - return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null)); - } - - /** - * check queue name exist - * if exists return true,not exists return false - * - * @param queueName queue name - * @return true if the queue name not exists, otherwise return false - */ - private boolean checkQueueNameExist(String queueName) { - return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName)); - } - - /** - * check old queue name using by any user - * if need to update user - * - * @param oldQueue old queue name - * @param newQueue new queue name - * @return true if need to update user - */ - private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) { - return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue)); - } + Result verifyQueue(String queue, String queueName); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java new file mode 100644 index 0000000000..c2eb9ee94f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.MonitorService; +import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; +import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.model.WorkerServerModel; +import org.apache.dolphinscheduler.dao.MonitorDBDao; +import org.apache.dolphinscheduler.dao.entity.MonitorRecord; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.google.common.collect.Sets; + +/** + * monitor service impl + */ +@Service +public class MonitorServiceImpl extends BaseService implements MonitorService { + + @Autowired + private ZookeeperMonitor zookeeperMonitor; + + @Autowired + private MonitorDBDao monitorDBDao; + + /** + * query database state + * + * @param loginUser login user + * @return data base state + */ + public Map queryDatabaseState(User loginUser) { + Map result = new HashMap<>(); + + List monitorRecordList = monitorDBDao.queryDatabaseState(); + + result.put(Constants.DATA_LIST, monitorRecordList); + putMsg(result, Status.SUCCESS); + + return result; + + } + + /** + * query master list + * + * @param loginUser login user + * @return master information list + */ + public Map queryMaster(User loginUser) { + + Map result = new HashMap<>(); + + List masterServers = getServerListFromZK(true); + result.put(Constants.DATA_LIST, masterServers); + putMsg(result,Status.SUCCESS); + + return result; + } + + /** + * query zookeeper state + * + * @param loginUser login user + * @return zookeeper information list + */ + public Map queryZookeeperState(User loginUser) { + Map result = new HashMap<>(); + + List zookeeperRecordList = zookeeperMonitor.zookeeperInfoList(); + + result.put(Constants.DATA_LIST, zookeeperRecordList); + putMsg(result, Status.SUCCESS); + + return result; + + } + + /** + * query worker list + * + * @param loginUser login user + * @return worker information list + */ + public Map queryWorker(User loginUser) { + + Map result = new HashMap<>(); + List workerServers = getServerListFromZK(false) + .stream() + .map((Server server) -> { + WorkerServerModel model = new WorkerServerModel(); + model.setId(server.getId()); + model.setHost(server.getHost()); + model.setPort(server.getPort()); + model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); + model.setResInfo(server.getResInfo()); + model.setCreateTime(server.getCreateTime()); + model.setLastHeartbeatTime(server.getLastHeartbeatTime()); + return model; + }) + .collect(Collectors.toList()); + + Map workerHostPortServerMapping = workerServers + .stream() + .collect(Collectors.toMap( + (WorkerServerModel worker) -> { + String[] s = worker.getZkDirectories().iterator().next().split("/"); + return s[s.length - 1]; + } + , Function.identity() + , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { + oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); + return oldOne; + })); + + result.put(Constants.DATA_LIST, workerHostPortServerMapping.values()); + putMsg(result,Status.SUCCESS); + + return result; + } + + public List getServerListFromZK(boolean isMaster) { + + checkNotNull(zookeeperMonitor); + ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; + return zookeeperMonitor.getServersList(zkNodeType); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java new file mode 100644 index 0000000000..926e3b0ff8 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; +import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; + +import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; +import org.apache.dolphinscheduler.api.dto.gantt.Task; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.ExecutorService; +import org.apache.dolphinscheduler.api.service.LoggerService; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; +import org.apache.dolphinscheduler.api.service.ProcessInstanceService; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.UsersService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DependResult; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * process instance service impl + */ +@Service +public class ProcessInstanceServiceImpl extends BaseService implements ProcessInstanceService { + + @Autowired + ProjectMapper projectMapper; + + @Autowired + ProjectService projectService; + + @Autowired + ProcessService processService; + + @Autowired + ProcessInstanceMapper processInstanceMapper; + + @Autowired + ProcessDefinitionMapper processDefineMapper; + + @Autowired + ProcessDefinitionService processDefinitionService; + + @Autowired + ProcessDefinitionVersionService processDefinitionVersionService; + + @Autowired + ExecutorService execService; + + @Autowired + TaskInstanceMapper taskInstanceMapper; + + @Autowired + LoggerService loggerService; + + + @Autowired + UsersService usersService; + + /** + * return top n SUCCESS process instance order by running time which started between startTime and endTime + */ + public Map queryTopNLongestRunningProcessInstance(User loginUser, String projectName, int size, String startTime, String endTime) { + Map result = new HashMap<>(); + + Project project = projectMapper.queryByName(projectName); + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + if (0 > size) { + putMsg(result, Status.NEGTIVE_SIZE_NUMBER_ERROR, size); + return result; + } + if (Objects.isNull(startTime)) { + putMsg(result, Status.DATA_IS_NULL, Constants.START_TIME); + return result; + } + Date start = DateUtils.stringToDate(startTime); + if (Objects.isNull(endTime)) { + putMsg(result, Status.DATA_IS_NULL, Constants.END_TIME); + return result; + } + Date end = DateUtils.stringToDate(endTime); + if (start == null || end == null) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); + return result; + } + if (start.getTime() > end.getTime()) { + putMsg(result, Status.START_TIME_BIGGER_THAN_END_TIME_ERROR, startTime, endTime); + return result; + } + + List processInstances = processInstanceMapper.queryTopNProcessInstance(size, start, end, ExecutionStatus.SUCCESS); + result.put(DATA_LIST, processInstances); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query process instance by id + * + * @param loginUser login user + * @param projectName project name + * @param processId process instance id + * @return process instance detail + */ + public Map queryProcessInstanceById(User loginUser, String projectName, Integer processId) { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); + + ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + processInstance.setWarningGroupId(processDefinition.getWarningGroupId()); + result.put(DATA_LIST, processInstance); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * paging query process instance list, filtering according to project, process definition, time range, keyword, process status + * + * @param loginUser login user + * @param projectName project name + * @param pageNo page number + * @param pageSize page size + * @param processDefineId process definition id + * @param searchVal search value + * @param stateType state type + * @param host host + * @param startDate start time + * @param endDate end time + * @return process instance list + */ + public Map queryProcessInstanceList(User loginUser, String projectName, Integer processDefineId, + String startDate, String endDate, + String searchVal, String executorName, ExecutionStatus stateType, String host, + Integer pageNo, Integer pageSize) { + + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + int[] statusArray = null; + // filter by state + if (stateType != null) { + statusArray = new int[]{stateType.ordinal()}; + } + + Date start = null; + Date end = null; + try { + if (StringUtils.isNotEmpty(startDate)) { + start = DateUtils.getScheduleDate(startDate); + } + if (StringUtils.isNotEmpty(endDate)) { + end = DateUtils.getScheduleDate(endDate); + } + } catch (Exception e) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.START_END_DATE); + return result; + } + + Page page = new Page<>(pageNo, pageSize); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + int executorId = usersService.getUserIdByName(executorName); + + IPage processInstanceList = + processInstanceMapper.queryProcessInstanceListPaging(page, + project.getId(), processDefineId, searchVal, executorId, statusArray, host, start, end); + + List processInstances = processInstanceList.getRecords(); + + for (ProcessInstance processInstance : processInstances) { + processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); + User executor = usersService.queryUser(processInstance.getExecutorId()); + if (null != executor) { + processInstance.setExecutorName(executor.getUserName()); + } + } + + pageInfo.setTotalCount((int) processInstanceList.getTotal()); + pageInfo.setLists(processInstances); + result.put(DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query task list by process instance id + * + * @param loginUser login user + * @param projectName project name + * @param processId process instance id + * @return task list for the process instance + * @throws IOException io exception + */ + public Map queryTaskListByProcessId(User loginUser, String projectName, Integer processId) throws IOException { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processId); + List taskInstanceList = processService.findValidTaskListByProcessId(processId); + addDependResultForTaskList(taskInstanceList); + Map resultMap = new HashMap<>(); + resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); + resultMap.put(TASK_LIST, taskInstanceList); + result.put(DATA_LIST, resultMap); + + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * add dependent result for dependent task + */ + private void addDependResultForTaskList(List taskInstanceList) throws IOException { + for (TaskInstance taskInstance : taskInstanceList) { + if (taskInstance.getTaskType().equalsIgnoreCase(TaskType.DEPENDENT.toString())) { + Result logResult = loggerService.queryLog( + taskInstance.getId(), Constants.LOG_QUERY_SKIP_LINE_NUMBER, Constants.LOG_QUERY_LIMIT); + if (logResult.getCode() == Status.SUCCESS.ordinal()) { + String log = logResult.getData(); + Map resultMap = parseLogForDependentResult(log); + taskInstance.setDependentResult(JSONUtils.toJsonString(resultMap)); + } + } + } + } + + public Map parseLogForDependentResult(String log) throws IOException { + Map resultMap = new HashMap<>(); + if (StringUtils.isEmpty(log)) { + return resultMap; + } + + BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(log.getBytes( + StandardCharsets.UTF_8)), StandardCharsets.UTF_8)); + String line; + while ((line = br.readLine()) != null) { + if (line.contains(DEPENDENT_SPLIT)) { + String[] tmpStringArray = line.split(":\\|\\|"); + if (tmpStringArray.length != 2) { + continue; + } + String dependResultString = tmpStringArray[1]; + String[] dependStringArray = dependResultString.split(","); + if (dependStringArray.length != 2) { + continue; + } + String key = dependStringArray[0].trim(); + DependResult dependResult = DependResult.valueOf(dependStringArray[1].trim()); + resultMap.put(key, dependResult); + } + } + return resultMap; + } + + /** + * query sub process instance detail info by task id + * + * @param loginUser login user + * @param projectName project name + * @param taskId task id + * @return sub process instance detail + */ + public Map querySubProcessInstanceByTaskId(User loginUser, String projectName, Integer taskId) { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + TaskInstance taskInstance = processService.findTaskInstanceById(taskId); + if (taskInstance == null) { + putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId); + return result; + } + if (!taskInstance.isSubProcess()) { + putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName()); + return result; + } + + ProcessInstance subWorkflowInstance = processService.findSubProcessInstance( + taskInstance.getProcessInstanceId(), taskInstance.getId()); + if (subWorkflowInstance == null) { + putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST, taskId); + return result; + } + Map dataMap = new HashMap<>(); + dataMap.put(Constants.SUBPROCESS_INSTANCE_ID, subWorkflowInstance.getId()); + result.put(DATA_LIST, dataMap); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * update process instance + * + * @param loginUser login user + * @param projectName project name + * @param processInstanceJson process instance json + * @param processInstanceId process instance id + * @param scheduleTime schedule time + * @param syncDefine sync define + * @param flag flag + * @param locations locations + * @param connects connects + * @return update result code + * @throws ParseException parse exception for json parse + */ + public Map updateProcessInstance(User loginUser, String projectName, Integer processInstanceId, + String processInstanceJson, String scheduleTime, Boolean syncDefine, + Flag flag, String locations, String connects) throws ParseException { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + //check project permission + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + //check process instance exists + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + if (processInstance == null) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + + //check process instance status + if (!processInstance.getState().typeIsFinished()) { + putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, + processInstance.getName(), processInstance.getState().toString(), "update"); + return result; + } + Date schedule = null; + schedule = processInstance.getScheduleTime(); + if (scheduleTime != null) { + schedule = DateUtils.getScheduleDate(scheduleTime); + } + processInstance.setScheduleTime(schedule); + processInstance.setLocations(locations); + processInstance.setConnects(connects); + String globalParams = null; + String originDefParams = null; + int timeout = processInstance.getTimeout(); + ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); + if (StringUtils.isNotEmpty(processInstanceJson)) { + ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class); + //check workflow json is valid + Map checkFlowJson = processDefinitionService.checkProcessNodeList(processData, processInstanceJson); + if (checkFlowJson.get(Constants.STATUS) != Status.SUCCESS) { + return result; + } + + originDefParams = JSONUtils.toJsonString(processData.getGlobalParams()); + List globalParamList = processData.getGlobalParams(); + Map globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, + processInstance.getCmdTypeIfComplement(), schedule); + timeout = processData.getTimeout(); + processInstance.setTimeout(timeout); + Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), + processDefinition.getUserId()); + if (tenant != null) { + processInstance.setTenantCode(tenant.getTenantCode()); + } + // get the processinstancejson before saving,and then save the name and taskid + String oldJson = processInstance.getProcessInstanceJson(); + if (StringUtils.isNotEmpty(oldJson)) { + processInstanceJson = processService.changeJson(processData,oldJson); + } + processInstance.setProcessInstanceJson(processInstanceJson); + processInstance.setGlobalParams(globalParams); + } + + int update = processService.updateProcessInstance(processInstance); + int updateDefine = 1; + if (Boolean.TRUE.equals(syncDefine)) { + processDefinition.setProcessDefinitionJson(processInstanceJson); + processDefinition.setGlobalParams(originDefParams); + processDefinition.setLocations(locations); + processDefinition.setConnects(connects); + processDefinition.setTimeout(timeout); + processDefinition.setUpdateTime(new Date()); + + // add process definition version + long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); + processDefinition.setVersion(version); + updateDefine = processDefineMapper.updateById(processDefinition); + } + if (update > 0 && updateDefine > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); + } + + return result; + + } + + /** + * query parent process instance detail info by sub process instance id + * + * @param loginUser login user + * @param projectName project name + * @param subId sub process id + * @return parent instance detail + */ + public Map queryParentInstanceBySubId(User loginUser, String projectName, Integer subId) { + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + ProcessInstance subInstance = processService.findProcessInstanceDetailById(subId); + if (subInstance == null) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, subId); + return result; + } + if (subInstance.getIsSubProcess() == Flag.NO) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_SUB_PROCESS_INSTANCE, subInstance.getName()); + return result; + } + + ProcessInstance parentWorkflowInstance = processService.findParentProcessInstance(subId); + if (parentWorkflowInstance == null) { + putMsg(result, Status.SUB_PROCESS_INSTANCE_NOT_EXIST); + return result; + } + Map dataMap = new HashMap<>(); + dataMap.put(Constants.PARENT_WORKFLOW_INSTANCE, parentWorkflowInstance.getId()); + result.put(DATA_LIST, dataMap); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * delete process instance by id, at the same time,delete task instance and their mapping relation data + * + * @param loginUser login user + * @param projectName project name + * @param processInstanceId process instance id + * @return delete result code + */ + @Transactional(rollbackFor = RuntimeException.class) + public Map deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId) { + + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId); + if (null == processInstance) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + + processService.removeTaskLogFile(processInstanceId); + // delete database cascade + int delete = processService.deleteWorkProcessInstanceById(processInstanceId); + + processService.deleteAllSubWorkProcessByParentId(processInstanceId); + processService.deleteWorkProcessMapByParentId(processInstanceId); + + if (delete > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); + } + + return result; + } + + /** + * view process instance variables + * + * @param processInstanceId process instance id + * @return variables data + */ + public Map viewVariables(Integer processInstanceId) { + Map result = new HashMap<>(); + + ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); + + if (processInstance == null) { + throw new RuntimeException("workflow instance is null"); + } + + Map timeParams = BusinessTimeUtils + .getBusinessTime(processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + + String workflowInstanceJson = processInstance.getProcessInstanceJson(); + + ProcessData workflowData = JSONUtils.parseObject(workflowInstanceJson, ProcessData.class); + + String userDefinedParams = processInstance.getGlobalParams(); + + // global params + List globalParams = new ArrayList<>(); + + if (userDefinedParams != null && userDefinedParams.length() > 0) { + globalParams = JSONUtils.toList(userDefinedParams, Property.class); + } + + List taskNodeList = workflowData.getTasks(); + + // global param string + String globalParamStr = JSONUtils.toJsonString(globalParams); + globalParamStr = ParameterUtils.convertParameterPlaceholders(globalParamStr, timeParams); + globalParams = JSONUtils.toList(globalParamStr, Property.class); + for (Property property : globalParams) { + timeParams.put(property.getProp(), property.getValue()); + } + + // local params + Map> localUserDefParams = new HashMap<>(); + for (TaskNode taskNode : taskNodeList) { + String parameter = taskNode.getParams(); + Map map = JSONUtils.toMap(parameter); + String localParams = map.get(LOCAL_PARAMS); + if (localParams != null && !localParams.isEmpty()) { + localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams); + List localParamsList = JSONUtils.toList(localParams, Property.class); + + Map localParamsMap = new HashMap<>(); + localParamsMap.put(Constants.TASK_TYPE, taskNode.getType()); + localParamsMap.put(Constants.LOCAL_PARAMS_LIST, localParamsList); + if (CollectionUtils.isNotEmpty(localParamsList)) { + localUserDefParams.put(taskNode.getName(), localParamsMap); + } + } + + } + + Map resultMap = new HashMap<>(); + + resultMap.put(GLOBAL_PARAMS, globalParams); + resultMap.put(LOCAL_PARAMS, localUserDefParams); + + result.put(DATA_LIST, resultMap); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * encapsulation gantt structure + * + * @param processInstanceId process instance id + * @return gantt tree data + * @throws Exception exception when json parse + */ + public Map viewGantt(Integer processInstanceId) throws Exception { + Map result = new HashMap<>(); + + ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); + + if (processInstance == null) { + throw new RuntimeException("workflow instance is null"); + } + + GanttDto ganttDto = new GanttDto(); + + DAG dag = processInstance2DAG(processInstance); + //topological sort + List nodeList = dag.topologicalSort(); + + ganttDto.setTaskNames(nodeList); + + List taskList = new ArrayList<>(); + for (String node : nodeList) { + TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstanceId, node); + if (taskInstance == null) { + continue; + } + Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); + Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); + Task task = new Task(); + task.setTaskName(taskInstance.getName()); + task.getStartDate().add(startTime.getTime()); + task.getEndDate().add(endTime.getTime()); + task.setIsoStart(startTime); + task.setIsoEnd(endTime); + task.setStatus(taskInstance.getState().toString()); + task.setExecutionDate(taskInstance.getStartTime()); + task.setDuration(DateUtils.format2Readable(endTime.getTime() - startTime.getTime())); + taskList.add(task); + } + ganttDto.setTasks(taskList); + + result.put(DATA_LIST, ganttDto); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * process instance to DAG + * + * @param processInstance input process instance + * @return process instance dag. + */ + private static DAG processInstance2DAG(ProcessInstance processInstance) { + + String processDefinitionJson = processInstance.getProcessInstanceJson(); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + List taskNodeList = processData.getTasks(); + + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); + + return DagHelper.buildDagGraph(processDag); + } + + /** + * query process instance by processDefinitionId and stateArray + * @param processDefinitionId processDefinitionId + * @param states states array + * @return process instance list + */ + public List queryByProcessDefineIdAndStatus(int processDefinitionId, int[] states) { + return processInstanceMapper.queryByProcessDefineIdAndStatus(processDefinitionId, states); + } + + /** + * query process instance by processDefinitionId + * @param processDefinitionId processDefinitionId + * @param size size + * @return process instance list + */ + public List queryByProcessDefineId(int processDefinitionId,int size) { + return processInstanceMapper.queryByProcessDefineId(processDefinitionId, size); + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java new file mode 100644 index 0000000000..5a2e943370 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.QueueService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.QueueMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * queue service impl + */ +@Service +public class QueueServiceImpl extends BaseService implements QueueService { + + private static final Logger logger = LoggerFactory.getLogger(QueueServiceImpl.class); + + @Autowired + private QueueMapper queueMapper; + + @Autowired + private UserMapper userMapper; + + /** + * query queue list + * + * @param loginUser login user + * @return queue list + */ + public Map queryList(User loginUser) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + List queueList = queueMapper.selectList(null); + result.put(Constants.DATA_LIST, queueList); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * query queue list paging + * + * @param loginUser login user + * @param pageNo page number + * @param searchVal search value + * @param pageSize page size + * @return queue list + */ + public Map queryList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + Page page = new Page<>(pageNo, pageSize); + + IPage queueList = queueMapper.queryQueuePaging(page, searchVal); + + Integer count = (int) queueList.getTotal(); + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotalCount(count); + pageInfo.setLists(queueList.getRecords()); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * create queue + * + * @param loginUser login user + * @param queue queue + * @param queueName queue name + * @return create result + */ + public Map createQueue(User loginUser, String queue, String queueName) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + if (StringUtils.isEmpty(queue)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE); + return result; + } + + if (StringUtils.isEmpty(queueName)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME); + return result; + } + + if (checkQueueNameExist(queueName)) { + putMsg(result, Status.QUEUE_NAME_EXIST, queueName); + return result; + } + + if (checkQueueExist(queue)) { + putMsg(result, Status.QUEUE_VALUE_EXIST, queue); + return result; + } + + Queue queueObj = new Queue(); + Date now = new Date(); + + queueObj.setQueue(queue); + queueObj.setQueueName(queueName); + queueObj.setCreateTime(now); + queueObj.setUpdateTime(now); + + queueMapper.insert(queueObj); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * update queue + * + * @param loginUser login user + * @param queue queue + * @param id queue id + * @param queueName queue name + * @return update result code + */ + public Map updateQueue(User loginUser, int id, String queue, String queueName) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + if (StringUtils.isEmpty(queue)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE); + return result; + } + + if (StringUtils.isEmpty(queueName)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME); + return result; + } + + Queue queueObj = queueMapper.selectById(id); + if (queueObj == null) { + putMsg(result, Status.QUEUE_NOT_EXIST, id); + return result; + } + + // whether queue value or queueName is changed + if (queue.equals(queueObj.getQueue()) && queueName.equals(queueObj.getQueueName())) { + putMsg(result, Status.NEED_NOT_UPDATE_QUEUE); + return result; + } + + // check queue name is exist + if (!queueName.equals(queueObj.getQueueName()) + && checkQueueNameExist(queueName)) { + putMsg(result, Status.QUEUE_NAME_EXIST, queueName); + return result; + } + + // check queue value is exist + if (!queue.equals(queueObj.getQueue()) && checkQueueExist(queue)) { + putMsg(result, Status.QUEUE_VALUE_EXIST, queue); + return result; + } + + // check old queue using by any user + if (checkIfQueueIsInUsing(queueObj.getQueueName(), queueName)) { + //update user related old queue + Integer relatedUserNums = userMapper.updateUserQueue(queueObj.getQueueName(), queueName); + logger.info("old queue have related {} user, exec update user success.", relatedUserNums); + } + + // update queue + Date now = new Date(); + queueObj.setQueue(queue); + queueObj.setQueueName(queueName); + queueObj.setUpdateTime(now); + + queueMapper.updateById(queueObj); + + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * verify queue and queueName + * + * @param queue queue + * @param queueName queue name + * @return true if the queue name not exists, otherwise return false + */ + public Result verifyQueue(String queue, String queueName) { + Result result = new Result<>(); + + if (StringUtils.isEmpty(queue)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE); + return result; + } + + if (StringUtils.isEmpty(queueName)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME); + return result; + } + + if (checkQueueNameExist(queueName)) { + putMsg(result, Status.QUEUE_NAME_EXIST, queueName); + return result; + } + + if (checkQueueExist(queue)) { + putMsg(result, Status.QUEUE_VALUE_EXIST, queue); + return result; + } + + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * check queue exist + * if exists return true,not exists return false + * check queue exist + * + * @param queue queue + * @return true if the queue not exists, otherwise return false + */ + private boolean checkQueueExist(String queue) { + return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(queue, null)); + } + + /** + * check queue name exist + * if exists return true,not exists return false + * + * @param queueName queue name + * @return true if the queue name not exists, otherwise return false + */ + private boolean checkQueueNameExist(String queueName) { + return CollectionUtils.isNotEmpty(queueMapper.queryAllQueueList(null, queueName)); + } + + /** + * check old queue name using by any user + * if need to update user + * + * @param oldQueue old queue name + * @param newQueue new queue name + * @return true if need to update user + */ + private boolean checkIfQueueIsInUsing (String oldQueue, String newQueue) { + return !oldQueue.equals(newQueue) && CollectionUtils.isNotEmpty(userMapper.queryUserListByQueue(oldQueue)); + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java index b155d5959a..02cbd79965 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/MonitorServiceTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.MonitorServiceImpl; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.model.Server; @@ -43,7 +44,8 @@ public class MonitorServiceTest { private static final Logger logger = LoggerFactory.getLogger(MonitorServiceTest.class); @InjectMocks - private MonitorService monitorService; + private MonitorServiceImpl monitorService; + @Mock private MonitorDBDao monitorDBDao; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index de23d7570e..e39f9792d0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; +import org.apache.dolphinscheduler.api.service.impl.ProcessInstanceServiceImpl; import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -68,7 +69,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; public class ProcessInstanceServiceTest { @InjectMocks - ProcessInstanceService processInstanceService; + ProcessInstanceServiceImpl processInstanceService; @Mock ProjectMapper projectMapper; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java index dbae95b181..5bb44a2af3 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.QueueServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -50,7 +51,7 @@ public class QueueServiceTest { private static final Logger logger = LoggerFactory.getLogger(QueueServiceTest.class); @InjectMocks - private QueueService queueService; + private QueueServiceImpl queueService; @Mock private QueueMapper queueMapper; @Mock diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index d2855e0067..0297ae6534 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -752,9 +752,17 @@ public final class Constants { public static final String SUBTRACT_STRING = "-"; public static final String GLOBAL_PARAMS = "globalParams"; public static final String LOCAL_PARAMS = "localParams"; + public static final String LOCAL_PARAMS_LIST = "localParamsList"; + public static final String SUBPROCESS_INSTANCE_ID = "subProcessInstanceId"; public static final String PROCESS_INSTANCE_STATE = "processInstanceState"; + public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance"; + public static final String TASK_TYPE = "taskType"; public static final String TASK_LIST = "taskList"; public static final String RWXR_XR_X = "rwxr-xr-x"; + public static final String QUEUE = "queue"; + public static final String QUEUE_NAME = "queueName"; + public static final int LOG_QUERY_SKIP_LINE_NUMBER = 0; + public static final int LOG_QUERY_LIMIT = 4096; /** * master/worker server use for zk @@ -1010,11 +1018,13 @@ public final class Constants { */ public static final String PLUGIN_JAR_SUFFIX = ".jar"; - public static final int NORAML_NODE_STATUS = 0; + public static final int NORMAL_NODE_STATUS = 0; public static final int ABNORMAL_NODE_STATUS = 1; public static final String START_TIME = "start time"; public static final String END_TIME = "end time"; + public static final String START_END_DATE = "startDate,endDate"; + /** * system line separator */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index bd8c79cce9..b89d85126f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -19,12 +19,13 @@ package org.apache.dolphinscheduler.server.registry; import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; -import java.util.Date; -import java.util.Set; - import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; + +import java.util.Date; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +57,7 @@ public class HeartBeatTask extends Thread { double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); - int status = Constants.NORAML_NODE_STATUS; + int status = Constants.NORMAL_NODE_STATUS; if (availablePhysicalMemorySize < reservedMemory || loadAverage > maxCpuloadAvg) {