diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java deleted file mode 100644 index edc115b3d4..0000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/BaseDAGService.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.api.service; - -import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.process.ProcessDag; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.ProcessData; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.utils.DagHelper; - -import java.util.List; - -/** - * base DAG service - */ -public class BaseDAGService extends BaseService{ - - - /** - * process instance to DAG - * - * @param processInstance input process instance - * @return process instance dag. 
- */ - public static DAG processInstance2DAG(ProcessInstance processInstance) { - - String processDefinitionJson = processInstance.getProcessInstanceJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - List taskNodeList = processData.getTasks(); - - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - - return DagHelper.buildDagGraph(processDag); - } -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index b3d56e5982..6a4eb974ce 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -16,90 +16,20 @@ */ package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.dolphinscheduler.api.dto.ProcessMeta; -import org.apache.dolphinscheduler.api.dto.treeview.Instance; -import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.CheckUtils; -import org.apache.dolphinscheduler.api.utils.FileUtils; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam; -import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; 
-import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.common.process.ProcessDag; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.task.AbstractParameters; -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; -import org.apache.dolphinscheduler.dao.utils.DagHelper; -import org.apache.dolphinscheduler.service.permission.PermissionCheck; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.MediaType; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import org.springframework.web.multipart.MultipartFile; +import java.util.Map; -import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.User; +import org.springframework.web.multipart.MultipartFile; + +import com.fasterxml.jackson.core.JsonProcessingException; /** * process definition service */ -@Service -public class ProcessDefinitionService extends BaseDAGService { - - private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionService.class); - - private static 
final String PROCESSDEFINITIONID = "processDefinitionId"; - - private static final String RELEASESTATE = "releaseState"; - - private static final String TASKS = "tasks"; - - @Autowired - private ProjectMapper projectMapper; - - @Autowired - private ProjectService projectService; - - @Autowired - private ProcessDefinitionMapper processDefineMapper; - - @Autowired - private ProcessInstanceMapper processInstanceMapper; - - - @Autowired - private TaskInstanceMapper taskInstanceMapper; - - @Autowired - private ScheduleMapper scheduleMapper; - - @Autowired - private ProcessService processService; +public interface ProcessDefinitionService { /** * create process definition @@ -114,92 +44,13 @@ public class ProcessDefinitionService extends BaseDAGService { * @return create result code * @throws JsonProcessingException JsonProcessingException */ - public Map createProcessDefinition(User loginUser, - String projectName, - String name, - String processDefinitionJson, - String desc, - String locations, - String connects) throws JsonProcessingException { - - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - // check project auth - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - ProcessDefinition processDefine = new ProcessDefinition(); - Date now = new Date(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); - if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) { - return checkProcessJson; - } - - processDefine.setName(name); - processDefine.setReleaseState(ReleaseState.OFFLINE); - processDefine.setProjectId(project.getId()); - processDefine.setUserId(loginUser.getId()); - 
processDefine.setProcessDefinitionJson(processDefinitionJson); - processDefine.setDescription(desc); - processDefine.setLocations(locations); - processDefine.setConnects(connects); - processDefine.setTimeout(processData.getTimeout()); - processDefine.setTenantId(processData.getTenantId()); - processDefine.setModifyBy(loginUser.getUserName()); - processDefine.setResourceIds(getResourceIds(processData)); - - //custom global params - List globalParamsList = processData.getGlobalParams(); - if (CollectionUtils.isNotEmpty(globalParamsList)) { - Set globalParamsSet = new HashSet<>(globalParamsList); - globalParamsList = new ArrayList<>(globalParamsSet); - processDefine.setGlobalParamList(globalParamsList); - } - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(Flag.YES); - processDefineMapper.insert(processDefine); - - // return processDefinition object with ID - result.put(Constants.DATA_LIST, processDefineMapper.selectById(processDefine.getId())); - putMsg(result, Status.SUCCESS); - result.put("processDefinitionId", processDefine.getId()); - return result; - } - - /** - * get resource ids - * - * @param processData process data - * @return resource ids - */ - private String getResourceIds(ProcessData processData) { - List tasks = processData.getTasks(); - Set resourceIds = new HashSet<>(); - for (TaskNode taskNode : tasks) { - String taskParameter = taskNode.getParams(); - AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); - if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { - Set tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); - resourceIds.addAll(tempSet); - } - } - - StringBuilder sb = new StringBuilder(); - for (int i : resourceIds) { - if (sb.length() > 0) { - sb.append(","); - } - sb.append(i); - } - return sb.toString(); - } - + Map createProcessDefinition(User loginUser, + String projectName, + String name, 
+ String processDefinitionJson, + String desc, + String locations, + String connects) throws JsonProcessingException; /** * query process definition list @@ -208,24 +59,8 @@ public class ProcessDefinitionService extends BaseDAGService { * @param projectName project name * @return definition list */ - public Map queryProcessDefinitionList(User loginUser, String projectName) { - - HashMap result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - List resourceList = processDefineMapper.queryAllDefinitionList(project.getId()); - result.put(Constants.DATA_LIST, resourceList); - putMsg(result, Status.SUCCESS); - - return result; - } - + Map queryProcessDefinitionList(User loginUser, + String projectName); /** * query process definition list paging @@ -238,29 +73,12 @@ public class ProcessDefinitionService extends BaseDAGService { * @param userId user id * @return process definition page */ - public Map queryProcessDefinitionListPaging(User loginUser, String projectName, String searchVal, Integer pageNo, Integer pageSize, Integer userId) { - - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - Page page = new Page(pageNo, pageSize); - IPage processDefinitionIPage = processDefineMapper.queryDefineListPaging( - page, searchVal, userId, project.getId(), isAdmin(loginUser)); - - PageInfo pageInfo = new PageInfo(pageNo, pageSize); - pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); - pageInfo.setLists(processDefinitionIPage.getRecords()); - 
result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryProcessDefinitionListPaging(User loginUser, + String projectName, + String searchVal, + Integer pageNo, + Integer pageSize, + Integer userId); /** * query datail of process definition @@ -270,27 +88,9 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processId process definition id * @return process definition detail */ - public Map queryProcessDefinitionById(User loginUser, String projectName, Integer processId) { - - - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); - if (processDefinition == null) { - putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); - } else { - result.put(Constants.DATA_LIST, processDefinition); - putMsg(result, Status.SUCCESS); - } - return result; - } + Map queryProcessDefinitionById(User loginUser, + String projectName, + Integer processId); /** * copy process definition @@ -300,32 +100,9 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processId process definition id * @return copy result code */ - public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException { - - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); - if 
(processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); - return result; - } else { - return createProcessDefinition( - loginUser, - projectName, - processDefinition.getName() + "_copy_" + System.currentTimeMillis(), - processDefinition.getProcessDefinitionJson(), - processDefinition.getDescription(), - processDefinition.getLocations(), - processDefinition.getConnects()); - } - } + Map copyProcessDefinition(User loginUser, + String projectName, + Integer processId) throws JsonProcessingException; /** * update process definition @@ -340,68 +117,12 @@ public class ProcessDefinitionService extends BaseDAGService { * @param connects connects for nodes * @return update result code */ - public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, - String processDefinitionJson, String desc, - String locations, String connects) { - Map result = new HashMap<>(5); - - Project project = projectMapper.queryByName(projectName); - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - if (resultStatus != Status.SUCCESS) { - return checkResult; - } - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); - if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) { - return checkProcessJson; - } - ProcessDefinition processDefine = processService.findProcessDefineById(id); - if (processDefine == null) { - // check process definition exists - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); - return result; - } else if (processDefine.getReleaseState() == ReleaseState.ONLINE) { - // online can not permit edit - putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefine.getName()); - return result; - } else { - putMsg(result, Status.SUCCESS); - } - - Date now = new Date(); - - 
processDefine.setId(id); - processDefine.setName(name); - processDefine.setReleaseState(ReleaseState.OFFLINE); - processDefine.setProjectId(project.getId()); - processDefine.setProcessDefinitionJson(processDefinitionJson); - processDefine.setDescription(desc); - processDefine.setLocations(locations); - processDefine.setConnects(connects); - processDefine.setTimeout(processData.getTimeout()); - processDefine.setTenantId(processData.getTenantId()); - processDefine.setModifyBy(loginUser.getUserName()); - processDefine.setResourceIds(getResourceIds(processData)); - - //custom global params - List globalParamsList = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) { - Set userDefParamsSet = new HashSet<>(processData.getGlobalParams()); - globalParamsList = new ArrayList<>(userDefParamsSet); - } - processDefine.setGlobalParamList(globalParamsList); - processDefine.setUpdateTime(now); - processDefine.setFlag(Flag.YES); - if (processDefineMapper.updateById(processDefine) > 0) { - putMsg(result, Status.SUCCESS); - - } else { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - } - return result; - } + Map updateProcessDefinition(User loginUser, + String projectName, + int id, + String name, + String processDefinitionJson, String desc, + String locations, String connects); /** * verify process definition name unique @@ -411,24 +132,9 @@ public class ProcessDefinitionService extends BaseDAGService { * @param name name * @return true if process definition name not exists, otherwise false */ - public Map verifyProcessDefinitionName(User loginUser, String projectName, String name) { - - Map result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - ProcessDefinition processDefinition = 
processDefineMapper.queryByDefineName(project.getId(), name); - if (processDefinition == null) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.PROCESS_INSTANCE_EXIST, name); - } - return result; - } + Map verifyProcessDefinitionName(User loginUser, + String projectName, + String name); /** * delete process definition by id @@ -438,62 +144,9 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processDefinitionId process definition id * @return delete result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) { - - Map result = new HashMap<>(5); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); - - if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); - return result; - } - - // Determine if the login user is the owner of the process definition - if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; - } - - // check process definition is already online - if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionId); - return result; - } - - // get the timing according to the process definition - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); - if (!schedules.isEmpty() && schedules.size() > 1) { - logger.warn("scheduler num is {},Greater than 1", schedules.size()); - putMsg(result, 
Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); - return result; - } else if (schedules.size() == 1) { - Schedule schedule = schedules.get(0); - if (schedule.getReleaseState() == ReleaseState.OFFLINE) { - scheduleMapper.deleteById(schedule.getId()); - } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { - putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); - return result; - } - } - - int delete = processDefineMapper.deleteById(processDefinitionId); - - if (delete > 0) { - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); - } - return result; - } + Map deleteProcessDefinitionById(User loginUser, + String projectName, + Integer processDefinitionId); /** * release process definition: online / offline @@ -504,244 +157,23 @@ public class ProcessDefinitionService extends BaseDAGService { * @param releaseState release state * @return release result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Map releaseProcessDefinition(User loginUser, String projectName, int id, int releaseState) { - HashMap result = new HashMap<>(); - Project project = projectMapper.queryByName(projectName); - - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultEnum = (Status) checkResult.get(Constants.STATUS); - if (resultEnum != Status.SUCCESS) { - return checkResult; - } - - ReleaseState state = ReleaseState.getEnum(releaseState); - - // check state - if (null == state) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; - } - - ProcessDefinition processDefinition = processDefineMapper.selectById(id); - - switch (state) { - case ONLINE: - // To check resources whether they are already cancel authorized or deleted - String resourceIds = processDefinition.getResourceIds(); - if (StringUtils.isNotBlank(resourceIds)) { - Integer[] resourceIdArray = 
Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); - PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); - try { - permissionCheck.checkPermission(); - } catch (Exception e) { - logger.error(e.getMessage(), e); - putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); - return result; - } - } - - processDefinition.setReleaseState(state); - processDefineMapper.updateById(processDefinition); - break; - case OFFLINE: - processDefinition.setReleaseState(state); - processDefineMapper.updateById(processDefinition); - List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new int[]{processDefinition.getId()} - ); - - for (Schedule schedule : scheduleList) { - logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); - // set status - schedule.setReleaseState(ReleaseState.OFFLINE); - scheduleMapper.updateById(schedule); - SchedulerService.deleteSchedule(project.getId(), schedule.getId()); - } - break; - default: - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); - return result; - } - - putMsg(result, Status.SUCCESS); - return result; - } + Map releaseProcessDefinition(User loginUser, + String projectName, + int id, + int releaseState); /** * batch export process definition by ids * - * @param loginUser - * @param projectName - * @param processDefinitionIds - * @param response + * @param loginUser login user + * @param projectName project name + * @param processDefinitionIds process definition ids + * @param response http servlet response */ - public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response) { - - if (StringUtils.isEmpty(processDefinitionIds)) { - return; - } - - //export project info - Project project = 
projectMapper.queryByName(projectName); - - //check user access for project - Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); - Status resultStatus = (Status) checkResult.get(Constants.STATUS); - - if (resultStatus != Status.SUCCESS) { - return; - } - - List processDefinitionList = - getProcessDefinitionList(processDefinitionIds); - - if (CollectionUtils.isNotEmpty(processDefinitionList)) { - downloadProcessDefinitionFile(response, processDefinitionList); - } - } - - /** - * get process definition list by ids - * - * @param processDefinitionIds - * @return - */ - private List getProcessDefinitionList(String processDefinitionIds) { - List processDefinitionList = new ArrayList<>(); - String[] processDefinitionIdArray = processDefinitionIds.split(","); - for (String strProcessDefinitionId : processDefinitionIdArray) { - //get workflow info - int processDefinitionId = Integer.parseInt(strProcessDefinitionId); - ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); - if (null != processDefinition) { - processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); - } - } - - return processDefinitionList; - } - - /** - * download the process definition file - * - * @param response - * @param processDefinitionList - */ - private void downloadProcessDefinitionFile(HttpServletResponse response, List processDefinitionList) { - response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); - BufferedOutputStream buff = null; - ServletOutputStream out = null; - try { - out = response.getOutputStream(); - buff = new BufferedOutputStream(out); - buff.write(JSONUtils.toJsonString(processDefinitionList).getBytes(StandardCharsets.UTF_8)); - buff.flush(); - buff.close(); - } catch (IOException e) { - logger.warn("export process fail", e); - } finally { - if (null != buff) { - try { - buff.close(); - } catch (Exception e) { - logger.warn("export process buffer not close", e); 
- } - } - if (null != out) { - try { - out.close(); - } catch (Exception e) { - logger.warn("export process output stream not close", e); - } - } - } - } - - /** - * get export process metadata string - * - * @param processDefinitionId process definition id - * @param processDefinition process definition - * @return export process metadata string - */ - public String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { - //create workflow json file - return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); - } - - /** - * get export process metadata string - * - * @param processDefinitionId process definition id - * @param processDefinition process definition - * @return export process metadata string - */ - public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { - //correct task param which has data source or dependent param - String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); - processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); - - //export process metadata - ProcessMeta exportProcessMeta = new ProcessMeta(); - exportProcessMeta.setProjectName(processDefinition.getProjectName()); - exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); - exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); - exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); - exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); - - //schedule info - List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); - if (!schedules.isEmpty()) { - Schedule schedule = schedules.get(0); - exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); - exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); - 
exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); - exportProcessMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime())); - exportProcessMeta.setScheduleCrontab(schedule.getCrontab()); - exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); - exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE)); - exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); - exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); - } - //create workflow json file - return exportProcessMeta; - } - - /** - * correct task param which has datasource or dependent - * - * @param processDefinitionJson processDefinitionJson - * @return correct processDefinitionJson - */ - public String addExportTaskNodeSpecialParam(String processDefinitionJson) { - ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); - ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); - - for (int i = 0; i < jsonArray.size(); i++) { - JsonNode taskNode = jsonArray.path(i); - if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { - String taskType = taskNode.path("type").asText(); - - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - if (null != addTaskParam) { - addTaskParam.addExportSpecialParam(taskNode); - } - } - } - jsonObject.set(TASKS, jsonArray); - return jsonObject.toString(); - } - - /** - * check task if has sub process - * - * @param taskType task type - * @return if task has sub process return true else false - */ - private boolean checkTaskHasSubProcess(String taskType) { - return taskType.equals(TaskType.SUB_PROCESS.name()); - } + void batchExportProcessDefinitionByIds(User loginUser, + String projectName, + String processDefinitionIds, + HttpServletResponse response); /** * import process definition @@ -751,357 +183,9 @@ public class ProcessDefinitionService 
extends BaseDAGService { * @param currentProjectName current project name * @return import process */ - @Transactional(rollbackFor = RuntimeException.class) - public Map importProcessDefinition(User loginUser, MultipartFile file, String currentProjectName) { - Map result = new HashMap<>(5); - String processMetaJson = FileUtils.file2String(file); - List processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); - - //check file content - if (CollectionUtils.isEmpty(processMetaList)) { - putMsg(result, Status.DATA_IS_NULL, "fileContent"); - return result; - } - - for (ProcessMeta processMeta : processMetaList) { - - if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)) { - return result; - } - } - - return result; - } - - /** - * check and import process definition - * - * @param loginUser - * @param currentProjectName - * @param result - * @param processMeta - * @return - */ - private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map result, ProcessMeta processMeta) { - - if (!checkImportanceParams(processMeta, result)) { - return false; - } - - //deal with process name - String processDefinitionName = processMeta.getProcessDefinitionName(); - //use currentProjectName to query - Project targetProject = projectMapper.queryByName(currentProjectName); - if (null != targetProject) { - processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), - processDefinitionName, 1); - } - - //unique check - Map checkResult = verifyProcessDefinitionName(loginUser, currentProjectName, processDefinitionName); - Status status = (Status) checkResult.get(Constants.STATUS); - if (Status.SUCCESS.equals(status)) { - putMsg(result, Status.SUCCESS); - } else { - result.putAll(checkResult); - return false; - } - - // get create process result - Map createProcessResult = - getCreateProcessResult(loginUser, - currentProjectName, - result, - processMeta, - processDefinitionName, - 
addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); - - if (createProcessResult == null) { - return false; - } - - //create process definition - Integer processDefinitionId = - Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? - null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); - - //scheduler param - return getImportProcessScheduleResult(loginUser, - currentProjectName, - result, - processMeta, - processDefinitionName, - processDefinitionId); - - } - - /** - * get create process result - * - * @param loginUser - * @param currentProjectName - * @param result - * @param processMeta - * @param processDefinitionName - * @param importProcessParam - * @return - */ - private Map getCreateProcessResult(User loginUser, - String currentProjectName, - Map result, - ProcessMeta processMeta, - String processDefinitionName, - String importProcessParam) { - Map createProcessResult = null; - try { - createProcessResult = createProcessDefinition(loginUser - , currentProjectName, - processDefinitionName + "_import_" + System.currentTimeMillis(), - importProcessParam, - processMeta.getProcessDefinitionDescription(), - processMeta.getProcessDefinitionLocations(), - processMeta.getProcessDefinitionConnects()); - putMsg(result, Status.SUCCESS); - } catch (JsonProcessingException e) { - logger.error("import process meta json data: {}", e.getMessage(), e); - putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - } - - return createProcessResult; - } - - /** - * get import process schedule result - * - * @param loginUser - * @param currentProjectName - * @param result - * @param processMeta - * @param processDefinitionName - * @param processDefinitionId - * @return - */ - private boolean getImportProcessScheduleResult(User loginUser, - String currentProjectName, - Map result, - ProcessMeta processMeta, - String processDefinitionName, - Integer processDefinitionId) { - if (null != 
processMeta.getScheduleCrontab() && null != processDefinitionId) { - int scheduleInsert = importProcessSchedule(loginUser, - currentProjectName, - processMeta, - processDefinitionName, - processDefinitionId); - - if (0 == scheduleInsert) { - putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - return false; - } - } - return true; - } - - /** - * check importance params - * - * @param processMeta - * @param result - * @return - */ - private boolean checkImportanceParams(ProcessMeta processMeta, Map result) { - if (StringUtils.isEmpty(processMeta.getProjectName())) { - putMsg(result, Status.DATA_IS_NULL, "projectName"); - return false; - } - if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); - return false; - } - if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { - putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); - return false; - } - - return true; - } - - /** - * import process add special task param - * - * @param loginUser login user - * @param processDefinitionJson process definition json - * @param targetProject target project - * @return import process param - */ - private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { - ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); - ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); - //add sql and dependent param - for (int i = 0; i < jsonArray.size(); i++) { - JsonNode taskNode = jsonArray.path(i); - String taskType = taskNode.path("type").asText(); - ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); - if (null != addTaskParam) { - addTaskParam.addImportSpecialParam(taskNode); - } - } - - //recursive sub-process parameter correction map key for old process id value for new process id - Map subProcessIdMap = new HashMap<>(20); - - List subProcessList = StreamUtils.asStream(jsonArray.elements()) - 
.filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) - .collect(Collectors.toList()); - - if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, jsonArray, subProcessIdMap); - } - - jsonObject.set(TASKS, jsonArray); - return jsonObject.toString(); - } - - /** - * import process schedule - * - * @param loginUser login user - * @param currentProjectName current project name - * @param processMeta process meta data - * @param processDefinitionName process definition name - * @param processDefinitionId process definition id - * @return insert schedule flag - */ - public int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta, - String processDefinitionName, Integer processDefinitionId) { - Date now = new Date(); - Schedule scheduleObj = new Schedule(); - scheduleObj.setProjectName(currentProjectName); - scheduleObj.setProcessDefinitionId(processDefinitionId); - scheduleObj.setProcessDefinitionName(processDefinitionName); - scheduleObj.setCreateTime(now); - scheduleObj.setUpdateTime(now); - scheduleObj.setUserId(loginUser.getId()); - scheduleObj.setUserName(loginUser.getUserName()); - - scheduleObj.setCrontab(processMeta.getScheduleCrontab()); - - if (null != processMeta.getScheduleStartTime()) { - scheduleObj.setStartTime(DateUtils.stringToDate(processMeta.getScheduleStartTime())); - } - if (null != processMeta.getScheduleEndTime()) { - scheduleObj.setEndTime(DateUtils.stringToDate(processMeta.getScheduleEndTime())); - } - if (null != processMeta.getScheduleWarningType()) { - scheduleObj.setWarningType(WarningType.valueOf(processMeta.getScheduleWarningType())); - } - if (null != processMeta.getScheduleWarningGroupId()) { - scheduleObj.setWarningGroupId(processMeta.getScheduleWarningGroupId()); - } - if (null != processMeta.getScheduleFailureStrategy()) { - 
scheduleObj.setFailureStrategy(FailureStrategy.valueOf(processMeta.getScheduleFailureStrategy())); - } - if (null != processMeta.getScheduleReleaseState()) { - scheduleObj.setReleaseState(ReleaseState.valueOf(processMeta.getScheduleReleaseState())); - } - if (null != processMeta.getScheduleProcessInstancePriority()) { - scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority())); - } - - if (null != processMeta.getScheduleWorkerGroupName()) { - scheduleObj.setWorkerGroup(processMeta.getScheduleWorkerGroupName()); - } - - return scheduleMapper.insert(scheduleObj); - } - - /** - * check import process has sub process - * recursion create sub process - * - * @param loginUser login user - * @param targetProject target project - * @param jsonArray process task array - * @param subProcessIdMap correct sub process id map - */ - public void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) { - for (int i = 0; i < jsonArray.size(); i++) { - ObjectNode taskNode = (ObjectNode) jsonArray.path(i); - String taskType = taskNode.path("type").asText(); - - if (!checkTaskHasSubProcess(taskType)) { - continue; - } - //get sub process info - ObjectNode subParams = (ObjectNode) taskNode.path("params"); - Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); - ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); - //check is sub process exist in db - if (null == subProcess) { - continue; - } - String subProcessJson = subProcess.getProcessDefinitionJson(); - //check current project has sub process - ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); - - if (null == currentProjectSubProcess) { - ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); - - List subProcessList = StreamUtils.asStream(subJsonArray.elements()) - 
.filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) - .collect(Collectors.toList()); - - if (CollectionUtils.isNotEmpty(subProcessList)) { - importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); - //sub process processId correct - if (!subProcessIdMap.isEmpty()) { - - for (Map.Entry entry : subProcessIdMap.entrySet()) { - String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey(); - String newSubProcessId = "\"processDefinitionId\":" + entry.getValue(); - subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId); - } - - subProcessIdMap.clear(); - } - } - - //if sub-process recursion - Date now = new Date(); - //create sub process in target project - ProcessDefinition processDefine = new ProcessDefinition(); - processDefine.setName(subProcess.getName()); - processDefine.setVersion(subProcess.getVersion()); - processDefine.setReleaseState(subProcess.getReleaseState()); - processDefine.setProjectId(targetProject.getId()); - processDefine.setUserId(loginUser.getId()); - processDefine.setProcessDefinitionJson(subProcessJson); - processDefine.setDescription(subProcess.getDescription()); - processDefine.setLocations(subProcess.getLocations()); - processDefine.setConnects(subProcess.getConnects()); - processDefine.setTimeout(subProcess.getTimeout()); - processDefine.setTenantId(subProcess.getTenantId()); - processDefine.setGlobalParams(subProcess.getGlobalParams()); - processDefine.setCreateTime(now); - processDefine.setUpdateTime(now); - processDefine.setFlag(subProcess.getFlag()); - processDefine.setReceivers(subProcess.getReceivers()); - processDefine.setReceiversCc(subProcess.getReceiversCc()); - processDefineMapper.insert(processDefine); - - logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); - - //modify task node - ProcessDefinition newSubProcessDefine = 
processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); - - if (null != newSubProcessDefine) { - subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); - subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId()); - taskNode.set("params", subParams); - } - } - } - } + Map importProcessDefinition(User loginUser, + MultipartFile file, + String currentProjectName); /** @@ -1111,50 +195,8 @@ public class ProcessDefinitionService extends BaseDAGService { * @param processDefinitionJson process definition json * @return check result code */ - public Map checkProcessNodeList(ProcessData processData, String processDefinitionJson) { - - Map result = new HashMap<>(5); - try { - if (processData == null) { - logger.error("process data is null"); - putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); - return result; - } - - // Check whether the task node is normal - List taskNodes = processData.getTasks(); - - if (taskNodes == null) { - logger.error("process node info is empty"); - putMsg(result, Status.DATA_IS_NULL, processDefinitionJson); - return result; - } - - // check has cycle - if (graphHasCycle(taskNodes)) { - logger.error("process DAG has cycle"); - putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); - return result; - } - - // check whether the process definition json is normal - for (TaskNode taskNode : taskNodes) { - if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getType())) { - logger.error("task node {} parameter invalid", taskNode.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); - return result; - } - - // check extra params - CheckUtils.checkOtherParams(taskNode.getExtras()); - } - putMsg(result, Status.SUCCESS); - } catch (Exception e) { - result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); - result.put(Constants.MSG, e.getMessage()); - } - return result; - } + Map checkProcessNodeList(ProcessData processData, + String 
processDefinitionJson); /** * get task node details based on process definition @@ -1162,36 +204,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param defineId define id * @return task node list */ - public Map getTaskNodeListByDefinitionId(Integer defineId) { - Map result = new HashMap<>(); - - ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); - if (processDefinition == null) { - logger.info("process define not exists"); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); - return result; - } - - - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - //process data check - if (null == processData) { - logger.error("process data is null"); - putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); - return result; - } - - List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - - result.put(Constants.DATA_LIST, taskNodeList); - putMsg(result, Status.SUCCESS); - - return result; - - } + Map getTaskNodeListByDefinitionId(Integer defineId); /** * get task node details based on process definition @@ -1199,36 +212,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param defineIdList define id list * @return task node list */ - public Map getTaskNodeListByDefinitionIdList(String defineIdList) { - Map result = new HashMap<>(); - - Map> taskNodeMap = new HashMap<>(); - String[] idList = defineIdList.split(","); - List idIntList = new ArrayList<>(); - for (String definitionId : idList) { - idIntList.add(Integer.parseInt(definitionId)); - } - Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); - List processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); - if (CollectionUtils.isEmpty(processDefinitionList)) { - logger.info("process definition not exists"); - putMsg(result, 
Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); - return result; - } - - for (ProcessDefinition processDefinition : processDefinitionList) { - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); - taskNodeMap.put(processDefinition.getId(), taskNodeList); - } - - result.put(Constants.DATA_LIST, taskNodeMap); - putMsg(result, Status.SUCCESS); - - return result; - - } + Map getTaskNodeListByDefinitionIdList(String defineIdList); /** @@ -1237,16 +221,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @param projectId project id * @return process definitions in the project */ - public Map queryProcessDefinitionAllByProjectId(Integer projectId) { - - HashMap result = new HashMap<>(5); - - List resourceList = processDefineMapper.queryAllDefinitionList(projectId); - result.put(Constants.DATA_LIST, resourceList); - putMsg(result, Status.SUCCESS); - - return result; - } + Map queryProcessDefinitionAllByProjectId(Integer projectId); /** * Encapsulates the TreeView structure @@ -1256,200 +231,7 @@ public class ProcessDefinitionService extends BaseDAGService { * @return tree view json data * @throws Exception exception */ - public Map viewTree(Integer processId, Integer limit) throws Exception { - Map result = new HashMap<>(); - - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); - if (null == processDefinition) { - logger.info("process define not exists"); - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); - return result; - } - DAG dag = genDagGraph(processDefinition); - /** - * nodes that is running - */ - Map> runningNodeMap = new ConcurrentHashMap<>(); - - /** - * nodes that is waiting torun - */ - Map> waitingRunningNodeMap = new ConcurrentHashMap<>(); - - /** - * List of process instances 
- */ - List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); - - for (ProcessInstance processInstance : processInstanceList) { - processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); - } - - if (limit > processInstanceList.size()) { - limit = processInstanceList.size(); - } - - TreeViewDto parentTreeViewDto = new TreeViewDto(); - parentTreeViewDto.setName("DAG"); - parentTreeViewDto.setType(""); - // Specify the process definition, because it is a TreeView for a process definition - - for (int i = limit - 1; i >= 0; i--) { - ProcessInstance processInstance = processInstanceList.get(i); - - Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); - parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", processInstance.getState().toString() - , processInstance.getStartTime(), endTime, processInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); - } - - List parentTreeViewDtoList = new ArrayList<>(); - parentTreeViewDtoList.add(parentTreeViewDto); - // Here is the encapsulation task instance - for (String startNode : dag.getBeginNode()) { - runningNodeMap.put(startNode, parentTreeViewDtoList); - } - - while (Stopper.isRunning()) { - Set postNodeList = null; - Iterator>> iter = runningNodeMap.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry> en = iter.next(); - String nodeName = en.getKey(); - parentTreeViewDtoList = en.getValue(); - - TreeViewDto treeViewDto = new TreeViewDto(); - treeViewDto.setName(nodeName); - TaskNode taskNode = dag.getNode(nodeName); - treeViewDto.setType(taskNode.getType()); - - - //set treeViewDto instances - for (int i = limit - 1; i >= 0; i--) { - ProcessInstance processInstance = processInstanceList.get(i); - TaskInstance taskInstance = 
taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); - if (taskInstance == null) { - treeViewDto.getInstances().add(new Instance(-1, "not running", "null")); - } else { - Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); - Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); - - int subProcessId = 0; - /** - * if process is sub process, the return sub id, or sub id=0 - */ - if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) { - String taskJson = taskInstance.getTaskJson(); - taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); - subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskNode.getParams()).path(CMDPARAM_SUB_PROCESS_DEFINE_ID).asText()); - } - treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString() - , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); - } - } - for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { - pTreeViewDto.getChildren().add(treeViewDto); - } - postNodeList = dag.getSubsequentNodes(nodeName); - if (CollectionUtils.isNotEmpty(postNodeList)) { - for (String nextNodeName : postNodeList) { - List treeViewDtoList = waitingRunningNodeMap.get(nextNodeName); - if (CollectionUtils.isNotEmpty(treeViewDtoList)) { - treeViewDtoList.add(treeViewDto); - waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); - } else { - treeViewDtoList = new ArrayList<>(); - treeViewDtoList.add(treeViewDto); - waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); - } - } - } - runningNodeMap.remove(nodeName); - } - if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) { - break; - } else { - runningNodeMap.putAll(waitingRunningNodeMap); - waitingRunningNodeMap.clear(); - } - } - 
result.put(Constants.DATA_LIST, parentTreeViewDto); - result.put(Constants.STATUS, Status.SUCCESS); - result.put(Constants.MSG, Status.SUCCESS.getMsg()); - return result; - } - - - /** - * Generate the DAG Graph based on the process definition id - * - * @param processDefinition process definition - * @return dag graph - */ - private DAG genDagGraph(ProcessDefinition processDefinition) { - - String processDefinitionJson = processDefinition.getProcessDefinitionJson(); - - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - - //check process data - if (null != processData) { - List taskNodeList = processData.getTasks(); - processDefinition.setGlobalParamList(processData.getGlobalParams()); - ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); - - // Generate concrete Dag to be executed - return DagHelper.buildDagGraph(processDag); - } - - return new DAG<>(); - } - - - /** - * whether the graph has a ring - * - * @param taskNodeResponseList task node response list - * @return if graph has cycle flag - */ - private boolean graphHasCycle(List taskNodeResponseList) { - DAG graph = new DAG<>(); - - // Fill the vertices - for (TaskNode taskNodeResponse : taskNodeResponseList) { - graph.addNode(taskNodeResponse.getName(), taskNodeResponse); - } - - // Fill edge relations - for (TaskNode taskNodeResponse : taskNodeResponseList) { - taskNodeResponse.getPreTasks(); - List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); - if (CollectionUtils.isNotEmpty(preTasks)) { - for (String preTask : preTasks) { - if (!graph.addEdge(preTask, taskNodeResponse.getName())) { - return true; - } - } - } - } - - return graph.hasCycle(); - } - - private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { - ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); - if (processDefinition != null) { - if (num > 1) { - 
String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); - processDefinitionName = str + "(" + num + ")"; - } else { - processDefinitionName = processDefinition.getName() + "(" + num + ")"; - } - } else { - return processDefinitionName; - } - return recursionProcessDefinitionName(projectId, processDefinitionName, num + 1); - } - + Map viewTree(Integer processId, + Integer limit) throws Exception; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index e4a00f3895..dc3dbe976f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -16,8 +16,28 @@ */ package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +import static org.apache.dolphinscheduler.common.Constants.DATA_LIST; +import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT; +import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE; +import static org.apache.dolphinscheduler.common.Constants.TASK_LIST; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + import 
org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.enums.Status; @@ -31,14 +51,26 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,22 +78,14 @@ import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.text.ParseException; -import java.util.*; -import java.util.stream.Collectors; - -import static org.apache.dolphinscheduler.common.Constants.*; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * process instance service */ @Service -public class ProcessInstanceService extends BaseDAGService { +public class ProcessInstanceService extends BaseService { private static final Logger logger = LoggerFactory.getLogger(ProcessInstanceService.class); @@ -167,7 +191,7 @@ public class ProcessInstanceService extends BaseDAGService { ProcessDefinition processDefinition = processService.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceiversCc(processDefinition.getReceiversCc()); - result.put(Constants.DATA_LIST, processInstance); + result.put(DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); return result; @@ -242,7 +266,7 @@ public class ProcessInstanceService extends BaseDAGService { pageInfo.setTotalCount((int) processInstanceList.getTotal()); pageInfo.setLists(processInstances); - result.put(Constants.DATA_LIST, pageInfo); + result.put(DATA_LIST, pageInfo); putMsg(result, Status.SUCCESS); return result; } @@ -273,7 +297,7 @@ public class ProcessInstanceService extends BaseDAGService { Map resultMap = new HashMap<>(); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); resultMap.put(TASK_LIST, taskInstanceList); - result.put(Constants.DATA_LIST, resultMap); + result.put(DATA_LIST, resultMap); putMsg(result, Status.SUCCESS); return result; @@ -362,7 +386,7 @@ public class 
ProcessInstanceService extends BaseDAGService { } Map dataMap = new HashMap<>(); dataMap.put("subProcessInstanceId", subWorkflowInstance.getId()); - result.put(Constants.DATA_LIST, dataMap); + result.put(DATA_LIST, dataMap); putMsg(result, Status.SUCCESS); return result; } @@ -501,7 +525,7 @@ public class ProcessInstanceService extends BaseDAGService { } Map dataMap = new HashMap<>(); dataMap.put("parentWorkflowInstance", parentWorkflowInstance.getId()); - result.put(Constants.DATA_LIST, dataMap); + result.put(DATA_LIST, dataMap); putMsg(result, Status.SUCCESS); return result; } @@ -618,7 +642,7 @@ public class ProcessInstanceService extends BaseDAGService { resultMap.put(GLOBAL_PARAMS, globalParams); resultMap.put(LOCAL_PARAMS, localUserDefParams); - result.put(Constants.DATA_LIST, resultMap); + result.put(DATA_LIST, resultMap); putMsg(result, Status.SUCCESS); return result; } @@ -668,9 +692,28 @@ public class ProcessInstanceService extends BaseDAGService { } ganttDto.setTasks(taskList); - result.put(Constants.DATA_LIST, ganttDto); + result.put(DATA_LIST, ganttDto); putMsg(result, Status.SUCCESS); return result; } + /** + * process instance to DAG + * + * @param processInstance input process instance + * @return process instance dag. 
+ */ + private static DAG processInstance2DAG(ProcessInstance processInstance) { + + String processDefinitionJson = processInstance.getProcessInstanceJson(); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + List taskNodeList = processData.getTasks(); + + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); + + return DagHelper.buildDagGraph(processDag); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java new file mode 100644 index 0000000000..ee02940c09 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -0,0 +1,1493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import javax.servlet.ServletOutputStream; +import javax.servlet.http.HttpServletResponse; + +import org.apache.dolphinscheduler.api.dto.ProcessMeta; +import org.apache.dolphinscheduler.api.dto.treeview.Instance; +import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.SchedulerService; +import org.apache.dolphinscheduler.api.utils.CheckUtils; +import org.apache.dolphinscheduler.api.utils.FileUtils; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.exportprocess.ProcessAddTaskParam; +import org.apache.dolphinscheduler.api.utils.exportprocess.TaskNodeParamFactory; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskType; +import 
org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.graph.DAG; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.model.TaskNodeRelation; +import org.apache.dolphinscheduler.common.process.ProcessDag; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StreamUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.utils.DagHelper; +import org.apache.dolphinscheduler.service.permission.PermissionCheck; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; 
+import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.web.multipart.MultipartFile; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * process definition service + */ +@Service +public class ProcessDefinitionServiceImpl extends BaseService implements + ProcessDefinitionService { + + private static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionServiceImpl.class); + + private static final String PROCESSDEFINITIONID = "processDefinitionId"; + + private static final String RELEASESTATE = "releaseState"; + + private static final String TASKS = "tasks"; + + @Autowired + private ProjectMapper projectMapper; + + @Autowired + private ProjectService projectService; + + @Autowired + private ProcessDefinitionMapper processDefineMapper; + + @Autowired + private ProcessInstanceMapper processInstanceMapper; + + + @Autowired + private TaskInstanceMapper taskInstanceMapper; + + @Autowired + private ScheduleMapper scheduleMapper; + + @Autowired + private ProcessService processService; + + /** + * create process definition + * + * @param loginUser login user + * @param projectName project name + * @param name process definition name + * @param processDefinitionJson process definition json + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes + * @return create result code + * @throws JsonProcessingException JsonProcessingException + */ + public Map createProcessDefinition(User loginUser, + String projectName, + String name, + String processDefinitionJson, + String desc, + String 
locations, + String connects) throws JsonProcessingException { + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + // check project auth + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefine = new ProcessDefinition(); + Date now = new Date(); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); + if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) { + return checkProcessJson; + } + + processDefine.setName(name); + processDefine.setReleaseState(ReleaseState.OFFLINE); + processDefine.setProjectId(project.getId()); + processDefine.setUserId(loginUser.getId()); + processDefine.setProcessDefinitionJson(processDefinitionJson); + processDefine.setDescription(desc); + processDefine.setLocations(locations); + processDefine.setConnects(connects); + processDefine.setTimeout(processData.getTimeout()); + processDefine.setTenantId(processData.getTenantId()); + processDefine.setModifyBy(loginUser.getUserName()); + processDefine.setResourceIds(getResourceIds(processData)); + + //custom global params + List globalParamsList = processData.getGlobalParams(); + if (CollectionUtils.isNotEmpty(globalParamsList)) { + Set globalParamsSet = new HashSet<>(globalParamsList); + globalParamsList = new ArrayList<>(globalParamsSet); + processDefine.setGlobalParamList(globalParamsList); + } + processDefine.setCreateTime(now); + processDefine.setUpdateTime(now); + processDefine.setFlag(Flag.YES); + processDefineMapper.insert(processDefine); + + // return processDefinition object with ID + result.put(Constants.DATA_LIST, processDefineMapper.selectById(processDefine.getId())); + putMsg(result, Status.SUCCESS); + 
result.put("processDefinitionId", processDefine.getId()); + return result; + } + + /** + * get resource ids + * + * @param processData process data + * @return resource ids + */ + private String getResourceIds(ProcessData processData) { + List tasks = processData.getTasks(); + Set resourceIds = new HashSet<>(); + for (TaskNode taskNode : tasks) { + String taskParameter = taskNode.getParams(); + AbstractParameters params = TaskParametersUtils.getParameters(taskNode.getType(), taskParameter); + if (CollectionUtils.isNotEmpty(params.getResourceFilesList())) { + Set tempSet = params.getResourceFilesList().stream().map(t -> t.getId()).collect(Collectors.toSet()); + resourceIds.addAll(tempSet); + } + } + + StringBuilder sb = new StringBuilder(); + for (int i : resourceIds) { + if (sb.length() > 0) { + sb.append(","); + } + sb.append(i); + } + return sb.toString(); + } + + + /** + * query process definition list + * + * @param loginUser login user + * @param projectName project name + * @return definition list + */ + public Map queryProcessDefinitionList(User loginUser, String projectName) { + + HashMap result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + List resourceList = processDefineMapper.queryAllDefinitionList(project.getId()); + result.put(Constants.DATA_LIST, resourceList); + putMsg(result, Status.SUCCESS); + + return result; + } + + + /** + * query process definition list paging + * + * @param loginUser login user + * @param projectName project name + * @param searchVal search value + * @param pageNo page number + * @param pageSize page size + * @param userId user id + * @return process definition page + */ + public Map queryProcessDefinitionListPaging(User loginUser, String projectName, String 
searchVal, Integer pageNo, Integer pageSize, Integer userId) { + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + Page page = new Page(pageNo, pageSize); + IPage processDefinitionIPage = processDefineMapper.queryDefineListPaging( + page, searchVal, userId, project.getId(), isAdmin(loginUser)); + + PageInfo pageInfo = new PageInfo(pageNo, pageSize); + pageInfo.setTotalCount((int) processDefinitionIPage.getTotal()); + pageInfo.setLists(processDefinitionIPage.getRecords()); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * query datail of process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return process definition detail + */ + public Map queryProcessDefinitionById(User loginUser, String projectName, Integer processId) { + + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId); + } else { + result.put(Constants.DATA_LIST, processDefinition); + putMsg(result, Status.SUCCESS); + } + return result; + } + + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + public Map 
copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException { + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); + return result; + } else { + return createProcessDefinition( + loginUser, + projectName, + processDefinition.getName() + "_copy_" + System.currentTimeMillis(), + processDefinition.getProcessDefinitionJson(), + processDefinition.getDescription(), + processDefinition.getLocations(), + processDefinition.getConnects()); + } + } + + /** + * update process definition + * + * @param loginUser login user + * @param projectName project name + * @param name process definition name + * @param id process definition id + * @param processDefinitionJson process definition json + * @param desc description + * @param locations locations for nodes + * @param connects connects for nodes + * @return update result code + */ + public Map updateProcessDefinition(User loginUser, String projectName, int id, String name, + String processDefinitionJson, String desc, + String locations, String connects) { + Map result = new HashMap<>(5); + + Project project = projectMapper.queryByName(projectName); + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); 
+ if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) { + return checkProcessJson; + } + ProcessDefinition processDefine = processService.findProcessDefineById(id); + if (processDefine == null) { + // check process definition exists + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); + return result; + } else if (processDefine.getReleaseState() == ReleaseState.ONLINE) { + // online can not permit edit + putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefine.getName()); + return result; + } else { + putMsg(result, Status.SUCCESS); + } + + Date now = new Date(); + + processDefine.setId(id); + processDefine.setName(name); + processDefine.setReleaseState(ReleaseState.OFFLINE); + processDefine.setProjectId(project.getId()); + processDefine.setProcessDefinitionJson(processDefinitionJson); + processDefine.setDescription(desc); + processDefine.setLocations(locations); + processDefine.setConnects(connects); + processDefine.setTimeout(processData.getTimeout()); + processDefine.setTenantId(processData.getTenantId()); + processDefine.setModifyBy(loginUser.getUserName()); + processDefine.setResourceIds(getResourceIds(processData)); + + //custom global params + List globalParamsList = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(processData.getGlobalParams())) { + Set userDefParamsSet = new HashSet<>(processData.getGlobalParams()); + globalParamsList = new ArrayList<>(userDefParamsSet); + } + processDefine.setGlobalParamList(globalParamsList); + processDefine.setUpdateTime(now); + processDefine.setFlag(Flag.YES); + if (processDefineMapper.updateById(processDefine) > 0) { + putMsg(result, Status.SUCCESS); + + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + return result; + } + + /** + * verify process definition name unique + * + * @param loginUser login user + * @param projectName project name + * @param name name + * @return true if process definition name not exists, otherwise false + */ + public Map 
verifyProcessDefinitionName(User loginUser, String projectName, String name) { + + Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), name); + if (processDefinition == null) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.PROCESS_INSTANCE_EXIST, name); + } + return result; + } + + /** + * delete process definition by id + * + * @param loginUser login user + * @param projectName project name + * @param processDefinitionId process definition id + * @return delete result code + */ + @Transactional(rollbackFor = RuntimeException.class) + public Map deleteProcessDefinitionById(User loginUser, String projectName, Integer processDefinitionId) { + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); + + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId); + return result; + } + + // Determine if the login user is the owner of the process definition + if (loginUser.getId() != processDefinition.getUserId() && loginUser.getUserType() != UserType.ADMIN_USER) { + putMsg(result, Status.USER_NO_OPERATION_PERM); + return result; + } + + // check process definition is already online + if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, 
processDefinitionId); + return result; + } + + // get the timing according to the process definition + List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + /* NOTE(review): !schedules.isEmpty() is redundant — schedules.size() > 1 already implies non-empty */ if (!schedules.isEmpty() && schedules.size() > 1) { + logger.warn("scheduler num is {},Greater than 1", schedules.size()); + putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); + return result; + } else if (schedules.size() == 1) { + Schedule schedule = schedules.get(0); + /* an OFFLINE schedule is deleted together with the definition; an ONLINE one blocks the delete */ if (schedule.getReleaseState() == ReleaseState.OFFLINE) { + scheduleMapper.deleteById(schedule.getId()); + } else if (schedule.getReleaseState() == ReleaseState.ONLINE) { + putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); + return result; + } + } + + int delete = processDefineMapper.deleteById(processDefinitionId); + + if (delete > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR); + } + return result; + } + + /** + * release process definition: online / offline + * + * @param loginUser login user + * @param projectName project name + * @param id process definition id + * @param releaseState release state + * @return release result code + */ + @Transactional(rollbackFor = RuntimeException.class) + public Map releaseProcessDefinition(User loginUser, String projectName, int id, int releaseState) { + HashMap result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultEnum = (Status) checkResult.get(Constants.STATUS); + if (resultEnum != Status.SUCCESS) { + return checkResult; + } + + ReleaseState state = ReleaseState.getEnum(releaseState); + + // check state + if (null == state) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); + return result; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(id); /* NOTE(review): selectById may return null for an unknown id — a null check before the switch would avoid a possible NPE; verify upstream guarantees */ + + switch (state) { + case ONLINE: + // To check 
resources whether they are already cancel authorized or deleted + String resourceIds = processDefinition.getResourceIds(); + if (StringUtils.isNotBlank(resourceIds)) { + Integer[] resourceIdArray = Arrays.stream(resourceIds.split(",")).map(Integer::parseInt).toArray(Integer[]::new); + PermissionCheck permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE_ID, processService, resourceIdArray, loginUser.getId(), logger); + try { + permissionCheck.checkPermission(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION, RELEASESTATE); + return result; + } + } + + processDefinition.setReleaseState(state); + processDefineMapper.updateById(processDefinition); + break; + case OFFLINE: + processDefinition.setReleaseState(state); + processDefineMapper.updateById(processDefinition); + List scheduleList = scheduleMapper.selectAllByProcessDefineArray( + new int[]{processDefinition.getId()} + ); + + for (Schedule schedule : scheduleList) { + logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); + // set status + schedule.setReleaseState(ReleaseState.OFFLINE); + scheduleMapper.updateById(schedule); + SchedulerService.deleteSchedule(project.getId(), schedule.getId()); + } + break; + default: + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE); + return result; + } + + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * batch export process definition by ids + * + * @param loginUser + * @param projectName + * @param processDefinitionIds + * @param response + */ + public void batchExportProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds, HttpServletResponse response) { + + if (StringUtils.isEmpty(processDefinitionIds)) { + return; + } + + //export project info + Project project = projectMapper.queryByName(projectName); + + //check user access for project 
+ Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + + if (resultStatus != Status.SUCCESS) { + return; + } + + List processDefinitionList = + getProcessDefinitionList(processDefinitionIds); + + if (CollectionUtils.isNotEmpty(processDefinitionList)) { + downloadProcessDefinitionFile(response, processDefinitionList); + } + } + + /** + * get process definition list by ids + * + * @param processDefinitionIds + * @return + */ + private List getProcessDefinitionList(String processDefinitionIds) { + List processDefinitionList = new ArrayList<>(); + String[] processDefinitionIdArray = processDefinitionIds.split(","); + for (String strProcessDefinitionId : processDefinitionIdArray) { + //get workflow info + /* NOTE(review): Integer.parseInt throws NumberFormatException on a malformed id — TODO confirm the comma-separated id list is validated upstream */ int processDefinitionId = Integer.parseInt(strProcessDefinitionId); + ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); + if (null != processDefinition) { + processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); + } + } + + return processDefinitionList; + } + + /** + * download the process definition file + * + * @param response + * @param processDefinitionList + */ + private void downloadProcessDefinitionFile(HttpServletResponse response, List processDefinitionList) { + response.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE); + BufferedOutputStream buff = null; + ServletOutputStream out = null; + try { + out = response.getOutputStream(); + buff = new BufferedOutputStream(out); + buff.write(JSONUtils.toJsonString(processDefinitionList).getBytes(StandardCharsets.UTF_8)); + buff.flush(); + buff.close(); /* NOTE(review): buff is closed here and again in finally — consider try-with-resources instead */ + } catch (IOException e) { + logger.warn("export process fail", e); + } finally { + if (null != buff) { + try { + buff.close(); + } catch (Exception e) { + logger.warn("export process buffer not close", e); + } + } + if (null != out) { + try { + out.close(); + } catch (Exception e) 
{ + logger.warn("export process output stream not close", e); + } + } + } + } + + /** + * get export process metadata string + * + * @param processDefinitionId process definition id + * @param processDefinition process definition + * @return export process metadata string + */ + private String exportProcessMetaDataStr(Integer processDefinitionId, ProcessDefinition processDefinition) { + //create workflow json file + return JSONUtils.toJsonString(exportProcessMetaData(processDefinitionId, processDefinition)); + } + + /** + * get export process metadata string + * + * @param processDefinitionId process definition id + * @param processDefinition process definition + * @return export process metadata string + */ + public ProcessMeta exportProcessMetaData(Integer processDefinitionId, ProcessDefinition processDefinition) { + //correct task param which has data source or dependent param + String correctProcessDefinitionJson = addExportTaskNodeSpecialParam(processDefinition.getProcessDefinitionJson()); + processDefinition.setProcessDefinitionJson(correctProcessDefinitionJson); + + //export process metadata + ProcessMeta exportProcessMeta = new ProcessMeta(); + exportProcessMeta.setProjectName(processDefinition.getProjectName()); + exportProcessMeta.setProcessDefinitionName(processDefinition.getName()); + exportProcessMeta.setProcessDefinitionJson(processDefinition.getProcessDefinitionJson()); + exportProcessMeta.setProcessDefinitionLocations(processDefinition.getLocations()); + exportProcessMeta.setProcessDefinitionConnects(processDefinition.getConnects()); + + //schedule info + List schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId); + if (!schedules.isEmpty()) { + Schedule schedule = schedules.get(0); + exportProcessMeta.setScheduleWarningType(schedule.getWarningType().toString()); + exportProcessMeta.setScheduleWarningGroupId(schedule.getWarningGroupId()); + exportProcessMeta.setScheduleStartTime(DateUtils.dateToString(schedule.getStartTime())); 
+ exportProcessMeta.setScheduleEndTime(DateUtils.dateToString(schedule.getEndTime())); + exportProcessMeta.setScheduleCrontab(schedule.getCrontab()); + exportProcessMeta.setScheduleFailureStrategy(String.valueOf(schedule.getFailureStrategy())); + exportProcessMeta.setScheduleReleaseState(String.valueOf(ReleaseState.OFFLINE)); + exportProcessMeta.setScheduleProcessInstancePriority(String.valueOf(schedule.getProcessInstancePriority())); + exportProcessMeta.setScheduleWorkerGroupName(schedule.getWorkerGroup()); + } + //create workflow json file + return exportProcessMeta; + } + + /** + * correct task param which has datasource or dependent + * + * @param processDefinitionJson processDefinitionJson + * @return correct processDefinitionJson + */ + private String addExportTaskNodeSpecialParam(String processDefinitionJson) { + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.path(TASKS); + + for (int i = 0; i < jsonArray.size(); i++) { + JsonNode taskNode = jsonArray.path(i); + if (StringUtils.isNotEmpty(taskNode.path("type").asText())) { + String taskType = taskNode.path("type").asText(); + + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + if (null != addTaskParam) { + addTaskParam.addExportSpecialParam(taskNode); + } + } + } + jsonObject.set(TASKS, jsonArray); + return jsonObject.toString(); + } + + /** + * check task if has sub process + * + * @param taskType task type + * @return if task has sub process return true else false + */ + private boolean checkTaskHasSubProcess(String taskType) { + return taskType.equals(TaskType.SUB_PROCESS.name()); + } + + /** + * import process definition + * + * @param loginUser login user + * @param file process metadata json file + * @param currentProjectName current project name + * @return import process + */ + @Transactional(rollbackFor = RuntimeException.class) + public Map importProcessDefinition(User loginUser, MultipartFile 
file, String currentProjectName) { + Map result = new HashMap<>(5); + String processMetaJson = FileUtils.file2String(file); + List processMetaList = JSONUtils.toList(processMetaJson, ProcessMeta.class); + + //check file content + if (CollectionUtils.isEmpty(processMetaList)) { + putMsg(result, Status.DATA_IS_NULL, "fileContent"); + return result; + } + + for (ProcessMeta processMeta : processMetaList) { + + if (!checkAndImportProcessDefinition(loginUser, currentProjectName, result, processMeta)) { + return result; + } + } + + return result; + } + + /** + * check and import process definition + * + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @return + */ + private boolean checkAndImportProcessDefinition(User loginUser, String currentProjectName, Map result, ProcessMeta processMeta) { + + if (!checkImportanceParams(processMeta, result)) { + return false; + } + + //deal with process name + String processDefinitionName = processMeta.getProcessDefinitionName(); + //use currentProjectName to query + Project targetProject = projectMapper.queryByName(currentProjectName); + if (null != targetProject) { + processDefinitionName = recursionProcessDefinitionName(targetProject.getId(), + processDefinitionName, 1); + } + + //unique check + Map checkResult = verifyProcessDefinitionName(loginUser, currentProjectName, processDefinitionName); + Status status = (Status) checkResult.get(Constants.STATUS); + if (Status.SUCCESS.equals(status)) { + putMsg(result, Status.SUCCESS); + } else { + result.putAll(checkResult); + return false; + } + + // get create process result + Map createProcessResult = + getCreateProcessResult(loginUser, + currentProjectName, + result, + processMeta, + processDefinitionName, + addImportTaskNodeParam(loginUser, processMeta.getProcessDefinitionJson(), targetProject)); + + if (createProcessResult == null) { + return false; + } + + //create process definition + Integer processDefinitionId = + 
Objects.isNull(createProcessResult.get(PROCESSDEFINITIONID)) ? + null : Integer.parseInt(createProcessResult.get(PROCESSDEFINITIONID).toString()); + + //scheduler param + return getImportProcessScheduleResult(loginUser, + currentProjectName, + result, + processMeta, + processDefinitionName, + processDefinitionId); + + } + + /** + * get create process result + * + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @param processDefinitionName + * @param importProcessParam + * @return + */ + private Map getCreateProcessResult(User loginUser, + String currentProjectName, + Map result, + ProcessMeta processMeta, + String processDefinitionName, + String importProcessParam) { + Map createProcessResult = null; + try { + createProcessResult = createProcessDefinition(loginUser + , currentProjectName, + processDefinitionName + "_import_" + System.currentTimeMillis(), + importProcessParam, + processMeta.getProcessDefinitionDescription(), + processMeta.getProcessDefinitionLocations(), + processMeta.getProcessDefinitionConnects()); + putMsg(result, Status.SUCCESS); + } catch (JsonProcessingException e) { + logger.error("import process meta json data: {}", e.getMessage(), e); + putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); + } + + return createProcessResult; + } + + /** + * get import process schedule result + * + * @param loginUser + * @param currentProjectName + * @param result + * @param processMeta + * @param processDefinitionName + * @param processDefinitionId + * @return + */ + private boolean getImportProcessScheduleResult(User loginUser, + String currentProjectName, + Map result, + ProcessMeta processMeta, + String processDefinitionName, + Integer processDefinitionId) { + if (null != processMeta.getScheduleCrontab() && null != processDefinitionId) { + int scheduleInsert = importProcessSchedule(loginUser, + currentProjectName, + processMeta, + processDefinitionName, + processDefinitionId); + + if (0 == scheduleInsert) { + 
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); + return false; + } + } + return true; + } + + /** + * check importance params + * + * @param processMeta + * @param result + * @return + */ + private boolean checkImportanceParams(ProcessMeta processMeta, Map result) { + if (StringUtils.isEmpty(processMeta.getProjectName())) { + putMsg(result, Status.DATA_IS_NULL, "projectName"); + return false; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionName())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionName"); + return false; + } + if (StringUtils.isEmpty(processMeta.getProcessDefinitionJson())) { + putMsg(result, Status.DATA_IS_NULL, "processDefinitionJson"); + return false; + } + + return true; + } + + /** + * import process add special task param + * + * @param loginUser login user + * @param processDefinitionJson process definition json + * @param targetProject target project + * @return import process param + */ + private String addImportTaskNodeParam(User loginUser, String processDefinitionJson, Project targetProject) { + ObjectNode jsonObject = JSONUtils.parseObject(processDefinitionJson); + ArrayNode jsonArray = (ArrayNode) jsonObject.get(TASKS); + //add sql and dependent param + for (int i = 0; i < jsonArray.size(); i++) { + JsonNode taskNode = jsonArray.path(i); + String taskType = taskNode.path("type").asText(); + ProcessAddTaskParam addTaskParam = TaskNodeParamFactory.getByTaskType(taskType); + if (null != addTaskParam) { + addTaskParam.addImportSpecialParam(taskNode); + } + } + + //recursive sub-process parameter correction map key for old process id value for new process id + Map subProcessIdMap = new HashMap<>(20); + + List subProcessList = StreamUtils.asStream(jsonArray.elements()) + .filter(elem -> checkTaskHasSubProcess(JSONUtils.parseObject(elem.toString()).path("type").asText())) + .collect(Collectors.toList()); + + if (CollectionUtils.isNotEmpty(subProcessList)) { + importSubProcess(loginUser, targetProject, jsonArray, 
subProcessIdMap); + } + + jsonObject.set(TASKS, jsonArray); + return jsonObject.toString(); + } + + /** + * import process schedule + * + * @param loginUser login user + * @param currentProjectName current project name + * @param processMeta process meta data + * @param processDefinitionName process definition name + * @param processDefinitionId process definition id + * @return insert schedule flag + */ + private int importProcessSchedule(User loginUser, String currentProjectName, ProcessMeta processMeta, + String processDefinitionName, Integer processDefinitionId) { + Date now = new Date(); + Schedule scheduleObj = new Schedule(); + scheduleObj.setProjectName(currentProjectName); + scheduleObj.setProcessDefinitionId(processDefinitionId); + scheduleObj.setProcessDefinitionName(processDefinitionName); + scheduleObj.setCreateTime(now); + scheduleObj.setUpdateTime(now); + scheduleObj.setUserId(loginUser.getId()); + scheduleObj.setUserName(loginUser.getUserName()); + + scheduleObj.setCrontab(processMeta.getScheduleCrontab()); + + if (null != processMeta.getScheduleStartTime()) { + scheduleObj.setStartTime(DateUtils.stringToDate(processMeta.getScheduleStartTime())); + } + if (null != processMeta.getScheduleEndTime()) { + scheduleObj.setEndTime(DateUtils.stringToDate(processMeta.getScheduleEndTime())); + } + if (null != processMeta.getScheduleWarningType()) { + scheduleObj.setWarningType(WarningType.valueOf(processMeta.getScheduleWarningType())); + } + if (null != processMeta.getScheduleWarningGroupId()) { + scheduleObj.setWarningGroupId(processMeta.getScheduleWarningGroupId()); + } + if (null != processMeta.getScheduleFailureStrategy()) { + scheduleObj.setFailureStrategy(FailureStrategy.valueOf(processMeta.getScheduleFailureStrategy())); + } + if (null != processMeta.getScheduleReleaseState()) { + scheduleObj.setReleaseState(ReleaseState.valueOf(processMeta.getScheduleReleaseState())); + } + if (null != processMeta.getScheduleProcessInstancePriority()) { + 
scheduleObj.setProcessInstancePriority(Priority.valueOf(processMeta.getScheduleProcessInstancePriority())); + } + + if (null != processMeta.getScheduleWorkerGroupName()) { + scheduleObj.setWorkerGroup(processMeta.getScheduleWorkerGroupName()); + } + + return scheduleMapper.insert(scheduleObj); + } + + /** + * check import process has sub process + * recursion create sub process + * + * @param loginUser login user + * @param targetProject target project + * @param jsonArray process task array + * @param subProcessIdMap correct sub process id map + */ + private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) { + for (int i = 0; i < jsonArray.size(); i++) { + ObjectNode taskNode = (ObjectNode) jsonArray.path(i); + String taskType = taskNode.path("type").asText(); + + if (!checkTaskHasSubProcess(taskType)) { + continue; + } + //get sub process info + ObjectNode subParams = (ObjectNode) taskNode.path("params"); + Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); + ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); + //check is sub process exist in db + if (null == subProcess) { + continue; + } + String subProcessJson = subProcess.getProcessDefinitionJson(); + //check current project has sub process + ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); + + if (null == currentProjectSubProcess) { + ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); + + List subProcessList = StreamUtils.asStream(subJsonArray.elements()) + .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText())) + .collect(Collectors.toList()); + + if (CollectionUtils.isNotEmpty(subProcessList)) { + importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap); + //sub process processId correct + if 
/**
 * Recursively import the sub processes referenced by SUB_PROCESS task nodes.
 *
 * <p>For each SUB_PROCESS task in {@code jsonArray} whose referenced definition exists in the
 * source db but not yet in {@code targetProject}, this method first imports that definition's
 * own sub processes (depth-first recursion), rewrites any old-&gt;new definition ids collected
 * from that recursion into the sub process json, inserts the definition into the target
 * project, and finally records the id mapping and patches the task node's params so the
 * caller sees the new id.</p>
 *
 * NOTE(review): collection generics appear stripped by extraction; raw types kept as found.
 *
 * @param loginUser       login user (becomes owner of created definitions)
 * @param targetProject   project the sub processes are created in
 * @param jsonArray       task array of the process being imported; mutated in place
 * @param subProcessIdMap old definition id -> new definition id, shared across recursion;
 *                        cleared after its entries are applied to a parent's json
 */
private void importSubProcess(User loginUser, Project targetProject, ArrayNode jsonArray, Map subProcessIdMap) {
    for (int i = 0; i < jsonArray.size(); i++) {
        ObjectNode taskNode = (ObjectNode) jsonArray.path(i);
        String taskType = taskNode.path("type").asText();

        // only SUB_PROCESS tasks are relevant here
        if (!checkTaskHasSubProcess(taskType)) {
            continue;
        }
        //get sub process info
        ObjectNode subParams = (ObjectNode) taskNode.path("params");
        Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt();
        ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId);
        //check is sub process exist in db; silently skip dangling references
        if (null == subProcess) {
            continue;
        }
        String subProcessJson = subProcess.getProcessDefinitionJson();
        //check current project has sub process; if it already exists nothing is created
        ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName());

        if (null == currentProjectSubProcess) {
            ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS);

            // does this sub process itself contain SUB_PROCESS tasks?
            List subProcessList = StreamUtils.asStream(subJsonArray.elements())
                    .filter(item -> checkTaskHasSubProcess(JSONUtils.parseObject(item.toString()).path("type").asText()))
                    .collect(Collectors.toList());

            if (CollectionUtils.isNotEmpty(subProcessList)) {
                // depth-first: create the nested sub processes before this one
                importSubProcess(loginUser, targetProject, subJsonArray, subProcessIdMap);
                //sub process processId correct: textually rewrite old ids to the new ones
                if (!subProcessIdMap.isEmpty()) {

                    for (Map.Entry entry : subProcessIdMap.entrySet()) {
                        String oldSubProcessId = "\"processDefinitionId\":" + entry.getKey();
                        String newSubProcessId = "\"processDefinitionId\":" + entry.getValue();
                        // NOTE(review): replaceAll treats the pattern as a regex; plain digits here, so safe
                        subProcessJson = subProcessJson.replaceAll(oldSubProcessId, newSubProcessId);
                    }

                    // mappings consumed; reset for the next sibling
                    subProcessIdMap.clear();
                }
            }

            //if sub-process recursion
            Date now = new Date();
            //create sub process in target project, copying all attributes from the source
            ProcessDefinition processDefine = new ProcessDefinition();
            processDefine.setName(subProcess.getName());
            processDefine.setVersion(subProcess.getVersion());
            processDefine.setReleaseState(subProcess.getReleaseState());
            processDefine.setProjectId(targetProject.getId());
            processDefine.setUserId(loginUser.getId());
            processDefine.setProcessDefinitionJson(subProcessJson);
            processDefine.setDescription(subProcess.getDescription());
            processDefine.setLocations(subProcess.getLocations());
            processDefine.setConnects(subProcess.getConnects());
            processDefine.setTimeout(subProcess.getTimeout());
            processDefine.setTenantId(subProcess.getTenantId());
            processDefine.setGlobalParams(subProcess.getGlobalParams());
            processDefine.setCreateTime(now);
            processDefine.setUpdateTime(now);
            processDefine.setFlag(subProcess.getFlag());
            processDefine.setReceivers(subProcess.getReceivers());
            processDefine.setReceiversCc(subProcess.getReceiversCc());
            processDefineMapper.insert(processDefine);

            logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName());

            //modify task node: re-query to obtain the generated id, then patch the parent's task
            ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName());

            if (null != newSubProcessDefine) {
                subProcessIdMap.put(subProcessId, newSubProcessDefine.getId());
                subParams.put(PROCESSDEFINITIONID, newSubProcessDefine.getId());
                taskNode.set("params", subParams);
            }
        }
    }
}
definition node meets the specifications + * + * @param processData process data + * @param processDefinitionJson process definition json + * @return check result code + */ + public Map checkProcessNodeList(ProcessData processData, String processDefinitionJson) { + + Map result = new HashMap<>(5); + try { + if (processData == null) { + logger.error("process data is null"); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); + return result; + } + + // Check whether the task node is normal + List taskNodes = processData.getTasks(); + + if (taskNodes == null) { + logger.error("process node info is empty"); + putMsg(result, Status.DATA_IS_NULL, processDefinitionJson); + return result; + } + + // check has cycle + if (graphHasCycle(taskNodes)) { + logger.error("process DAG has cycle"); + putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); + return result; + } + + // check whether the process definition json is normal + for (TaskNode taskNode : taskNodes) { + if (!CheckUtils.checkTaskNodeParameters(taskNode.getParams(), taskNode.getType())) { + logger.error("task node {} parameter invalid", taskNode.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName()); + return result; + } + + // check extra params + CheckUtils.checkOtherParams(taskNode.getExtras()); + } + putMsg(result, Status.SUCCESS); + } catch (Exception e) { + result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); + result.put(Constants.MSG, e.getMessage()); + } + return result; + } + + /** + * get task node details based on process definition + * + * @param defineId define id + * @return task node list + */ + public Map getTaskNodeListByDefinitionId(Integer defineId) { + Map result = new HashMap<>(); + + ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); + if (processDefinition == null) { + logger.info("process define not exists"); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); + return result; + } + + + String 
processDefinitionJson = processDefinition.getProcessDefinitionJson(); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + //process data check + if (null == processData) { + logger.error("process data is null"); + putMsg(result, Status.DATA_IS_NOT_VALID, processDefinitionJson); + return result; + } + + List taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); + + result.put(Constants.DATA_LIST, taskNodeList); + putMsg(result, Status.SUCCESS); + + return result; + + } + + /** + * get task node details based on process definition + * + * @param defineIdList define id list + * @return task node list + */ + public Map getTaskNodeListByDefinitionIdList(String defineIdList) { + Map result = new HashMap<>(); + + Map> taskNodeMap = new HashMap<>(); + String[] idList = defineIdList.split(","); + List idIntList = new ArrayList<>(); + for (String definitionId : idList) { + idIntList.add(Integer.parseInt(definitionId)); + } + Integer[] idArray = idIntList.toArray(new Integer[idIntList.size()]); + List processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); + if (CollectionUtils.isEmpty(processDefinitionList)) { + logger.info("process definition not exists"); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); + return result; + } + + for (ProcessDefinition processDefinition : processDefinitionList) { + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + List taskNodeList = (processData.getTasks() == null) ? 
new ArrayList<>() : processData.getTasks(); + taskNodeMap.put(processDefinition.getId(), taskNodeList); + } + + result.put(Constants.DATA_LIST, taskNodeMap); + putMsg(result, Status.SUCCESS); + + return result; + + } + + + /** + * query process definition all by project id + * + * @param projectId project id + * @return process definitions in the project + */ + public Map queryProcessDefinitionAllByProjectId(Integer projectId) { + + HashMap result = new HashMap<>(5); + + List resourceList = processDefineMapper.queryAllDefinitionList(projectId); + result.put(Constants.DATA_LIST, resourceList); + putMsg(result, Status.SUCCESS); + + return result; + } + + /** + * Encapsulates the TreeView structure + * + * @param processId process definition id + * @param limit limit + * @return tree view json data + * @throws Exception exception + */ + public Map viewTree(Integer processId, Integer limit) throws Exception { + Map result = new HashMap<>(); + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (null == processDefinition) { + logger.info("process define not exists"); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); + return result; + } + DAG dag = genDagGraph(processDefinition); + /** + * nodes that is running + */ + Map> runningNodeMap = new ConcurrentHashMap<>(); + + /** + * nodes that is waiting torun + */ + Map> waitingRunningNodeMap = new ConcurrentHashMap<>(); + + /** + * List of process instances + */ + List processInstanceList = processInstanceMapper.queryByProcessDefineId(processId, limit); + + for (ProcessInstance processInstance : processInstanceList) { + processInstance.setDuration(DateUtils.differSec(processInstance.getStartTime(), processInstance.getEndTime())); + } + + if (limit > processInstanceList.size()) { + limit = processInstanceList.size(); + } + + TreeViewDto parentTreeViewDto = new TreeViewDto(); + parentTreeViewDto.setName("DAG"); + parentTreeViewDto.setType(""); + // Specify the process 
definition, because it is a TreeView for a process definition + + for (int i = limit - 1; i >= 0; i--) { + ProcessInstance processInstance = processInstanceList.get(i); + + Date endTime = processInstance.getEndTime() == null ? new Date() : processInstance.getEndTime(); + parentTreeViewDto.getInstances().add(new Instance(processInstance.getId(), processInstance.getName(), "", processInstance.getState().toString() + , processInstance.getStartTime(), endTime, processInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - processInstance.getStartTime().getTime()))); + } + + List parentTreeViewDtoList = new ArrayList<>(); + parentTreeViewDtoList.add(parentTreeViewDto); + // Here is the encapsulation task instance + for (String startNode : dag.getBeginNode()) { + runningNodeMap.put(startNode, parentTreeViewDtoList); + } + + while (Stopper.isRunning()) { + Set postNodeList = null; + Iterator>> iter = runningNodeMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry> en = iter.next(); + String nodeName = en.getKey(); + parentTreeViewDtoList = en.getValue(); + + TreeViewDto treeViewDto = new TreeViewDto(); + treeViewDto.setName(nodeName); + TaskNode taskNode = dag.getNode(nodeName); + treeViewDto.setType(taskNode.getType()); + + + //set treeViewDto instances + for (int i = limit - 1; i >= 0; i--) { + ProcessInstance processInstance = processInstanceList.get(i); + TaskInstance taskInstance = taskInstanceMapper.queryByInstanceIdAndName(processInstance.getId(), nodeName); + if (taskInstance == null) { + treeViewDto.getInstances().add(new Instance(-1, "not running", "null")); + } else { + Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); + Date endTime = taskInstance.getEndTime() == null ? 
new Date() : taskInstance.getEndTime(); + + int subProcessId = 0; + /** + * if process is sub process, the return sub id, or sub id=0 + */ + if (taskInstance.getTaskType().equals(TaskType.SUB_PROCESS.name())) { + String taskJson = taskInstance.getTaskJson(); + taskNode = JSONUtils.parseObject(taskJson, TaskNode.class); + subProcessId = Integer.parseInt(JSONUtils.parseObject( + taskNode.getParams()).path(CMDPARAM_SUB_PROCESS_DEFINE_ID).asText()); + } + treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskType(), taskInstance.getState().toString() + , taskInstance.getStartTime(), taskInstance.getEndTime(), taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); + } + } + for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { + pTreeViewDto.getChildren().add(treeViewDto); + } + postNodeList = dag.getSubsequentNodes(nodeName); + if (CollectionUtils.isNotEmpty(postNodeList)) { + for (String nextNodeName : postNodeList) { + List treeViewDtoList = waitingRunningNodeMap.get(nextNodeName); + if (CollectionUtils.isNotEmpty(treeViewDtoList)) { + treeViewDtoList.add(treeViewDto); + waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); + } else { + treeViewDtoList = new ArrayList<>(); + treeViewDtoList.add(treeViewDto); + waitingRunningNodeMap.put(nextNodeName, treeViewDtoList); + } + } + } + runningNodeMap.remove(nodeName); + } + if (waitingRunningNodeMap == null || waitingRunningNodeMap.size() == 0) { + break; + } else { + runningNodeMap.putAll(waitingRunningNodeMap); + waitingRunningNodeMap.clear(); + } + } + result.put(Constants.DATA_LIST, parentTreeViewDto); + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + return result; + } + + + /** + * Generate the DAG Graph based on the process definition id + * + * @param processDefinition process definition + * @return dag graph + */ + private DAG 
genDagGraph(ProcessDefinition processDefinition) { + + String processDefinitionJson = processDefinition.getProcessDefinitionJson(); + + ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + + //check process data + if (null != processData) { + List taskNodeList = processData.getTasks(); + processDefinition.setGlobalParamList(processData.getGlobalParams()); + ProcessDag processDag = DagHelper.getProcessDag(taskNodeList); + + // Generate concrete Dag to be executed + return DagHelper.buildDagGraph(processDag); + } + + return new DAG<>(); + } + + + /** + * whether the graph has a ring + * + * @param taskNodeResponseList task node response list + * @return if graph has cycle flag + */ + private boolean graphHasCycle(List taskNodeResponseList) { + DAG graph = new DAG<>(); + + // Fill the vertices + for (TaskNode taskNodeResponse : taskNodeResponseList) { + graph.addNode(taskNodeResponse.getName(), taskNodeResponse); + } + + // Fill edge relations + for (TaskNode taskNodeResponse : taskNodeResponseList) { + taskNodeResponse.getPreTasks(); + List preTasks = JSONUtils.toList(taskNodeResponse.getPreTasks(), String.class); + if (CollectionUtils.isNotEmpty(preTasks)) { + for (String preTask : preTasks) { + if (!graph.addEdge(preTask, taskNodeResponse.getName())) { + return true; + } + } + } + } + + return graph.hasCycle(); + } + + private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { + ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); + if (processDefinition != null) { + if (num > 1) { + String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); + processDefinitionName = str + "(" + num + ")"; + } else { + processDefinitionName = processDefinition.getName() + "(" + num + ")"; + } + } else { + return processDefinitionName; + } + return recursionProcessDefinitionName(projectId, processDefinitionName, 
num + 1); + } + +} + diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 8c0d04c6c6..c5f98630a2 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.controller; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -55,7 +56,7 @@ public class ProcessDefinitionControllerTest{ private ProcessDefinitionController processDefinitionController; @Mock - private ProcessDefinitionService processDefinitionService; + private ProcessDefinitionServiceImpl processDefinitionService; protected User user; @@ -342,9 +343,7 @@ public class ProcessDefinitionControllerTest{ String processDefinitionIds = "1,2"; String projectName = "test"; HttpServletResponse response = new MockHttpServletResponse(); - ProcessDefinitionService service = new ProcessDefinitionService(); - ProcessDefinitionService spy = Mockito.spy(service); - Mockito.doNothing().when(spy).batchExportProcessDefinitionByIds(user, projectName, processDefinitionIds, response); + Mockito.doNothing().when(this.processDefinitionService).batchExportProcessDefinitionByIds(user, projectName, processDefinitionIds, response); processDefinitionController.batchExportProcessDefinitionByIds(user, projectName, processDefinitionIds, response); } diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java deleted file mode 100644 index bb6e3882fe..0000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseDAGServiceTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.dolphinscheduler.api.service; - -import org.apache.dolphinscheduler.common.graph.DAG; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.model.TaskNodeRelation; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class BaseDAGServiceTest { - - @Test - public void testProcessInstance2DAG(){ - - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setProcessInstanceJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-61567\"," + - "\"name\":\"开始\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"}," + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + - "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + - "\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SHELL\",\"id\":\"tasks-6-3ug5ej\",\"name\":\"结束\"," + - "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo '1'\"},\"description\":\"\"," + - "\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + - "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + - "\"workerGroupId\":-1,\"preTasks\":[\"开始\"]}],\"tenantId\":-1,\"timeout\":0}"); - - DAG relationDAG = BaseDAGService.processInstance2DAG(processInstance); - - Assert.assertTrue(relationDAG.containsNode("开始")); - - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8db667e28b..168fe48fc3 100644 --- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -16,21 +16,45 @@ */ package org.apache.dolphinscheduler.api.service; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.dolphinscheduler.api.ApiApplicationServer; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.apache.dolphinscheduler.api.dto.ProcessMeta; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessData; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import 
org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.http.entity.ContentType; -import org.json.JSONException; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,23 +62,14 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.skyscreamer.jsonassert.JSONAssert; -import org.springframework.boot.test.context.SpringBootTest; import org.springframework.mock.web.MockMultipartFile; import org.springframework.web.multipart.MultipartFile; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.text.MessageFormat; -import java.util.*; - @RunWith(MockitoJUnitRunner.Silent.class) -@SpringBootTest(classes = ApiApplicationServer.class) public class ProcessDefinitionServiceTest { @InjectMocks - ProcessDefinitionService processDefinitionService; + ProcessDefinitionServiceImpl processDefinitionService; @Mock private DataSourceMapper dataSourceMapper; @@ -502,157 +517,6 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.SUCCESS, taskNotNuLLRes.get(Constants.STATUS)); } - /** - * add datasource param and dependent when export process - * @throws JSONException - */ - @Test - public void testAddTaskNodeSpecialParam() throws JSONException { - - 
Mockito.when(dataSourceMapper.selectById(1)).thenReturn(getDataSource()); - Mockito.when(processDefineMapper.queryByDefineId(2)).thenReturn(getProcessDefinition()); - - String corSqlDependentJson = processDefinitionService.addExportTaskNodeSpecialParam(sqlDependentJson); - - JSONAssert.assertEquals(sqlDependentJson,corSqlDependentJson,false); - - } - - @Test - public void testExportProcessMetaDataStr() { - Mockito.when(scheduleMapper.queryByProcessDefinitionId(46)).thenReturn(getSchedulerList()); - ProcessDefinition processDefinition = getProcessDefinition(); - processDefinition.setProcessDefinitionJson(sqlDependentJson); - - String exportProcessMetaDataStr = processDefinitionService.exportProcessMetaDataStr(46, processDefinition); - Assert.assertNotEquals(sqlDependentJson,exportProcessMetaDataStr); - } - - @Test - public void testAddExportTaskNodeSpecialParam() throws JSONException { - String shellData = shellJson; - - String resultStr = processDefinitionService.addExportTaskNodeSpecialParam(shellData); - JSONAssert.assertEquals(shellJson, resultStr, false); - } - - @Test - public void testImportProcessSchedule() { - User loginUser = new User(); - loginUser.setId(1); - loginUser.setUserType(UserType.GENERAL_USER); - - String currentProjectName = "test"; - String processDefinitionName = "test_process"; - Integer processDefinitionId = 1; - Schedule schedule = getSchedule(); - - ProcessMeta processMeta = getProcessMeta(); - - int insertFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMeta, - processDefinitionName, processDefinitionId); - Assert.assertEquals(0, insertFlag); - - ProcessMeta processMetaCron = new ProcessMeta(); - processMetaCron.setScheduleCrontab(schedule.getCrontab()); - - int insertFlagCron = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, - processDefinitionName, processDefinitionId); - Assert.assertEquals(0, insertFlagCron); - - WorkerGroup workerGroup = new 
WorkerGroup(); - workerGroup.setName("ds-test-workergroup"); - List workerGroups = new ArrayList<>(); - workerGroups.add(workerGroup); - - processMetaCron.setScheduleWorkerGroupName("ds-test"); - int insertFlagWorker = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, - processDefinitionName, processDefinitionId); - Assert.assertEquals(0, insertFlagWorker); - - int workerNullFlag = processDefinitionService.importProcessSchedule(loginUser, currentProjectName, processMetaCron, - processDefinitionName, processDefinitionId); - Assert.assertEquals(0, workerNullFlag); - - - } - - /** - * import sub process test - */ - @Test - public void testImportSubProcess() { - - User loginUser = new User(); - loginUser.setId(1); - loginUser.setUserType(UserType.ADMIN_USER); - - Project testProject = getProject("test"); - - //Recursive subprocess sub2 process in sub1 process and sub1process in top process - String topProcessJson = "{\"globalParams\":[]," + - "\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-38634\",\"name\":\"shell1\"," + - "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-1\\\"\"}," + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + - "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," + - "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}," + - "{\"type\":\"SUB_PROCESS\",\"id\":\"tasks-44207\",\"name\":\"shell-4\"," + - "\"params\":{\"processDefinitionId\":39},\"description\":\"\",\"runFlag\":\"NORMAL\"," + - "\"dependence\":{},\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," + - "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + - "\"preTasks\":[\"shell1\"]}],\"tenantId\":1,\"timeout\":0}"; - - String sub1ProcessJson = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-84090\"," + - 
"\"name\":\"shell-4\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"#!/bin/bash\\necho \\\"shell-4\\\"\"}," + - "\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"," + - "\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," + - "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]},{\"type\":\"SUB_PROCESS\"," + - "\"id\":\"tasks-87364\",\"name\":\"shell-5\"," + - "\"params\":{\"processDefinitionId\":46},\"description\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{}," + - "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\"," + - "\"workerGroupId\":-1,\"preTasks\":[\"shell-4\"]}],\"tenantId\":1,\"timeout\":0}"; - - String sub2ProcessJson = "{\"globalParams\":[]," + - "\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-52423\",\"name\":\"shell-5\"," + - "\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo \\\"shell-5\\\"\"},\"description\":\"\"," + - "\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\"," + - "\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1," + - "\"preTasks\":[]}],\"tenantId\":1,\"timeout\":0}"; - - - ObjectNode jsonObject = JSONUtils.parseObject(topProcessJson); - ArrayNode jsonArray = (ArrayNode) jsonObject.path("tasks"); - - String originSubJson = jsonArray.toString(); - - Map subProcessIdMap = new HashMap<>(20); - - ProcessDefinition shellDefinition1 = new ProcessDefinition(); - shellDefinition1.setId(39); - shellDefinition1.setName("shell-4"); - shellDefinition1.setProjectId(2); - shellDefinition1.setProcessDefinitionJson(sub1ProcessJson); - - ProcessDefinition shellDefinition2 = new ProcessDefinition(); - shellDefinition2.setId(46); - shellDefinition2.setName("shell-5"); - shellDefinition2.setProjectId(2); - 
shellDefinition2.setProcessDefinitionJson(sub2ProcessJson); - - Mockito.when(processDefineMapper.queryByDefineId(39)).thenReturn(shellDefinition1); - Mockito.when(processDefineMapper.queryByDefineId(46)).thenReturn(shellDefinition2); - Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-5")).thenReturn(null); - Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "shell-4")).thenReturn(null); - Mockito.when(processDefineMapper.queryByDefineName(testProject.getId(), "testProject")).thenReturn(shellDefinition2); - - processDefinitionService.importSubProcess(loginUser,testProject, jsonArray, subProcessIdMap); - - String correctSubJson = jsonArray.toString(); - - Assert.assertEquals(originSubJson, correctSubJson); - - } - @Test public void testImportProcessDefinitionById() throws IOException { @@ -731,34 +595,6 @@ public class ProcessDefinitionServiceTest { } - /** - * check import process metadata - * @param file file - * @param loginUser login user - * @param currentProjectName current project name - * @param processMetaJson process meta json - * @throws IOException IO exception - */ - private void improssProcessCheckData(File file, User loginUser, String currentProjectName, String processMetaJson) throws IOException { - //check null - FileUtils.writeStringToFile(new File("/tmp/task.json"),processMetaJson); - - File fileEmpty = new File("/tmp/task.json"); - - FileInputStream fileEmptyInputStream = new FileInputStream("/tmp/task.json"); - - MultipartFile multiFileEmpty = new MockMultipartFile(fileEmpty.getName(), fileEmpty.getName(), - ContentType.APPLICATION_OCTET_STREAM.toString(), fileEmptyInputStream); - - Map resEmptyProcess = processDefinitionService.importProcessDefinition(loginUser, multiFileEmpty, currentProjectName); - - Assert.assertEquals(Status.DATA_IS_NULL, resEmptyProcess.get(Constants.STATUS)); - - boolean deleteFlag = file.delete(); - - Assert.assertTrue(deleteFlag); - } - @Test public void 
testUpdateProcessDefinition () { User loginUser = new User();