diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java index e13a7d1440..c39ece8ae6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/dto/treeview/Instance.java @@ -71,7 +71,7 @@ public class Instance { */ private String duration; - private int subflowId; + private long subflowCode; public Instance() { } @@ -83,7 +83,7 @@ public class Instance { this.type = type; } - public Instance(int id, String name, long code, String type, String state, Date startTime, Date endTime, String host, String duration, int subflowId) { + public Instance(int id, String name, long code, String type, String state, Date startTime, Date endTime, String host, String duration, long subflowCode) { this.id = id; this.name = name; this.code = code; @@ -93,7 +93,7 @@ public class Instance { this.endTime = endTime; this.host = host; this.duration = duration; - this.subflowId = subflowId; + this.subflowCode = subflowCode; } public Instance(int id, String name, long code, String type, String state, Date startTime, Date endTime, String host, String duration) { @@ -173,11 +173,11 @@ public class Instance { this.duration = duration; } - public int getSubflowId() { - return subflowId; + public long getSubflowCode() { + return subflowCode; } - public void setSubflowId(int subflowId) { - this.subflowId = subflowId; + public void setSubflowCode(long subflowCode) { + this.subflowCode = subflowCode; } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index e0e654a902..02e5b5785c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -465,11 +465,10 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ return result; } - List ids = new ArrayList<>(); - processService.recurseFindSubProcessId(processDefinition.getId(), ids); - Integer[] idArray = ids.toArray(new Integer[ids.size()]); - if (!ids.isEmpty()) { - List processDefinitionList = processDefinitionMapper.queryDefinitionListByIdList(idArray); + List codes = new ArrayList<>(); + processService.recurseFindSubProcess(processDefinition.getCode(), codes); + if (!codes.isEmpty()) { + List processDefinitionList = processDefinitionMapper.queryByCodes(codes); if (processDefinitionList != null) { for (ProcessDefinition processDefinitionTmp : processDefinitionList) { /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 7ef26b0dbf..4c126a794b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import org.apache.dolphinscheduler.api.dto.DagDataSchedule; import org.apache.dolphinscheduler.api.dto.treeview.Instance; @@ -33,7 +33,6 @@ import org.apache.dolphinscheduler.api.utils.FileUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.graph.DAG; @@ -68,7 +67,6 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; @@ -621,9 +619,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete process definition by code * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return delete result code */ @Override @@ -692,9 +690,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * release process definition: online / offline * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState release state * @return release result code */ @@ -725,7 +723,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setReleaseState(releaseState); int updateProcess = processDefinitionMapper.updateById(processDefinition); List scheduleList = scheduleMapper.selectAllByProcessDefineArray( - new long[]{processDefinition.getCode()} + new long[]{processDefinition.getCode()} ); if (updateProcess > 0 && scheduleList.size() == 1) { Schedule schedule = scheduleList.get(0); @@ -824,9 +822,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * import process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param file process metadata json file + * @param file process metadata json file * @return import process */ @Override @@ -1034,9 +1032,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return task node list */ @Override @@ -1063,9 +1061,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details map based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param codes define codes + * @param codes define codes * @return task node list */ @Override @@ -1100,7 +1098,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition all by project code * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code * @return process definitions in the project */ @@ -1122,7 +1120,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * Encapsulates the TreeView structure * - * @param code process definition code + * @param code process definition code * @param limit limit * @return tree view json data */ @@ -1196,16 +1194,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Date startTime = taskInstance.getStartTime() == null ? new Date() : taskInstance.getStartTime(); Date endTime = taskInstance.getEndTime() == null ? new Date() : taskInstance.getEndTime(); - int subProcessId = 0; + long subProcessCode = 0L; // if process is sub process, the return sub id, or sub id=0 if (taskInstance.isSubProcess()) { TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); - subProcessId = Integer.parseInt(JSONUtils.parseObject( - taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_ID).asText()); + subProcessCode = Integer.parseInt(JSONUtils.parseObject( + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(), taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), - taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessId)); + taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); } } for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { @@ -1266,9 +1264,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch copy process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1289,9 +1287,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch move process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1390,7 +1388,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * switch the defined process definition version * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @param code process definition code * @param version the version user want to switch diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java index cf1c833765..1a4e529909 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java @@ -349,12 +349,11 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe return result; } // check sub process definition release state - List subProcessDefineIds = new ArrayList<>(); - processService.recurseFindSubProcessId(processDefinition.getId(), subProcessDefineIds); - Integer[] idArray = subProcessDefineIds.toArray(new Integer[subProcessDefineIds.size()]); - if (!subProcessDefineIds.isEmpty()) { + List subProcessDefineCodes = new ArrayList<>(); + processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes); + if (!subProcessDefineCodes.isEmpty()) { List subProcessDefinitionList = - processDefinitionMapper.queryDefinitionListByIdList(idArray); + processDefinitionMapper.queryByCodes(subProcessDefineCodes); if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) { for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) { /** diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 9766c61dff..bb279eabab 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -279,9 +279,9 @@ public class ExecutorServiceTest { @Test public void testStartCheckByProcessDefinedCode() { - List ids = new ArrayList<>(); - ids.add(1); - Mockito.doNothing().when(processService).recurseFindSubProcessId(1, ids); + List ids = new ArrayList<>(); + ids.add(1L); + Mockito.doNothing().when(processService).recurseFindSubProcess(1, ids); List processDefinitionList = new ArrayList<>(); ProcessDefinition processDefinition = new ProcessDefinition(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java index 2f15929240..71b0f929cd 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/CheckUtilsTest.java @@ -128,7 +128,7 @@ public class CheckUtilsTest { taskNode.setType(TaskType.SUB_PROCESS.getDesc()); assertFalse(CheckUtils.checkTaskNodeParameters(taskNode)); - subProcessParameters.setProcessDefinitionId(1234); + subProcessParameters.setProcessDefinitionCode(1234L); taskNode.setParams(JSONUtils.toJsonString(subProcessParameters)); taskNode.setType(TaskType.SUB_PROCESS.getDesc()); assertTrue(CheckUtils.checkTaskNodeParameters(taskNode)); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 20e145bf8f..91ae62ae7d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -431,7 +431,7 @@ public final class Constants { public static final String CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID = "parentProcessInstanceId"; - public static final String CMD_PARAM_SUB_PROCESS_DEFINE_ID = "processDefinitionId"; + public static final String CMD_PARAM_SUB_PROCESS_DEFINE_CODE = "processDefinitionCode"; public static final String CMD_PARAM_START_NODE_NAMES = "StartNodeNameList"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java index 46f0e8510c..21a4f3e52a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/subprocess/SubProcessParameters.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.dolphinscheduler.common.task.subprocess; + import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -27,19 +28,19 @@ public class SubProcessParameters extends AbstractParameters { /** * process definition id */ - private Integer processDefinitionId; + private long processDefinitionCode; - public void setProcessDefinitionId(Integer processDefinitionId){ - this.processDefinitionId = processDefinitionId; + public void setProcessDefinitionCode(long processDefinitionCode) { + this.processDefinitionCode = processDefinitionCode; } - public Integer getProcessDefinitionId(){ - return this.processDefinitionId; + public long getProcessDefinitionCode() { + return this.processDefinitionCode; } @Override public boolean checkParameters() { - return this.processDefinitionId != null && this.processDefinitionId != 0; + return this.processDefinitionCode != 0; } @Override diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index 9109be59ff..2087c6e5f6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -648,6 +648,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { ObjectNode task = (ObjectNode) tasks.path(i); ObjectNode param = (ObjectNode) task.get("params"); TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + String taskType = task.get("type").asText(); if (param != null) { JsonNode resourceJsonNode = param.get("resourceList"); if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) { @@ -657,6 +658,13 @@ public abstract class UpgradeDao extends AbstractBaseDao { } else { taskDefinitionLog.setResourceIds(StringUtils.EMPTY); } + if (TaskType.SUB_PROCESS.getDesc().equals(taskType)) { + JsonNode jsonNodeDefinitionId = param.get("processDefinitionId"); + if (jsonNodeDefinitionId != null) { + param.put("processDefinitionCode", processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode()); + param.remove("processDefinitionId"); + } + } param.put("conditionResult", task.get("conditionResult")); param.put("dependence", task.get("dependence")); taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param)); @@ -669,9 +677,9 @@ public abstract class UpgradeDao extends AbstractBaseDao { } taskDefinitionLog.setDescription(task.get("description").asText()); taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO); - taskDefinitionLog.setTaskType(task.get("type").asText()); - taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt()); - taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt()); + taskDefinitionLog.setTaskType(taskType); + taskDefinitionLog.setFailRetryInterval(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 1 : task.get("retryInterval").asInt()); + taskDefinitionLog.setFailRetryTimes(TaskType.SUB_PROCESS.getDesc().equals(taskType) ? 0 : task.get("maxRetryTimes").asInt()); taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); String name = task.get("name").asText(); taskDefinitionLog.setName(name); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index a203f78adb..2e5fac8227 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -312,7 +312,6 @@ public class TaskInstanceMapperTest { definition.setCreateTime(new Date()); definition.setUpdateTime(new Date()); processDefinitionMapper.insert(definition); - //task.setProcessDefinitionId(definition.getId()); taskInstanceMapper.updateById(task); int countTask = taskInstanceMapper.countTask( @@ -348,7 +347,6 @@ public class TaskInstanceMapperTest { definition.setCreateTime(new Date()); definition.setUpdateTime(new Date()); processDefinitionMapper.insert(definition); - //task.setProcessDefinitionId(definition.getId()); taskInstanceMapper.updateById(task); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index caee8111aa..10621af4bd 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -23,7 +23,7 @@ import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_P import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; @@ -258,18 +258,6 @@ public class ProcessService { return null; } - /** - * check thread num - * - * @param command command - * @param validThreadNum validThreadNum - * @return if thread is enough - */ - private boolean checkThreadNum(Command command, int validThreadNum) { - int commandThreadCount = this.workProcessThreadNumCount(command.getProcessDefinitionCode()); - return validThreadNum >= commandThreadCount; - } - /** * insert one command * @@ -341,8 +329,8 @@ public class ProcessService { /** * get task node list by definitionId */ - public List getTaskNodeListByDefinitionId(Integer defineId) { - ProcessDefinition processDefinition = processDefineMapper.selectById(defineId); + public List getTaskNodeListByDefinition(long defineCode) { + ProcessDefinition processDefinition = processDefineMapper.queryByCode(defineCode); if (processDefinition == null) { logger.error("process define not exists"); return new ArrayList<>(); @@ -464,38 +452,24 @@ public class ProcessService { } } - /** - * calculate sub process number in the process define. - * - * @param processDefinitionCode processDefinitionCode - * @return process thread num count - */ - private Integer workProcessThreadNumCount(long processDefinitionCode) { - ProcessDefinition processDefinition = processDefineMapper.queryByCode(processDefinitionCode); - - List ids = new ArrayList<>(); - recurseFindSubProcessId(processDefinition.getId(), ids); - return ids.size() + 1; - } - /** * recursive query sub process definition id by parent id. * - * @param parentId parentId + * @param parentCode parentCode * @param ids ids */ - public void recurseFindSubProcessId(int parentId, List ids) { - List taskNodeList = this.getTaskNodeListByDefinitionId(parentId); + public void recurseFindSubProcess(long parentCode, List ids) { + List taskNodeList = this.getTaskNodeListByDefinition(parentCode); if (taskNodeList != null && !taskNodeList.isEmpty()) { for (TaskDefinition taskNode : taskNodeList) { String parameter = taskNode.getTaskParams(); ObjectNode parameterJson = JSONUtils.parseObject(parameter); - if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_ID) != null) { + if (parameterJson.get(CMD_PARAM_SUB_PROCESS_DEFINE_CODE) != null) { SubProcessParameters subProcessParam = JSONUtils.parseObject(parameter, SubProcessParameters.class); - ids.add(subProcessParam.getProcessDefinitionId()); - recurseFindSubProcessId(subProcessParam.getProcessDefinitionId(), ids); + ids.add(subProcessParam.getProcessDefinitionCode()); + recurseFindSubProcess(subProcessParam.getProcessDefinitionCode(), ids); } } } @@ -607,7 +581,6 @@ public class ProcessService { processInstance.setStartTime(new Date()); processInstance.setRunTimes(1); processInstance.setMaxTryTimes(0); - //processInstance.setProcessDefinitionId(command.getProcessDefinitionId()); processInstance.setCommandParam(command.getCommandParam()); processInstance.setCommandType(command.getCommandType()); processInstance.setIsSubProcess(Flag.NO); @@ -1236,8 +1209,8 @@ public class ProcessService { TaskInstance task) { CommandType commandType = getSubCommandType(parentProcessInstance, childInstance); Map subProcessParam = JSONUtils.toMap(task.getTaskParams()); - int childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID)); - ProcessDefinition subProcessDefinition = processDefineMapper.queryByDefineId(childDefineId); + int childDefineCode = Integer.parseInt(subProcessParam.get(Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE)); + ProcessDefinition subProcessDefinition = processDefineMapper.queryByCode(childDefineCode); Object localParams = subProcessParam.get(Constants.LOCAL_PARAMS); List allParam = JSONUtils.toList(JSONUtils.toJsonString(localParams), Property.class); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 21125639e8..45b195239f 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.service.process; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; import static org.mockito.ArgumentMatchers.any; import org.apache.dolphinscheduler.common.Constants; @@ -127,7 +127,7 @@ public class ProcessServiceTest { parentInstance.setWarningGroupId(0); TaskInstance task = new TaskInstance(); - task.setTaskParams("{\"processDefinitionId\":100}}"); + task.setTaskParams("{\"processDefinitionCode\":10}}"); task.setId(10); task.setTaskCode(1L); task.setTaskDefinitionVersion(1); @@ -142,8 +142,8 @@ public class ProcessServiceTest { parentInstance.setHistoryCmd("START_PROCESS"); parentInstance.setCommandType(CommandType.START_PROCESS); ProcessDefinition processDefinition = new ProcessDefinition(); - processDefinition.setCode(1L); - Mockito.when(processDefineMapper.queryByDefineId(100)).thenReturn(processDefinition); + processDefinition.setCode(10L); + Mockito.when(processDefineMapper.queryByCode(10L)).thenReturn(processDefinition); command = processService.createSubProcessCommand(parentInstance, childInstance, instanceMap, task); Assert.assertEquals(CommandType.START_PROCESS, command.getCommandType()); @@ -245,7 +245,7 @@ public class ProcessServiceTest { command.setProcessDefinitionCode(222); command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" - + CMD_PARAM_SUB_PROCESS_DEFINE_ID + "\":\"222\"}"); + + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps)); int definitionVersion = 1; @@ -346,7 +346,6 @@ public class ProcessServiceTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setCode(parentProcessDefineCode); processDefinition.setVersion(parentProcessDefineVersion); - Mockito.when(processDefineMapper.selectById(parentProcessDefineId)).thenReturn(processDefinition); long postTaskCode = 2L; int postTaskVersion = 2; @@ -356,19 +355,16 @@ public class ProcessServiceTest { processTaskRelationLog.setPostTaskCode(postTaskCode); processTaskRelationLog.setPostTaskVersion(postTaskVersion); relationLogList.add(processTaskRelationLog); - Mockito.when(processTaskRelationLogMapper.queryByProcessCodeAndVersion(parentProcessDefineCode - , parentProcessDefineVersion)).thenReturn(relationLogList); List taskDefinitionLogs = new ArrayList<>(); TaskDefinitionLog taskDefinitionLog1 = new TaskDefinitionLog(); - taskDefinitionLog1.setTaskParams("{\"processDefinitionId\": 123}"); + taskDefinitionLog1.setTaskParams("{\"processDefinitionCode\": 123L}"); taskDefinitionLogs.add(taskDefinitionLog1); - Mockito.when(taskDefinitionLogMapper.queryByTaskDefinitions(Mockito.anySet())).thenReturn(taskDefinitionLogs); - List ids = new ArrayList<>(); - processService.recurseFindSubProcessId(parentProcessDefineId, ids); + List ids = new ArrayList<>(); + processService.recurseFindSubProcess(parentProcessDefineCode, ids); - Assert.assertEquals(1, ids.size()); + Assert.assertEquals(0, ids.size()); } @Test diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue index e0041e15ae..5e31b51357 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sub_process.vue @@ -29,7 +29,7 @@ @@ -72,7 +72,7 @@ return false } this.$emit('on-params', { - processDefinitionId: this.wdiCurr + processDefinitionCode: this.wdiCurr }) return true }, @@ -85,8 +85,8 @@ /** * Return the name according to the process definition id */ - _handleName (id) { - return _.filter(this.processDefinitionList, v => id === v.id)[0].name + _handleName (code) { + return _.filter(this.processDefinitionList, v => code === v.code)[0].name }, /** * Get all processDefinition list @@ -101,7 +101,7 @@ watch: { wdiCurr (val) { this.$emit('on-cache-params', { - processDefinitionId: this.wdiCurr + processDefinitionCode: this.wdiCurr }) } }, @@ -124,10 +124,10 @@ let o = this.backfillItem // Non-null objects represent backfill if (!_.isEmpty(o)) { - this.wdiCurr = o.params.processDefinitionId + this.wdiCurr = o.params.processDefinitionCode } else { if (this.processDefinitionList.length) { - this.wdiCurr = this.processDefinitionList[0].id + this.wdiCurr = this.processDefinitionList[0].code this.$emit('on-set-process-name', this._handleName(this.wdiCurr)) } }