Browse Source

Fix cannot save processDefinition (#11931)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
d833a28b2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 120
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 7
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java
  4. 132
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  5. 250
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  6. 30
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  7. 15
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  8. 123
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

120
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -261,43 +261,24 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
if (checkDescriptionLength(description)) {
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
throw new ServiceException(Status.DESCRIPTION_TOO_LONG_ERROR);
}
// check whether the new process define name exist
ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name);
if (definition != null) {
putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name);
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
return checkTaskDefinitions;
}
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
throw new ServiceException(Status.PROCESS_DEFINITION_NAME_EXIST, name);
}
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
if (tenant == null) {
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
throw new ServiceException(Status.TENANT_NOT_EXIST);
}
tenantId = tenant.getId();
}
long processDefinitionCode;
try {
processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
} catch (CodeGenerateException e) {
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result;
}
long processDefinitionCode = CodeGenerateUtils.getInstance().genCode();
ProcessDefinition processDefinition =
new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId);
@ -317,66 +298,63 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, Boolean.TRUE, Boolean.TRUE);
if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(),
insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
saveOtherRelation(loginUser, processDefinition, result, otherParamsJson);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
return result;
}
private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs,
String taskDefinitionJson) {
Map<String, Object> result = new HashMap<>();
private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinitionJson) {
try {
if (taskDefinitionLogs.isEmpty()) {
logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
logger.error("Generate task definition list failed, the given taskDefinitionJson is invalided: {}",
taskDefinitionJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson);
}
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (!taskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
logger.error(
"Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}",
taskDefinitionLog.getName(), taskDefinitionLog);
throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
}
}
putMsg(result, Status.SUCCESS);
return taskDefinitionLogs;
} catch (ServiceException ex) {
throw ex;
} catch (Exception e) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, e.getMessage());
logger.error("Generate task definition list failed, meet an unknown exception", e);
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
}
return result;
}
private Map<String, Object> checkTaskRelationList(List<ProcessTaskRelationLog> taskRelationList,
String taskRelationJson,
private List<ProcessTaskRelationLog> generateTaskRelationList(String taskRelationJson,
List<TaskDefinitionLog> taskDefinitionLogs) {
Map<String, Object> result = new HashMap<>();
try {
if (taskRelationList == null || taskRelationList.isEmpty()) {
logger.error("task relation list is null");
putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson);
return result;
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
if (CollectionUtils.isEmpty(taskRelationList)) {
logger.error("Generate task relation list failed the taskRelation list is empty, taskRelationJson: {}",
taskRelationJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
List<ProcessTaskRelation> processTaskRelations = taskRelationList.stream()
.map(processTaskRelationLog -> JSONUtils.parseObject(JSONUtils.toJsonString(processTaskRelationLog),
@ -390,31 +368,29 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
if (CollectionUtils.isNotEmpty(codes)) {
logger.error("the task code is not exist");
putMsg(result, Status.TASK_DEFINE_NOT_EXIST,
StringUtils.join(codes, Constants.COMMA));
return result;
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
}
}
if (graphHasCycle(taskNodeList)) {
logger.error("process DAG has cycle");
putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
return result;
throw new ServiceException(Status.PROCESS_NODE_HAS_CYCLE);
}
// check whether the task relation json is normal
for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) {
if (processTaskRelationLog.getPostTaskCode() == 0) {
logger.error("the post_task_code or post_task_version can't be zero");
putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR);
return result;
throw new ServiceException(Status.CHECK_PROCESS_TASK_RELATION_ERROR);
}
}
putMsg(result, Status.SUCCESS);
return taskRelationList;
} catch (ServiceException ex) {
throw ex;
} catch (Exception e) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, e.getMessage());
logger.error("Check task relation list error, meet an unknown exception, given taskRelationJson: {}",
taskRelationJson, e);
throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR);
}
return result;
}
/**
@ -620,18 +596,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
return checkTaskDefinitions;
}
List<ProcessTaskRelationLog> taskRelationList =
JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson =
checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson;
}
List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
List<ProcessTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) {

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -96,9 +96,6 @@ import org.springframework.mock.web.MockMultipartFile;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
/**
* process definition service test
*/
@RunWith(MockitoJUnitRunner.class)
public class ProcessDefinitionServiceTest {
@ -756,10 +753,13 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_UPDATE))
.thenReturn(result);
Map<String, Object> updateResult =
try {
processDefinitionService.updateProcessDefinition(loginUser, projectCode, "test", 1,
"", "", "", 0, "root", null, "", null, ProcessExecutionTypeEnum.PARALLEL);
Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS));
Assert.fail();
} catch (ServiceException ex) {
Assert.assertEquals(Status.DATA_IS_NOT_VALID.getCode(), ex.getCode());
}
}
@Test

