diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java index 5538194db7..2ae1a1213f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionService.java @@ -31,7 +31,7 @@ public interface ProcessDefinitionVersionService { * @param processDefinition the process definition that need to record version * @return the newest version number of this process definition */ - long addProcessDefinitionVersion(ProcessDefinition processDefinition); + int addProcessDefinitionVersion(ProcessDefinition processDefinition); /** * query the pagination versions info by one certain process definition id diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java index b41ff71f5d..e74b2b46fc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java @@ -479,7 +479,7 @@ public class ProcessInstanceService extends BaseService { processDefinition.setUpdateTime(new Date()); // add process definition version - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); + int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); processDefinition.setVersion(version); updateDefine = processDefineMapper.updateById(processDefinition); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 0d0fffadcf..c49a2e06cb 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.api.dto.treeview.TreeViewDto; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.BaseService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; -import org.apache.dolphinscheduler.api.service.ProcessDefinitionVersionService; import org.apache.dolphinscheduler.api.service.ProcessInstanceService; import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; import org.apache.dolphinscheduler.api.service.ProjectService; @@ -64,10 +63,10 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionVersion; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; @@ -135,9 +134,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements @Autowired private ProjectService projectService; - @Autowired - private ProcessDefinitionVersionService processDefinitionVersionService; - @Autowired private TaskDefinitionService taskDefinitionService; @@ -148,7 +144,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements private ProcessDefinitionLogMapper processDefinitionLogMapper; @Autowired - private ProcessDefinitionMapper processDefineMapper; + private ProcessDefinitionMapper processDefinitionMapper; @Autowired private ProcessInstanceService processInstanceService; @@ -212,6 +208,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } processDefinition.setName(name); + processDefinition.setVersion(1); processDefinition.setReleaseState(ReleaseState.OFFLINE); processDefinition.setUserId(loginUser.getId()); processDefinition.setDescription(desc); @@ -220,7 +217,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processDefinition.setTimeout(processData.getTimeout()); processDefinition.setTenantId(processData.getTenantId()); processDefinition.setModifyBy(loginUser.getUserName()); - processDefinition.setResourceIds(getResourceIds(processData)); //custom global params List globalParamsList = processData.getGlobalParams(); @@ -234,7 +230,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processDefinition.setFlag(Flag.YES); // save the new process definition - processDefineMapper.insert(processDefinition); + processDefinitionMapper.insert(processDefinition); // parse and save the taskDefinition and processTaskRelation try { @@ -273,11 +269,6 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processDefinitionLog.setOperateTime(now); processDefinitionLogMapper.insert(processDefinitionLog); - // add process definition version - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); - processDefinition.setVersion(version); - processDefineMapper.updateVersionByProcessDefinitionId(processDefinition.getId(), version); - // return processDefinition object with ID result.put(Constants.DATA_LIST, processDefinition.getId()); putMsg(result, Status.SUCCESS); @@ -341,7 +332,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - List resourceList = processDefineMapper.queryAllDefinitionList(project.getId()); + List resourceList = processDefinitionMapper.queryAllDefinitionList(project.getId()); result.put(Constants.DATA_LIST, resourceList); putMsg(result, Status.SUCCESS); @@ -372,7 +363,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } Page page = new Page<>(pageNo, pageSize); - IPage processDefinitionIPage = processDefineMapper.queryDefineListPaging( + IPage processDefinitionIPage = processDefinitionMapper.queryDefineListPaging( page, searchVal, userId, project.getId(), isAdmin(loginUser)); PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); @@ -404,7 +395,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); } else { @@ -426,7 +417,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(project.getId(), processDefinitionName); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getId(), processDefinitionName); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionName); } else { @@ -472,43 +463,55 @@ public class ProcessDefinitionServiceImpl extends BaseService implements if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) { return checkProcessJson; } - ProcessDefinition processDefine = processService.findProcessDefineById(id); + // TODO processDefinitionMapper.queryByCode + ProcessDefinition processDefinition = processService.findProcessDefineById(id); // check process definition exists - if (processDefine == null) { + if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); return result; } - if (processDefine.getReleaseState() == ReleaseState.ONLINE) { + if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { // online can not permit edit - putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefine.getName()); + putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); return result; } - if (!name.equals(processDefine.getName())) { + if (!name.equals(processDefinition.getName())) { // check whether the new process define name exist - ProcessDefinition definition = processDefineMapper.verifyByDefineName(project.getId(), name); + ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getId(), name); if (definition != null) { putMsg(result, Status.VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR, name); return result; } } // get the processdefinitionjson before saving,and then save the name and taskid - String oldJson = processDefine.getProcessDefinitionJson(); - processDefinitionJson = processService.changeJson(processData,oldJson); - Date now = new Date(); + String oldJson = processDefinition.getProcessDefinitionJson(); + processDefinitionJson = processService.changeJson(processData, oldJson); - processDefine.setId(id); - processDefine.setName(name); - processDefine.setReleaseState(ReleaseState.OFFLINE); - processDefine.setProjectId(project.getId()); - processDefine.setProcessDefinitionJson(processDefinitionJson); - processDefine.setDescription(desc); - processDefine.setLocations(locations); - processDefine.setConnects(connects); - processDefine.setTimeout(processData.getTimeout()); - processDefine.setTenantId(processData.getTenantId()); - processDefine.setModifyBy(loginUser.getUserName()); - processDefine.setResourceIds(getResourceIds(processData)); + // update TaskDefinition + ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); + List taskNodeList = (newProcessData.getTasks() == null) ? new ArrayList<>() : newProcessData.getTasks(); + + for (TaskNode task : taskNodeList) { + // TODO update by code directly + Map stringObjectMap = taskDefinitionService.queryTaskDefinitionByName(loginUser, projectName, task.getName()); + TaskDefinition taskDefinition = (TaskDefinition) stringObjectMap.get(Constants.DATA_LIST); + taskDefinitionService.updateTaskDefinition(loginUser, projectName, taskDefinition.getCode(), JSONUtils.toJsonString(task)); + } + + List processDefinitionLogs = processDefinitionLogMapper.queryByDefinitionCode(processDefinition.getCode()); + int version = getNextVersion(processDefinitionLogs); + + Date now = new Date(); + processDefinition.setVersion(version); + processDefinition.setName(name); + processDefinition.setReleaseState(ReleaseState.OFFLINE); + processDefinition.setProjectCode(project.getCode()); + processDefinition.setDescription(desc); + processDefinition.setLocations(locations); + processDefinition.setConnects(connects); + processDefinition.setTimeout(processData.getTimeout()); + processDefinition.setTenantId(processData.getTenantId()); //custom global params List globalParamsList = new ArrayList<>(); @@ -516,23 +519,39 @@ public class ProcessDefinitionServiceImpl extends BaseService implements Set userDefParamsSet = new HashSet<>(processData.getGlobalParams()); globalParamsList = new ArrayList<>(userDefParamsSet); } - processDefine.setGlobalParamList(globalParamsList); - processDefine.setUpdateTime(now); - processDefine.setFlag(Flag.YES); + processDefinition.setGlobalParamList(globalParamsList); + processDefinition.setUpdateTime(now); + processDefinition.setFlag(Flag.YES); - // add process definition version - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefine); - processDefine.setVersion(version); - if (processDefineMapper.updateById(processDefine) > 0) { + processDefinition.setVersion(version); + int update = processDefinitionMapper.updateById(processDefinition); + + // save processDefinitionLog + ProcessDefinitionLog processDefinitionLog = JSONUtils.parseObject( + JSONUtils.toJsonString(processDefinition), ProcessDefinitionLog.class); + + processDefinitionLog.setOperator(loginUser.getId()); + processDefinitionLog.setOperateTime(now); + int insert = processDefinitionLogMapper.insert(processDefinitionLog); + + if (update > 0 && insert > 0) { putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefineMapper.queryByDefineId(id)); + result.put(Constants.DATA_LIST, processDefinition); } else { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); } return result; } + private int getNextVersion(List processDefinitionLogs) { + return processDefinitionLogs + .stream() + .map(ProcessDefinitionLog::getVersion) + .max((x, y) -> x > y ? x : y) + .orElse(0) + 1; + } + /** * verify process definition name unique * @@ -552,7 +571,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements if (resultEnum != Status.SUCCESS) { return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.verifyByDefineName(project.getId(), name); + ProcessDefinition processDefinition = processDefinitionMapper.verifyByDefineName(project.getId(), name); if (processDefinition == null) { putMsg(result, Status.SUCCESS); } else { @@ -582,7 +601,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.selectById(processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processDefinitionId); // TODO: replace id to code // ProcessDefinition processDefinition = processDefineMapper.selectByCode(processDefinitionCode); @@ -628,7 +647,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements // TODO: replace id to code // ProcessDefinition processDefinition = processDefineMapper.deleteByCode(processDefinitionCode); - int delete = processDefineMapper.deleteById(processDefinitionId); + int delete = processDefinitionMapper.deleteById(processDefinitionId); if (delete > 0) { putMsg(result, Status.SUCCESS); @@ -665,7 +684,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return result; } - ProcessDefinition processDefinition = processDefineMapper.selectById(id); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(id); switch (releaseState) { case ONLINE: @@ -684,11 +703,11 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } processDefinition.setReleaseState(releaseState); - processDefineMapper.updateById(processDefinition); + processDefinitionMapper.updateById(processDefinition); break; case OFFLINE: processDefinition.setReleaseState(releaseState); - processDefineMapper.updateById(processDefinition); + processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( new int[]{processDefinition.getId()} ); @@ -748,7 +767,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements for (String strProcessDefinitionId : processDefinitionIdArray) { //get workflow info int processDefinitionId = Integer.parseInt(strProcessDefinitionId); - ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); if (null != processDefinition) { processDefinitionList.add(exportProcessMetaData(processDefinitionId, processDefinition)); } @@ -1143,14 +1162,14 @@ public class ProcessDefinitionServiceImpl extends BaseService implements //get sub process info ObjectNode subParams = (ObjectNode) taskNode.path("params"); Integer subProcessId = subParams.path(PROCESSDEFINITIONID).asInt(); - ProcessDefinition subProcess = processDefineMapper.queryByDefineId(subProcessId); + ProcessDefinition subProcess = processDefinitionMapper.queryByDefineId(subProcessId); //check is sub process exist in db if (null == subProcess) { continue; } String subProcessJson = subProcess.getProcessDefinitionJson(); //check current project has sub process - ProcessDefinition currentProjectSubProcess = processDefineMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); + ProcessDefinition currentProjectSubProcess = processDefinitionMapper.queryByDefineName(targetProject.getId(), subProcess.getName()); if (null == currentProjectSubProcess) { ArrayNode subJsonArray = (ArrayNode) JSONUtils.parseObject(subProcess.getProcessDefinitionJson()).get(TASKS); @@ -1194,12 +1213,12 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processDefine.setUpdateTime(now); processDefine.setFlag(subProcess.getFlag()); processDefine.setWarningGroupId(subProcess.getWarningGroupId()); - processDefineMapper.insert(processDefine); + processDefinitionMapper.insert(processDefine); logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), processDefine.getName()); //modify task node - ProcessDefinition newSubProcessDefine = processDefineMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); + ProcessDefinition newSubProcessDefine = processDefinitionMapper.queryByDefineName(processDefine.getProjectId(), processDefine.getName()); if (null != newSubProcessDefine) { subProcessIdMap.put(subProcessId, newSubProcessDefine.getId()); @@ -1273,7 +1292,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements public Map getTaskNodeListByDefinitionId(Integer defineId) { Map result = new HashMap<>(); - ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(defineId); if (processDefinition == null) { logger.info("process define not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineId); @@ -1317,7 +1336,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements idIntList.add(Integer.parseInt(definitionId)); } Integer[] idArray = idIntList.toArray(new Integer[0]); - List processDefinitionList = processDefineMapper.queryDefinitionListByIdList(idArray); + List processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray); if (CollectionUtils.isEmpty(processDefinitionList)) { logger.info("process definition not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, defineIdList); @@ -1349,7 +1368,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements HashMap result = new HashMap<>(5); - List resourceList = processDefineMapper.queryAllDefinitionList(projectId); + List resourceList = processDefinitionMapper.queryAllDefinitionList(projectId); result.put(Constants.DATA_LIST, resourceList); putMsg(result, Status.SUCCESS); @@ -1368,7 +1387,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements public Map viewTree(Integer processId, Integer limit) throws Exception { Map result = new HashMap<>(); - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); if (null == processDefinition) { logger.info("process define not exists"); putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinition); @@ -1540,7 +1559,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements } private String recursionProcessDefinitionName(Integer projectId, String processDefinitionName, int num) { - ProcessDefinition processDefinition = processDefineMapper.queryByDefineName(projectId, processDefinitionName); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectId, processDefinitionName); if (processDefinition != null) { if (num > 1) { String str = processDefinitionName.substring(0, processDefinitionName.length() - 3); @@ -1560,7 +1579,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements Map result = new HashMap<>(5); - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); return result; @@ -1695,7 +1714,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return checkResult; } - ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefinitionId); if (Objects.isNull(processDefinition)) { putMsg(result , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR @@ -1703,28 +1722,28 @@ public class ProcessDefinitionServiceImpl extends BaseService implements return result; } - ProcessDefinitionVersion processDefinitionVersion = processDefinitionVersionService - .queryByProcessDefinitionIdAndVersion(processDefinitionId, version); - if (Objects.isNull(processDefinitionVersion)) { + ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper + .queryByDefinitionCodeAndVersion(processDefinition.getCode(),version); + + if (Objects.isNull(processDefinitionLog)) { putMsg(result , Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_VERSION_ERROR - , processDefinitionId + , processDefinition.getCode() , version); return result; } - processDefinition.setVersion(processDefinitionVersion.getVersion()); - processDefinition.setProcessDefinitionJson(processDefinitionVersion.getProcessDefinitionJson()); - processDefinition.setDescription(processDefinitionVersion.getDescription()); - processDefinition.setLocations(processDefinitionVersion.getLocations()); - processDefinition.setConnects(processDefinitionVersion.getConnects()); - processDefinition.setTimeout(processDefinitionVersion.getTimeout()); - processDefinition.setGlobalParams(processDefinitionVersion.getGlobalParams()); + processDefinition.setVersion(processDefinitionLog.getVersion()); + processDefinition.setDescription(processDefinitionLog.getDescription()); + processDefinition.setLocations(processDefinitionLog.getLocations()); + processDefinition.setConnects(processDefinitionLog.getConnects()); + processDefinition.setTimeout(processDefinitionLog.getTimeout()); + processDefinition.setGlobalParams(processDefinitionLog.getGlobalParams()); processDefinition.setUpdateTime(new Date()); - processDefinition.setWarningGroupId(processDefinitionVersion.getWarningGroupId()); - processDefinition.setResourceIds(processDefinitionVersion.getResourceIds()); + processDefinition.setWarningGroupId(processDefinitionLog.getWarningGroupId()); + processDefinition.setResourceIds(processDefinitionLog.getResourceIds()); - if (processDefineMapper.updateById(processDefinition) > 0) { + if (processDefinitionMapper.updateById(processDefinition) > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); @@ -1784,7 +1803,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements * @param processDefinitionId processDefinitionId */ private void setFailedProcessList(List failedProcessList, String processDefinitionId) { - ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(Integer.valueOf(processDefinitionId)); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(Integer.valueOf(processDefinitionId)); if (processDefinition != null) { failedProcessList.add(processDefinitionId + "[" + processDefinition.getName() + "]"); } else { @@ -1823,7 +1842,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements Map result = new HashMap<>(); - ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + ProcessDefinition processDefinition = processDefinitionMapper.selectById(processId); if (processDefinition == null) { putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); return result; @@ -1831,7 +1850,7 @@ public class ProcessDefinitionServiceImpl extends BaseService implements processDefinition.setProjectId(targetProject.getId()); processDefinition.setUpdateTime(new Date()); - if (processDefineMapper.updateById(processDefinition) > 0) { + if (processDefinitionMapper.updateById(processDefinition) > 0) { putMsg(result, Status.SUCCESS); } else { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java index 08cedfee15..0776688121 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionVersionServiceImpl.java @@ -61,7 +61,7 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements * @param processDefinition the process definition that need to record version * @return the newest version number of this process definition */ - public long addProcessDefinitionVersion(ProcessDefinition processDefinition) { + public int addProcessDefinitionVersion(ProcessDefinition processDefinition) { long version = this.queryMaxVersionByProcessDefinitionId(processDefinition.getId()) + 1; @@ -82,7 +82,7 @@ public class ProcessDefinitionVersionServiceImpl extends BaseService implements processDefinitionVersionMapper.insert(processDefinitionVersion); - return version; + return Integer.parseInt(String.valueOf(version)); } /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 4db533838c..d0ed52560c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -969,7 +969,7 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectName)).thenReturn(result); Mockito.when(processService.findProcessDefineById(1)).thenReturn(processDefinition); - Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1L); + Mockito.when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1); String sqlDependentJson = "{\n" + " \"globalParams\": [\n" diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java index 5a8d09fdc5..5b5c20252e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionVersionServiceTest.java @@ -68,7 +68,7 @@ public class ProcessDefinitionVersionServiceTest { .queryMaxVersionByProcessDefinitionId(processDefinition.getId())) .thenReturn(expectedVersion); - long version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); + int version = processDefinitionVersionService.addProcessDefinitionVersion(processDefinition); Assert.assertEquals(expectedVersion + 1, version); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index de23d7570e..aeb56ea626 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -369,7 +369,7 @@ public class ProcessInstanceServiceTest { when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.updateProcessInstance(processInstance)).thenReturn(1); when(processDefinitionService.checkProcessNodeList(Mockito.any(), eq(shellJson))).thenReturn(result); - when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1L); + when(processDefinitionVersionService.addProcessDefinitionVersion(processDefinition)).thenReturn(1); Map processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1, shellJson, "2020-02-21 00:00:00", true, Flag.YES, "", ""); Assert.assertEquals(Status.UPDATE_PROCESS_INSTANCE_ERROR, processInstanceFinishRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index 5cf81c2775..677925845d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -61,7 +61,7 @@ public class ProcessDefinition { /** * version */ - private long version; + private int version; /** * release state : online/offline @@ -192,11 +192,11 @@ public class ProcessDefinition { this.name = name; } - public long getVersion() { + public int getVersion() { return version; } - public void setVersion(long version) { + public void setVersion(int version) { this.version = version; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java index 02d03ce1e9..5cfb1cf890 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinitionLog.java @@ -61,7 +61,7 @@ public class ProcessDefinitionLog { /** * version */ - private long version; + private int version; /** * release state : online/offline @@ -165,6 +165,17 @@ public class ProcessDefinitionLog { */ private String modifyBy; + /** + * warningGroupId + */ + @TableField(exist = false) + private int warningGroupId; + + /** + * connects array for web + */ + private String connects; + /** * resource ids */ @@ -181,6 +192,22 @@ public class ProcessDefinitionLog { @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date operateTime; + public int getWarningGroupId() { + return warningGroupId; + } + + public void setWarningGroupId(int warningGroupId) { + this.warningGroupId = warningGroupId; + } + + public String getConnects() { + return connects; + } + + public void setConnects(String connects) { + this.connects = connects; + } + public int getOperator() { return operator; } @@ -221,11 +248,11 @@ public class ProcessDefinitionLog { this.name = name; } - public long getVersion() { + public int getVersion() { return version; } - public void setVersion(long version) { + public void setVersion(int version) { this.version = version; } @@ -403,32 +430,34 @@ public class ProcessDefinitionLog { @Override public String toString() { return "ProcessDefinitionLog{" - + "id=" + id - + ", code=" + code - + ", name='" + name + '\'' - + ", version=" + version - + ", releaseState=" + releaseState - + ", projectCode=" + projectCode - + ", description='" + description + '\'' - + ", globalParams='" + globalParams + '\'' - + ", globalParamList=" + globalParamList - + ", globalParamMap=" + globalParamMap - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + ", flag=" + flag - + ", userId=" + userId - + ", userName='" + userName + '\'' - + ", projectName='" + projectName + '\'' - + ", locations='" + locations + '\'' - + ", receivers='" + receivers + '\'' - + ", receiversCc='" + receiversCc + '\'' - + ", scheduleReleaseState=" + scheduleReleaseState - + ", timeout=" + timeout - + ", tenantId=" + tenantId - + ", modifyBy='" + modifyBy + '\'' - + ", resourceIds='" + resourceIds + '\'' - + ", operator=" + operator - + ", operateTime=" + operateTime - + '}'; + + "id=" + id + + ", code=" + code + + ", name='" + name + '\'' + + ", version=" + version + + ", releaseState=" + releaseState + + ", projectCode=" + projectCode + + ", description='" + description + '\'' + + ", globalParams='" + globalParams + '\'' + + ", globalParamList=" + globalParamList + + ", globalParamMap=" + globalParamMap + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + ", flag=" + flag + + ", userId=" + userId + + ", userName='" + userName + '\'' + + ", projectName='" + projectName + '\'' + + ", locations='" + locations + '\'' + + ", receivers='" + receivers + '\'' + + ", receiversCc='" + receiversCc + '\'' + + ", scheduleReleaseState=" + scheduleReleaseState + + ", timeout=" + timeout + + ", tenantId=" + tenantId + + ", modifyBy='" + modifyBy + '\'' + + ", warningGroupId=" + warningGroupId + + ", connects='" + connects + '\'' + + ", resourceIds='" + resourceIds + '\'' + + ", operator=" + operator + + ", operateTime=" + operateTime + + '}'; } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index dcf99a2651..80a426ad3d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -18,11 +18,8 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; - import org.apache.ibatis.annotations.Param; - import java.util.List; - import com.baomidou.mybatisplus.core.mapper.BaseMapper; /** @@ -48,4 +45,13 @@ public interface ProcessDefinitionLogMapper extends BaseMapper queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); + /** + * query the certain process definition version info by process definition code and version number + * + * @param processDefinitionCode process definition code + * @param version version number + * @return the process definition version info + */ + ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("processDefinitionCode") Long processDefinitionCode, @Param("version") long version); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml index e722cf53a3..80cca29f35 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml @@ -20,15 +20,17 @@ - pd.id, pd.code, pd.name, pd.version, pd.description, pd.project_code, - pd.release_state, pd.user_id,pd.global_params, pd.flag, pd.locations, pd.connects, - pd.warning_group_id, pd.timeout, pd.tenant_id,pd.operator, pd.operate_time, pd.create_time, - pd.update_time, u.user_name,p.name as project_name + id, code, name, version, description, project_code, + release_state, user_id,global_params, flag, locations, connects, + warning_group_id, timeout, tenant_id,operator, operate_time, create_time, + update_time select - from t_ds_process_definition_log pd - JOIN t_ds_user u ON pd.user_id = u.id - JOIN t_ds_project p ON pd.project_code = p.code + from t_ds_process_definition_log WHERE pd.code = #{processDefinitionCode} + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java index 02b5f12321..daa5552b0d 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapperTest.java @@ -358,7 +358,7 @@ public class ProcessDefinitionMapperTest { @Test public void testUpdateVersionByProcessDefinitionId() { - long expectedVersion = 10; + int expectedVersion = 10; ProcessDefinition processDefinition = insertOne(); processDefinition.setVersion(expectedVersion); processDefinitionMapper.updateVersionByProcessDefinitionId(