From d833a28b2e312c9becd3d356f71f124317bad7f9 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 16 Sep 2022 13:54:18 +0800 Subject: [PATCH] Fix cannot save processDefinition (#11931) --- .../impl/ProcessDefinitionServiceImpl.java | 122 +++------ .../service/ProcessDefinitionServiceTest.java | 14 +- .../common/utils/CodeGenerateUtils.java | 11 +- .../dolphinscheduler/dao/entity/Command.java | 132 --------- .../dao/entity/ProcessDefinition.java | 250 +----------------- .../dao/entity/ProcessTaskRelation.java | 30 +-- .../dao/mapper/TaskInstanceMapperTest.java | 37 +-- .../service/process/ProcessServiceImpl.java | 129 +++++---- 8 files changed, 143 insertions(+), 582 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index ef0a16a816..d2e957bd58 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -261,43 +261,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } if (checkDescriptionLength(description)) { - putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); - return result; + throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR); } // check whether the new process define name exist ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); if (definition != null) { - putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); - return result; - } - List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - Map checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson); - if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) { - return checkTaskDefinitions; - } - List taskRelationList = - JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); - Map checkRelationJson = - checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); - if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { - return checkRelationJson; + throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name); } + List taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson); + List taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs); int tenantId = -1; if (!Constants.DEFAULT.equals(tenantCode)) { Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); if (tenant == null) { - putMsg(result, Status.TENANT_NOT_EXIST); - return result; + throw new ServiceException(Status.TENANT_NOT_EXIST); } tenantId = tenant.getId(); } - long processDefinitionCode; - try { - processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); - } catch (CodeGenerateException e) { - putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); - return result; - } + long processDefinitionCode = CodeGenerateUtils.getInstance().genCode(); ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, globalParams, locations, timeout, loginUser.getId(), tenantId); @@ -317,66 +298,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro logger.info("The task has not changed, so skip"); } if (saveTaskResult == Constants.DEFINITION_FAILURE) { - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE); if (insertVersion == 0) { - putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); } int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + if (insertResult != Constants.EXIT_CODE_SUCCESS) { throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } saveOtherRelation(loginUser, processDefinition, result, otherParamsJson); + + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); return result; } - private Map checkTaskDefinitionList(List taskDefinitionLogs, - String taskDefinitionJson) { - Map result = new HashMap<>(); + private List generateTaskDefinitionList(String taskDefinitionJson) { try { - if (taskDefinitionLogs.isEmpty()) { - logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); - return result; + List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); + if (CollectionUtils.isEmpty(taskDefinitionLogs)) { + logger.error("Generate task definition list failed, the given taskDefinitionJson is invalided: {}", + taskDefinitionJson); + throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson); } for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!taskPluginManager.checkTaskParameters(ParametersNode.builder() .taskType(taskDefinitionLog.getTaskType()) .taskParams(taskDefinitionLog.getTaskParams()) .dependence(taskDefinitionLog.getDependence()) .build())) { - logger.error("task definition {} parameter invalid", taskDefinitionLog.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); - return result; + logger.error( + "Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}", + taskDefinitionLog.getName(), taskDefinitionLog); + throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); } } - putMsg(result, Status.SUCCESS); + return taskDefinitionLogs; + } catch (ServiceException ex) { + throw ex; } catch (Exception e) { - result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); - result.put(Constants.MSG, e.getMessage()); + logger.error("Generate task definition list failed, meet an unknown exception", e); + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR); } - return result; } - private Map checkTaskRelationList(List taskRelationList, - String taskRelationJson, - List taskDefinitionLogs) { - Map result = new HashMap<>(); + private List generateTaskRelationList(String taskRelationJson, + List taskDefinitionLogs) { try { - if (taskRelationList == null || taskRelationList.isEmpty()) { - logger.error("task relation list is null"); - putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson); - return result; + List taskRelationList = + JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); + if (CollectionUtils.isEmpty(taskRelationList)) { + logger.error("Generate task relation list failed the taskRelation list is empty, taskRelationJson: {}", + taskRelationJson); + throw new ServiceException(Status.DATA_IS_NOT_VALID); } List processTaskRelations = taskRelationList.stream() .map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog), @@ -390,31 +368,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro Collection codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes); if (CollectionUtils.isNotEmpty(codes)) { logger.error("the task code is not exist"); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, - StringUtils.join(codes, Constants.COMMA)); - return result; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA)); } } if (graphHasCycle(taskNodeList)) { logger.error("process DAG has cycle"); - putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); - return result; + throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE); } // check whether the task relation json is normal for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { if (processTaskRelationLog.getPostTaskCode() == 0) { logger.error("the post_task_code or post_task_version can't be zero"); - putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR); - return result; + throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR); } } - putMsg(result, Status.SUCCESS); + return taskRelationList; + } catch (ServiceException ex) { + throw ex; } catch (Exception e) { - result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); - result.put(Constants.MSG, e.getMessage()); + logger.error("Check task relation list error, meet an unknown exception, given taskRelationJson: {}", + taskRelationJson, e); + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR); } - return result; } /** @@ -620,18 +596,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); return result; } - List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - Map checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson); - if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) { - return checkTaskDefinitions; - } - List taskRelationList = - JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); - Map checkRelationJson = - checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); - if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { - return checkRelationJson; - } + List taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson); + List taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs); int tenantId = -1; if (!Constants.DEFAULT.equals(tenantCode)) { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 05c63ab537..e4401cd71d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -96,9 +96,6 @@ import org.springframework.mock.web.MockMultipartFile; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -/** - * process definition service test - */ @RunWith(MockitoJUnitRunner.class) public class ProcessDefinitionServiceTest { @@ -756,10 +753,13 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE)) .thenReturn(result); - Map updateResult = - processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1, - "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL); - Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS)); + try { + processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1, + "", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL); + Assert.fail(); + } catch (ServiceException ex) { + Assert.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ex.getCode()); + } } @Test diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java index ffea87be51..f35523b59d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java @@ -10,8 +10,9 @@ import java.util.Objects; * Rewriting based on Twitter snowflake algorithm */ public class CodeGenerateUtils { + // start timestamp - private static final long START_TIMESTAMP = 1609430400000L; //2021-01-01 00:00:00 + private static final long START_TIMESTAMP = 1609430400000L; // 2021-01-01 00:00:00 // Each machine generates 32 in the same millisecond private static final long LOW_DIGIT_BIT = 5L; private static final long MIDDLE_BIT = 2L; @@ -24,11 +25,12 @@ public class CodeGenerateUtils { private long recordMillisecond = -1L; private static final long SYSTEM_TIMESTAMP = System.currentTimeMillis(); - private static final long SYSTEM_NANOTIME = System.nanoTime(); + private static final long SYSTEM_NANOTIME = System.nanoTime(); private CodeGenerateUtils() throws CodeGenerateException { try { - this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1)); + this.machineHash = + Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1)); } catch (UnknownHostException e) { throw new CodeGenerateException(e.getMessage()); } @@ -66,7 +68,8 @@ public class CodeGenerateUtils { return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000; } - public static class CodeGenerateException extends Exception { + public static class CodeGenerateException extends RuntimeException { + public CodeGenerateException(String message) { super(message); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index 12a094139a..ec9336d501 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -36,27 +36,15 @@ import com.baomidou.mybatisplus.annotation.TableName; @TableName("t_ds_command") public class Command { - /** - * id - */ @TableId(value = "id", type = IdType.AUTO) private Integer id; - /** - * command type - */ @TableField("command_type") private CommandType commandType; - /** - * process definition code - */ @TableField("process_definition_code") private long processDefinitionCode; - /** - * executor id - */ @TableField("executor_id") private int executorId; @@ -66,69 +54,36 @@ public class Command { @TableField("command_param") private String commandParam; - /** - * task depend type - */ @TableField("task_depend_type") private TaskDependType taskDependType; - /** - * failure strategy - */ @TableField("failure_strategy") private FailureStrategy failureStrategy; - /** - * warning type - */ @TableField("warning_type") private WarningType warningType; - /** - * warning group id - */ @TableField("warning_group_id") private Integer warningGroupId; - /** - * schedule time - */ @TableField("schedule_time") private Date scheduleTime; - /** - * start time - */ @TableField("start_time") private Date startTime; - /** - * process instance priority - */ @TableField("process_instance_priority") private Priority processInstancePriority; - /** - * update time - */ @TableField("update_time") private Date updateTime; - /** - * worker group - */ @TableField("worker_group") private String workerGroup; - /** - * environment code - */ @TableField("environment_code") private Long environmentCode; - /** - * dry run flag - */ @TableField("dry_run") private int dryRun; @@ -180,91 +135,4 @@ public class Command { this.processDefinitionVersion = processDefinitionVersion; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - Command command = (Command) o; - - if (id != command.id) { - return false; - } - if (processDefinitionCode != command.processDefinitionCode) { - return false; - } - if (executorId != command.executorId) { - return false; - } - if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) { - return false; - } - - if (environmentCode != null ? environmentCode.equals(command.environmentCode) - : command.environmentCode == null) { - return false; - } - - if (commandType != command.commandType) { - return false; - } - if (commandParam != null ? !commandParam.equals(command.commandParam) : command.commandParam != null) { - return false; - } - if (taskDependType != command.taskDependType) { - return false; - } - if (failureStrategy != command.failureStrategy) { - return false; - } - if (warningType != command.warningType) { - return false; - } - if (warningGroupId != null ? !warningGroupId.equals(command.warningGroupId) : command.warningGroupId != null) { - return false; - } - if (scheduleTime != null ? !scheduleTime.equals(command.scheduleTime) : command.scheduleTime != null) { - return false; - } - if (startTime != null ? !startTime.equals(command.startTime) : command.startTime != null) { - return false; - } - if (processInstancePriority != command.processInstancePriority) { - return false; - } - if (processInstanceId != command.processInstanceId) { - return false; - } - if (processDefinitionVersion != command.getProcessDefinitionVersion()) { - return false; - } - return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null); - } - - @Override - public int hashCode() { - int result = id; - result = 31 * result + (commandType != null ? commandType.hashCode() : 0); - result = 31 * result + Long.hashCode(processDefinitionCode); - result = 31 * result + executorId; - result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0); - result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0); - result = 31 * result + (failureStrategy != null ? failureStrategy.hashCode() : 0); - result = 31 * result + (warningType != null ? warningType.hashCode() : 0); - result = 31 * result + (warningGroupId != null ? warningGroupId.hashCode() : 0); - result = 31 * result + (scheduleTime != null ? scheduleTime.hashCode() : 0); - result = 31 * result + (startTime != null ? startTime.hashCode() : 0); - result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0); - result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0); - result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0); - result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0); - result = 31 * result + dryRun; - result = 31 * result + processInstanceId; - result = 31 * result + processDefinitionVersion; - return result; - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java index 089a035373..6e7414c286 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java @@ -27,18 +27,23 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.google.common.base.Strings; -/** - * process definition - */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor @TableName("t_ds_process_definition") public class ProcessDefinition { @@ -171,9 +176,6 @@ public class ProcessDefinition { */ private ProcessExecutionTypeEnum executionType; - public ProcessDefinition() { - } - public ProcessDefinition(long projectCode, String name, long code, @@ -208,90 +210,6 @@ public class ProcessDefinition { this.flag = Flag.YES; } - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public int getVersion() { - return version; - } - - public void setVersion(int version) { - this.version = version; - } - - public Integer getId() { - return id; - } - - public void setId(Integer id) { - this.id = id; - } - - public ReleaseState getReleaseState() { - return releaseState; - } - - public void setReleaseState(ReleaseState releaseState) { - this.releaseState = releaseState; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getUpdateTime() { - return updateTime; - } - - public void setUpdateTime(Date updateTime) { - this.updateTime = updateTime; - } - - public Flag getFlag() { - return flag; - } - - public void setFlag(Flag flag) { - this.flag = flag; - } - - public int getUserId() { - return userId; - } - - public void setUserId(int userId) { - this.userId = userId; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getProjectName() { - return projectName; - } - - public void setProjectName(String projectName) { - this.projectName = projectName; - } - - public String getGlobalParams() { - return globalParams; - } - public void setGlobalParams(String globalParams) { this.globalParamList = JSONUtils.toList(globalParams, Property.class); if (this.globalParamList == null) { @@ -300,14 +218,6 @@ public class ProcessDefinition { this.globalParams = globalParams; } - public List getGlobalParamList() { - return globalParamList; - } - - public void setGlobalParamList(List globalParamList) { - this.globalParamList = globalParamList; - } - public Map getGlobalParamMap() { if (globalParamMap == null && !Strings.isNullOrEmpty(globalParams)) { List propList = JSONUtils.toList(globalParams, Property.class); @@ -317,146 +227,4 @@ public class ProcessDefinition { return globalParamMap; } - public void setGlobalParamMap(Map globalParamMap) { - this.globalParamMap = globalParamMap; - } - - public String getLocations() { - return locations; - } - - public void setLocations(String locations) { - this.locations = locations; - } - - public ReleaseState getScheduleReleaseState() { - return scheduleReleaseState; - } - - public void setScheduleReleaseState(ReleaseState scheduleReleaseState) { - this.scheduleReleaseState = scheduleReleaseState; - } - - public int getTimeout() { - return timeout; - } - - public void setTimeout(int timeout) { - this.timeout = timeout; - } - - public int getTenantId() { - return tenantId; - } - - public void setTenantId(int tenantId) { - this.tenantId = tenantId; - } - - public String getTenantCode() { - return tenantCode; - } - - public void setTenantCode(String tenantCode) { - this.tenantCode = tenantCode; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public String getModifyBy() { - return modifyBy; - } - - public void setModifyBy(String modifyBy) { - this.modifyBy = modifyBy; - } - - public long getCode() { - return code; - } - - public void setCode(long code) { - this.code = code; - } - - public long getProjectCode() { - return projectCode; - } - - public void setProjectCode(long projectCode) { - this.projectCode = projectCode; - } - - public int getWarningGroupId() { - return warningGroupId; - } - - public void setWarningGroupId(int warningGroupId) { - this.warningGroupId = warningGroupId; - } - - public ProcessExecutionTypeEnum getExecutionType() { - return executionType; - } - - public void setExecutionType(ProcessExecutionTypeEnum executionType) { - this.executionType = executionType; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ProcessDefinition that = (ProcessDefinition) o; - return projectCode == that.projectCode - && userId == that.userId - && timeout == that.timeout - && tenantId == that.tenantId - && Objects.equals(name, that.name) - && releaseState == that.releaseState - && Objects.equals(description, that.description) - && Objects.equals(globalParams, that.globalParams) - && flag == that.flag - && executionType == that.executionType - && Objects.equals(locations, that.locations); - } - - @Override - public String toString() { - return "ProcessDefinition{" - + "id=" + id - + ", code=" + code - + ", name='" + name + '\'' - + ", version=" + version - + ", releaseState=" + releaseState - + ", projectCode=" + projectCode - + ", description='" + description + '\'' - + ", globalParams='" + globalParams + '\'' - + ", globalParamList=" + globalParamList - + ", globalParamMap=" + globalParamMap - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + ", flag=" + flag - + ", userId=" + userId - + ", userName='" + userName + '\'' - + ", projectName='" + projectName + '\'' - + ", locations='" + locations + '\'' - + ", scheduleReleaseState=" + scheduleReleaseState - + ", timeout=" + timeout - + ", tenantId=" + tenantId - + ", tenantCode='" + tenantCode + '\'' - + ", modifyBy='" + modifyBy + '\'' - + ", warningGroupId=" + warningGroupId - + '}'; - } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java index 70a324f298..bef2630d8f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java @@ -21,9 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.util.Date; -import java.util.Objects; import lombok.Data; +import lombok.NoArgsConstructor; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; @@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; @Data +@NoArgsConstructor @TableName("t_ds_process_task_relation") public class ProcessTaskRelation { @@ -103,9 +104,6 @@ public class ProcessTaskRelation { */ private Date updateTime; - public ProcessTaskRelation() { - } - public ProcessTaskRelation(String name, int processDefinitionVersion, long projectCode, @@ -132,28 +130,4 @@ public class ProcessTaskRelation { this.updateTime = updateTime; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ProcessTaskRelation that = (ProcessTaskRelation) o; - return processDefinitionVersion == that.processDefinitionVersion - && projectCode == that.projectCode - && processDefinitionCode == that.processDefinitionCode - && preTaskCode == that.preTaskCode - && preTaskVersion == that.preTaskVersion - && postTaskCode == that.postTaskCode - && postTaskVersion == that.postTaskVersion - && Objects.equals(name, that.name); - } - - @Override - public int hashCode() { - return Objects.hash(name, processDefinitionVersion, projectCode, processDefinitionCode, preTaskCode, - preTaskVersion, postTaskCode, postTaskVersion); - } } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index 014a2a2c3c..0c4702517e 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -25,14 +25,13 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import java.util.Collections; import java.util.Date; import java.util.List; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -50,15 +49,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest { @Autowired private ProcessInstanceMapper processInstanceMapper; - @Before - public void before() { - ProcessInstance processInstance = new ProcessInstance(); - processInstance.setWarningGroupId(0); - processInstance.setCommandParam(""); - processInstance.setProcessDefinitionCode(1L); - processInstanceMapper.insert(processInstance); - } - /** * insert * @@ -191,7 +181,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest { task2.setFlag(Flag.NO); taskInstanceMapper.updateById(task2); List taskInstances1 = taskInstanceMapper.findValidTaskListByProcessId(task.getProcessInstanceId(), - Flag.NO); + Flag.NO); taskInstanceMapper.deleteById(task2.getId()); taskInstanceMapper.deleteById(task.getId()); @@ -377,18 +367,17 @@ public class TaskInstanceMapperTest extends BaseDaoTest { Page page = new Page(1, 3); IPage taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging( - page, - definition.getProjectCode(), - task.getProcessInstanceId(), - "", - "", - "", - 0, - new int[0], - "", - TaskExecuteType.BATCH, - null, null - ); + page, + definition.getProjectCode(), + task.getProcessInstanceId(), + "", + "", + "", + 0, + new int[0], + "", + TaskExecuteType.BATCH, + null, null); processInstanceMapper.deleteById(processInstance.getId()); taskInstanceMapper.deleteById(task.getId()); processDefinitionMapper.deleteById(definition.getId()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index c213346451..fa6b640ddc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,14 +17,21 @@ package org.apache.dolphinscheduler.service.process; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import io.micrometer.core.annotation.Counted; -import org.apache.commons.collections.CollectionUtils; +import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.CommandType; @@ -130,13 +137,9 @@ import org.apache.dolphinscheduler.service.log.LogClient; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; -import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; + import java.util.ArrayList; import java.util.Arrays; import java.util.Date; @@ -149,22 +152,24 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; -import static java.util.stream.Collectors.toSet; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; /** * process relative dao that some mappers in this. @@ -2492,20 +2497,15 @@ public class ProcessServiceImpl implements ProcessService { taskDefinitionLog.setOperator(operator.getId()); taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog)); if (taskDefinitionLog.getCode() == 0) { - try { - taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode()); - } catch (CodeGenerateException e) { - logger.error("Task code get error, ", e); - return Constants.DEFINITION_FAILURE; - } + taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode()); } if (taskDefinitionLog.getVersion() == 0) { // init first version taskDefinitionLog.setVersion(Constants.VERSION_FIRST); } - TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper - .queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion()); + TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( + taskDefinitionLog.getCode(), taskDefinitionLog.getVersion()); if (definitionCodeAndVersion == null) { taskDefinitionLog.setUserId(operator.getId()); taskDefinitionLog.setCreateTime(now); @@ -2522,39 +2522,33 @@ public class ProcessServiceImpl implements ProcessService { taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime()); updateTaskDefinitionLogs.add(taskDefinitionLog); } - int insertResult = 0; - int updateResult = 0; - if (!updateTaskDefinitionLogs.isEmpty()) { - List taskDefinitions = taskDefinitionMapper.queryByCodeList(updateTaskDefinitionLogs - .stream().map(TaskDefinition::getCode).distinct().collect(Collectors.toList())); + if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) { + List taskDefinitionCodes = updateTaskDefinitionLogs + .stream() + .map(TaskDefinition::getCode) + .distinct() + .collect(Collectors.toList()); + Map taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes) + .stream() + .collect(Collectors.toMap(TaskDefinition::getCode, Function.identity())); for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) { - TaskDefinition task = null; - for (TaskDefinition taskDefinition : taskDefinitions) { - if (taskDefinitionToUpdate.getCode() == taskDefinition.getCode()) { - task = taskDefinition; - break; - } - } + TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode()); if (task == null) { newTaskDefinitionLogs.add(taskDefinitionToUpdate); - } else { - insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate); - if (Boolean.TRUE.equals(syncDefine)) { - taskDefinitionToUpdate.setId(task.getId()); - updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate); - } else { - updateResult++; - } } } } - if (!newTaskDefinitionLogs.isEmpty()) { - insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); - if (Boolean.TRUE.equals(syncDefine)) { - updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); - } else { - updateResult += newTaskDefinitionLogs.size(); - } + + // for each taskDefinitionLog, we will insert a new version into db + // and update the origin one if exist + int updateResult = updateTaskDefinitionLogs.size(); + int insertResult = newTaskDefinitionLogs.size(); + if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { + insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs); + } + + if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) { + updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); } return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS; } @@ -2575,10 +2569,8 @@ public class ProcessServiceImpl implements ProcessService { : ReleaseState.OFFLINE); processDefinitionLog.setOperator(operator.getId()); processDefinitionLog.setOperateTime(processDefinition.getUpdateTime()); - int insertLog = processDefineLogMapper.updateById(processDefinitionLog); - if (insertLog == 0) { - insertLog = processDefineLogMapper.insert(processDefinitionLog); - } + processDefinitionLog.setId(null); + int insertLog = processDefineLogMapper.insert(processDefinitionLog); int result = 1; if (Boolean.TRUE.equals(syncDefine)) { if (processDefinition.getId() == null) { @@ -2605,7 +2597,8 @@ public class ProcessServiceImpl implements ProcessService { } Map taskDefinitionLogMap = null; if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) { - taskDefinitionLogMap = taskDefinitionLogs.stream() + taskDefinitionLogMap = taskDefinitionLogs + .stream() .collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog)); } Date now = new Date();