7
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CodeGenerateUtils.java

@ -10,6 +10,7 @@ import java.util.Objects;
* Rewriting based on Twitter snowflake algorithm
*/
public class CodeGenerateUtils {
// start timestamp
private static final long START_TIMESTAMP = 1609430400000L; // 2021-01-01 00:00:00
// Each machine generates 32 in the same millisecond
@ -28,7 +29,8 @@ public class CodeGenerateUtils {
private CodeGenerateUtils() throws CodeGenerateException {
try {
this.machineHash = Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
this.machineHash =
Math.abs(Objects.hash(InetAddress.getLocalHost().getHostName())) % (2 << (MIDDLE_BIT - 1));
} catch (UnknownHostException e) {
throw new CodeGenerateException(e.getMessage());
}
@ -66,7 +68,8 @@ public class CodeGenerateUtils {
return SYSTEM_TIMESTAMP + (System.nanoTime() - SYSTEM_NANOTIME) / 1000000;
}
public static class CodeGenerateException extends Exception {
public static class CodeGenerateException extends RuntimeException {
public CodeGenerateException(String message) {
super(message);
}

132
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -36,27 +36,15 @@ import com.baomidou.mybatisplus.annotation.TableName;
@TableName("t_ds_command")
public class Command {
/**
* id
*/
@TableId(value = "id", type = IdType.AUTO)
private Integer id;
/**
* command type
*/
@TableField("command_type")
private CommandType commandType;
/**
* process definition code
*/
@TableField("process_definition_code")
private long processDefinitionCode;
/**
* executor id
*/
@TableField("executor_id")
private int executorId;
@ -66,69 +54,36 @@ public class Command {
@TableField("command_param")
private String commandParam;
/**
* task depend type
*/
@TableField("task_depend_type")
private TaskDependType taskDependType;
/**
* failure strategy
*/
@TableField("failure_strategy")
private FailureStrategy failureStrategy;
/**
* warning type
*/
@TableField("warning_type")
private WarningType warningType;
/**
* warning group id
*/
@TableField("warning_group_id")
private Integer warningGroupId;
/**
* schedule time
*/
@TableField("schedule_time")
private Date scheduleTime;
/**
* start time
*/
@TableField("start_time")
private Date startTime;
/**
* process instance priority
*/
@TableField("process_instance_priority")
private Priority processInstancePriority;
/**
* update time
*/
@TableField("update_time")
private Date updateTime;
/**
* worker group
*/
@TableField("worker_group")
private String workerGroup;
/**
* environment code
*/
@TableField("environment_code")
private Long environmentCode;
/**
* dry run flag
*/
@TableField("dry_run")
private int dryRun;
@ -180,91 +135,4 @@ public class Command {
this.processDefinitionVersion = processDefinitionVersion;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Command command = (Command) o;
if (id != command.id) {
return false;
}
if (processDefinitionCode != command.processDefinitionCode) {
return false;
}
if (executorId != command.executorId) {
return false;
}
if (workerGroup != null ? workerGroup.equals(command.workerGroup) : command.workerGroup == null) {
return false;
}
if (environmentCode != null ? environmentCode.equals(command.environmentCode)
: command.environmentCode == null) {
return false;
}
if (commandType != command.commandType) {
return false;
}
if (commandParam != null ? !commandParam.equals(command.commandParam) : command.commandParam != null) {
return false;
}
if (taskDependType != command.taskDependType) {
return false;
}
if (failureStrategy != command.failureStrategy) {
return false;
}
if (warningType != command.warningType) {
return false;
}
if (warningGroupId != null ? !warningGroupId.equals(command.warningGroupId) : command.warningGroupId != null) {
return false;
}
if (scheduleTime != null ? !scheduleTime.equals(command.scheduleTime) : command.scheduleTime != null) {
return false;
}
if (startTime != null ? !startTime.equals(command.startTime) : command.startTime != null) {
return false;
}
if (processInstancePriority != command.processInstancePriority) {
return false;
}
if (processInstanceId != command.processInstanceId) {
return false;
}
if (processDefinitionVersion != command.getProcessDefinitionVersion()) {
return false;
}
return !(updateTime != null ? !updateTime.equals(command.updateTime) : command.updateTime != null);
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (commandType != null ? commandType.hashCode() : 0);
result = 31 * result + Long.hashCode(processDefinitionCode);
result = 31 * result + executorId;
result = 31 * result + (commandParam != null ? commandParam.hashCode() : 0);
result = 31 * result + (taskDependType != null ? taskDependType.hashCode() : 0);
result = 31 * result + (failureStrategy != null ? failureStrategy.hashCode() : 0);
result = 31 * result + (warningType != null ? warningType.hashCode() : 0);
result = 31 * result + (warningGroupId != null ? warningGroupId.hashCode() : 0);
result = 31 * result + (scheduleTime != null ? scheduleTime.hashCode() : 0);
result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
result = 31 * result + (processInstancePriority != null ? processInstancePriority.hashCode() : 0);
result = 31 * result + (updateTime != null ? updateTime.hashCode() : 0);
result = 31 * result + (workerGroup != null ? workerGroup.hashCode() : 0);
result = 31 * result + (environmentCode != null ? environmentCode.hashCode() : 0);
result = 31 * result + dryRun;
result = 31 * result + processInstanceId;
result = 31 * result + processDefinitionVersion;
return result;
}
}

250
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -27,18 +27,23 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.google.common.base.Strings;
/**
* process definition
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("t_ds_process_definition")
public class ProcessDefinition {
@ -171,9 +176,6 @@ public class ProcessDefinition {
*/
private ProcessExecutionTypeEnum executionType;
public ProcessDefinition() {
}
public ProcessDefinition(long projectCode,
String name,
long code,
@ -208,90 +210,6 @@ public class ProcessDefinition {
this.flag = Flag.YES;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public ReleaseState getReleaseState() {
return releaseState;
}
public void setReleaseState(ReleaseState releaseState) {
this.releaseState = releaseState;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public Flag getFlag() {
return flag;
}
public void setFlag(Flag flag) {
this.flag = flag;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getProjectName() {
return projectName;
}
public void setProjectName(String projectName) {
this.projectName = projectName;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParamList = JSONUtils.toList(globalParams, Property.class);
if (this.globalParamList == null) {
@ -300,14 +218,6 @@ public class ProcessDefinition {
this.globalParams = globalParams;
}
public List<Property> getGlobalParamList() {
return globalParamList;
}
public void setGlobalParamList(List<Property> globalParamList) {
this.globalParamList = globalParamList;
}
public Map<String, String> getGlobalParamMap() {
if (globalParamMap == null && !Strings.isNullOrEmpty(globalParams)) {
List<Property> propList = JSONUtils.toList(globalParams, Property.class);
@ -317,146 +227,4 @@ public class ProcessDefinition {
return globalParamMap;
}
public void setGlobalParamMap(Map<String, String> globalParamMap) {
this.globalParamMap = globalParamMap;
}
public String getLocations() {
return locations;
}
public void setLocations(String locations) {
this.locations = locations;
}
public ReleaseState getScheduleReleaseState() {
return scheduleReleaseState;
}
public void setScheduleReleaseState(ReleaseState scheduleReleaseState) {
this.scheduleReleaseState = scheduleReleaseState;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTenantId() {
return tenantId;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getModifyBy() {
return modifyBy;
}
public void setModifyBy(String modifyBy) {
this.modifyBy = modifyBy;
}
public long getCode() {
return code;
}
public void setCode(long code) {
this.code = code;
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public int getWarningGroupId() {
return warningGroupId;
}
public void setWarningGroupId(int warningGroupId) {
this.warningGroupId = warningGroupId;
}
public ProcessExecutionTypeEnum getExecutionType() {
return executionType;
}
public void setExecutionType(ProcessExecutionTypeEnum executionType) {
this.executionType = executionType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessDefinition that = (ProcessDefinition) o;
return projectCode == that.projectCode
&& userId == that.userId
&& timeout == that.timeout
&& tenantId == that.tenantId
&& Objects.equals(name, that.name)
&& releaseState == that.releaseState
&& Objects.equals(description, that.description)
&& Objects.equals(globalParams, that.globalParams)
&& flag == that.flag
&& executionType == that.executionType
&& Objects.equals(locations, that.locations);
}
@Override
public String toString() {
return "ProcessDefinition{"
+ "id=" + id
+ ", code=" + code
+ ", name='" + name + '\''
+ ", version=" + version
+ ", releaseState=" + releaseState
+ ", projectCode=" + projectCode
+ ", description='" + description + '\''
+ ", globalParams='" + globalParams + '\''
+ ", globalParamList=" + globalParamList
+ ", globalParamMap=" + globalParamMap
+ ", createTime=" + createTime
+ ", updateTime=" + updateTime
+ ", flag=" + flag
+ ", userId=" + userId
+ ", userName='" + userName + '\''
+ ", projectName='" + projectName + '\''
+ ", locations='" + locations + '\''
+ ", scheduleReleaseState=" + scheduleReleaseState
+ ", timeout=" + timeout
+ ", tenantId=" + tenantId
+ ", tenantCode='" + tenantCode + '\''
+ ", modifyBy='" + modifyBy + '\''
+ ", warningGroupId=" + warningGroupId
+ '}';
}
}

30
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -21,9 +21,9 @@ import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.util.Date;
import java.util.Objects;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@Data
@NoArgsConstructor
@TableName("t_ds_process_task_relation")
public class ProcessTaskRelation {
@ -103,9 +104,6 @@ public class ProcessTaskRelation {
*/
private Date updateTime;
public ProcessTaskRelation() {
}
public ProcessTaskRelation(String name,
int processDefinitionVersion,
long projectCode,
@ -132,28 +130,4 @@ public class ProcessTaskRelation {
this.updateTime = updateTime;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ProcessTaskRelation that = (ProcessTaskRelation) o;
return processDefinitionVersion == that.processDefinitionVersion
&& projectCode == that.projectCode
&& processDefinitionCode == that.processDefinitionCode
&& preTaskCode == that.preTaskCode
&& preTaskVersion == that.preTaskVersion
&& postTaskCode == that.postTaskCode
&& postTaskVersion == that.postTaskVersion
&& Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(name, processDefinitionVersion, projectCode, processDefinitionCode, preTaskCode,
preTaskVersion, postTaskCode, postTaskVersion);
}
}

15
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -25,14 +25,13 @@ import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
@ -50,15 +49,6 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
@Autowired
private ProcessInstanceMapper processInstanceMapper;
@Before
public void before() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setWarningGroupId(0);
processInstance.setCommandParam("");
processInstance.setProcessDefinitionCode(1L);
processInstanceMapper.insert(processInstance);
}
/**
* insert
*
@ -387,8 +377,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
new int[0],
"",
TaskExecuteType.BATCH,
null, null
);
null, null);
processInstanceMapper.deleteById(processInstance.getId());
taskInstanceMapper.deleteById(task.getId());
processDefinitionMapper.deleteById(definition.getId());

123
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,14 +17,21 @@
package org.apache.dolphinscheduler.service.process;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
import org.apache.commons.collections.CollectionUtils;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@ -130,13 +137,9 @@ import org.apache.dolphinscheduler.service.log.LogClient;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@ -149,22 +152,24 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
/**
* process relative dao that some mappers in this.
@ -2492,20 +2497,15 @@ public class ProcessServiceImpl implements ProcessService {
taskDefinitionLog.setOperator(operator.getId());
taskDefinitionLog.setResourceIds(getResourceIds(taskDefinitionLog));
if (taskDefinitionLog.getCode() == 0) {
try {
taskDefinitionLog.setCode(CodeGenerateUtils.getInstance().genCode());
} catch (CodeGenerateException e) {
logger.error("Task code get error, ", e);
return Constants.DEFINITION_FAILURE;
}
}
if (taskDefinitionLog.getVersion() == 0) {
// init first version
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
}
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper
.queryByDefinitionCodeAndVersion(taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
TaskDefinitionLog definitionCodeAndVersion = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskDefinitionLog.getCode(), taskDefinitionLog.getVersion());
if (definitionCodeAndVersion == null) {
taskDefinitionLog.setUserId(operator.getId());
taskDefinitionLog.setCreateTime(now);
@ -2522,39 +2522,33 @@ public class ProcessServiceImpl implements ProcessService {
taskDefinitionLog.setCreateTime(definitionCodeAndVersion.getCreateTime());
updateTaskDefinitionLogs.add(taskDefinitionLog);
}
int insertResult = 0;
int updateResult = 0;
if (!updateTaskDefinitionLogs.isEmpty()) {
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryByCodeList(updateTaskDefinitionLogs
.stream().map(TaskDefinition::getCode).distinct().collect(Collectors.toList()));
if (CollectionUtils.isNotEmpty(updateTaskDefinitionLogs)) {
List<Long> taskDefinitionCodes = updateTaskDefinitionLogs
.stream()
.map(TaskDefinition::getCode)
.distinct()
.collect(Collectors.toList());
Map<Long, TaskDefinition> taskDefinitionMap = taskDefinitionMapper.queryByCodeList(taskDefinitionCodes)
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, Function.identity()));
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = null;
for (TaskDefinition taskDefinition : taskDefinitions) {
if (taskDefinitionToUpdate.getCode() == taskDefinition.getCode()) {
task = taskDefinition;
break;
}
}
TaskDefinition task = taskDefinitionMap.get(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
insertResult += taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if (Boolean.TRUE.equals(syncDefine)) {
taskDefinitionToUpdate.setId(task.getId());
updateResult += taskDefinitionMapper.updateById(taskDefinitionToUpdate);
} else {
updateResult++;
}
}
}
// for each taskDefinitionLog, we will insert a new version into db
// and update the origin one if exist
int updateResult = updateTaskDefinitionLogs.size();
int insertResult = newTaskDefinitionLogs.size();
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
insertResult = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
}
if (!newTaskDefinitionLogs.isEmpty()) {
insertResult += taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
if (Boolean.TRUE.equals(syncDefine)) {
if (CollectionUtils.isNotEmpty(newTaskDefinitionLogs) && Boolean.TRUE.equals(syncDefine)) {
updateResult += taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
} else {
updateResult += newTaskDefinitionLogs.size();
}
}
return (insertResult & updateResult) > 0 ? 1 : Constants.EXIT_CODE_SUCCESS;
}
@ -2575,10 +2569,8 @@ public class ProcessServiceImpl implements ProcessService {
: ReleaseState.OFFLINE);
processDefinitionLog.setOperator(operator.getId());
processDefinitionLog.setOperateTime(processDefinition.getUpdateTime());
int insertLog = processDefineLogMapper.updateById(processDefinitionLog);
if (insertLog == 0) {
insertLog = processDefineLogMapper.insert(processDefinitionLog);
}
processDefinitionLog.setId(null);
int insertLog = processDefineLogMapper.insert(processDefinitionLog);
int result = 1;
if (Boolean.TRUE.equals(syncDefine)) {
if (processDefinition.getId() == null) {
@ -2605,7 +2597,8 @@ public class ProcessServiceImpl implements ProcessService {
}
Map<Long, TaskDefinitionLog> taskDefinitionLogMap = null;
if (CollectionUtils.isNotEmpty(taskDefinitionLogs)) {
taskDefinitionLogMap = taskDefinitionLogs.stream()
taskDefinitionLogMap = taskDefinitionLogs
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, taskDefinitionLog -> taskDefinitionLog));
}
Date now = new Date();

Loading…
Cancel
Save