Browse Source

[Feature][JsonSplit] Fix save process bug (#5001)

* modify checkDAGRing and ProcessService method

* merge

* modify dagRing

* modify process instance for project home page

* fix save process bug

* codeStyle

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 3 years ago committed by GitHub
parent
commit
7f0678d3be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  2. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  3. 39
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java
  5. 31
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  6. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  7. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  8. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  9. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  10. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
  11. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -285,7 +285,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinition.setFailRetryTimes(taskDefinitionLog.getFailRetryTimes()); taskDefinition.setFailRetryTimes(taskDefinitionLog.getFailRetryTimes());
taskDefinition.setFailRetryInterval(taskDefinitionLog.getFailRetryInterval()); taskDefinition.setFailRetryInterval(taskDefinitionLog.getFailRetryInterval());
taskDefinition.setTimeoutFlag(taskDefinitionLog.getTimeoutFlag()); taskDefinition.setTimeoutFlag(taskDefinitionLog.getTimeoutFlag());
taskDefinition.setTaskTimeoutStrategy(taskDefinitionLog.getTaskTimeoutStrategy()); taskDefinition.setTimeoutNotifyStrategy(taskDefinitionLog.getTimeoutNotifyStrategy());
taskDefinition.setTimeout(taskDefinitionLog.getTimeout()); taskDefinition.setTimeout(taskDefinitionLog.getTimeout());
taskDefinition.setUpdateTime(new Date()); taskDefinition.setUpdateTime(new Date());
taskDefinition.setResourceIds(taskDefinitionLog.getResourceIds()); taskDefinition.setResourceIds(taskDefinitionLog.getResourceIds());

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -87,6 +87,7 @@ public enum TaskType {
} }
public static TaskType of(String descp) { public static TaskType of(String descp) {
descp = descp.toLowerCase();
if (TASK_TYPE_MAP.containsKey(descp)) { if (TASK_TYPE_MAP.containsKey(descp)) {
return TASK_TYPE_MAP.get(descp); return TASK_TYPE_MAP.get(descp);
} }

39
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -95,18 +95,6 @@ public class ProcessTaskRelation {
*/ */
private String conditionParams; private String conditionParams;
/**
* condition parameter list
*/
@TableField(exist = false)
private List<Property> conditionParamList;
/**
* condition parameter map
*/
@TableField(exist = false)
private Map<String, String> conditionParamMap;
/** /**
* create time * create time
*/ */
@ -181,36 +169,9 @@ public class ProcessTaskRelation {
} }
public void setConditionParams(String conditionParams) { public void setConditionParams(String conditionParams) {
if (conditionParams == null) {
this.conditionParamList = new ArrayList<>();
} else {
this.conditionParamList = JSONUtils.toList(conditionParams, Property.class);
}
this.conditionParams = conditionParams; this.conditionParams = conditionParams;
} }
public List<Property> getConditionParamList() {
return conditionParamList;
}
public void setConditionParamList(List<Property> conditionParamList) {
this.conditionParams = JSONUtils.toJsonString(conditionParamList);
this.conditionParamList = conditionParamList;
}
public Map<String, String> getConditionParamMap() {
if (conditionParamMap == null && StringUtils.isNotEmpty(conditionParams)) {
List<Property> propList = JSONUtils.toList(conditionParams, Property.class);
conditionParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
}
return conditionParamMap;
}
public void setConditionParamMap(Map<String, String> conditionParamMap) {
this.conditionParamMap = conditionParamMap;
}
public int getProcessDefinitionVersion() { public int getProcessDefinitionVersion() {
return processDefinitionVersion; return processDefinitionVersion;
} }

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelationLog.java

@ -56,8 +56,6 @@ public class ProcessTaskRelationLog extends ProcessTaskRelation {
this.setPostTaskVersion(processTaskRelation.getPostTaskVersion()); this.setPostTaskVersion(processTaskRelation.getPostTaskVersion());
this.setConditionType(processTaskRelation.getConditionType()); this.setConditionType(processTaskRelation.getConditionType());
this.setConditionParams(processTaskRelation.getConditionParams()); this.setConditionParams(processTaskRelation.getConditionParams());
this.setConditionParamList(processTaskRelation.getConditionParamList());
this.setConditionParamMap(processTaskRelation.getConditionParamMap());
this.setCreateTime(processTaskRelation.getCreateTime()); this.setCreateTime(processTaskRelation.getCreateTime());
this.setUpdateTime(processTaskRelation.getUpdateTime()); this.setUpdateTime(processTaskRelation.getUpdateTime());
} }

31
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -146,7 +146,7 @@ public class TaskDefinition {
/** /**
* timeout notify strategy * timeout notify strategy
*/ */
private TaskTimeoutStrategy taskTimeoutStrategy; private TaskTimeoutStrategy timeoutNotifyStrategy;
/** /**
* task warning time out. unit: minute * task warning time out. unit: minute
@ -247,36 +247,25 @@ public class TaskDefinition {
} }
public void setTaskParams(String taskParams) { public void setTaskParams(String taskParams) {
if (taskParams == null) {
this.taskParamList = new ArrayList<>();
} else {
this.taskParamList = JSONUtils.toList(taskParams, Property.class);
}
this.taskParams = taskParams; this.taskParams = taskParams;
} }
public List<Property> getTaskParamList() { public List<Property> getTaskParamList() {
return taskParamList; List<Property> propList = JSONUtils.toList(JSONUtils.parseObject(taskParams).findValue("localParams").toString(),
} Property.class);
return taskParamList = propList;
public void setTaskParamList(List<Property> taskParamList) {
this.taskParams = JSONUtils.toJsonString(taskParamList);
this.taskParamList = taskParamList;
} }
public Map<String, String> getTaskParamMap() { public Map<String, String> getTaskParamMap() {
if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) { if (taskParamMap == null && StringUtils.isNotEmpty(taskParams)) {
List<Property> propList = JSONUtils.toList(taskParams, Property.class); List<Property> propList = JSONUtils.toList(JSONUtils.parseObject(taskParams).findValue("localParams").toString(),
Property.class);
taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); taskParamMap = propList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
} }
return taskParamMap; return taskParamMap;
} }
public void setTaskParamMap(Map<String, String> taskParamMap) {
this.taskParamMap = taskParamMap;
}
public int getTimeout() { public int getTimeout() {
return timeout; return timeout;
} }
@ -357,12 +346,12 @@ public class TaskDefinition {
this.failRetryInterval = failRetryInterval; this.failRetryInterval = failRetryInterval;
} }
public TaskTimeoutStrategy getTaskTimeoutStrategy() { public TaskTimeoutStrategy getTimeoutNotifyStrategy() {
return taskTimeoutStrategy; return timeoutNotifyStrategy;
} }
public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) { public void setTimeoutNotifyStrategy(TaskTimeoutStrategy timeoutNotifyStrategy) {
this.taskTimeoutStrategy = taskTimeoutStrategy; this.timeoutNotifyStrategy = timeoutNotifyStrategy;
} }
public TimeoutFlag getTimeoutFlag() { public TimeoutFlag getTimeoutFlag() {

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -57,10 +57,8 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setProjectName(taskDefinition.getProjectName()); this.setProjectName(taskDefinition.getProjectName());
this.setResourceIds(taskDefinition.getResourceIds()); this.setResourceIds(taskDefinition.getResourceIds());
this.setTaskParams(taskDefinition.getTaskParams()); this.setTaskParams(taskDefinition.getTaskParams());
this.setTaskParamList(taskDefinition.getTaskParamList());
this.setTaskParamMap(taskDefinition.getTaskParamMap());
this.setTaskPriority(taskDefinition.getTaskPriority()); this.setTaskPriority(taskDefinition.getTaskPriority());
this.setTaskTimeoutStrategy(taskDefinition.getTaskTimeoutStrategy()); this.setTimeoutNotifyStrategy(taskDefinition.getTimeoutNotifyStrategy());
this.setTaskType(taskDefinition.getTaskType()); this.setTaskType(taskDefinition.getTaskType());
this.setTimeout(taskDefinition.getTimeout()); this.setTimeout(taskDefinition.getTimeout());
this.setTimeoutFlag(taskDefinition.getTimeoutFlag()); this.setTimeoutFlag(taskDefinition.getTimeoutFlag());

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -53,7 +53,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
/** /**
* query max version for definition * query max version for definition
*/ */
int queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode); Integer queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode);
/** /**
* query the certain process definition version info by process definition code and version number * query the certain process definition version info by process definition code and version number

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -54,7 +54,6 @@
select max(version) select max(version)
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} where code = #{processDefinitionCode}
</select> </select>
<select id="queryProcessDefinitionVersionsPaging" <select id="queryProcessDefinitionVersionsPaging"

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -27,7 +27,7 @@
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_task_definition from t_ds_task_definition
WHERE projectCode = #{projectCode} WHERE project_code = #{projectCode}
and `name` = #{taskDefinitionName} and `name` = #{taskDefinitionName}
</select> </select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java

@ -151,8 +151,8 @@ public class ProcessDefinitionLogMapperTest {
ProcessDefinitionLog processDefinitionLog = insertOne(); ProcessDefinitionLog processDefinitionLog = insertOne();
ProcessDefinitionLog processDefinitionLog1 = insertTwo(); ProcessDefinitionLog processDefinitionLog1 = insertTwo();
int version = processDefinitionLogMapper.queryMaxVersionForDefinition(1L); Integer version = processDefinitionLogMapper.queryMaxVersionForDefinition(1L);
Assert.assertEquals(2, version); Assert.assertEquals(2, version == null ? 1 : version);
} }
@Test @Test

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2193,7 +2193,7 @@ public class ProcessService {
taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes()); taskDefinition.setFailRetryTimes(taskNode.getMaxRetryTimes());
taskDefinition.setFailRetryInterval(taskNode.getRetryInterval()); taskDefinition.setFailRetryInterval(taskNode.getRetryInterval());
taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); taskDefinition.setTimeoutFlag(taskNode.getTaskTimeoutParameter().getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinition.setTaskTimeoutStrategy(taskNode.getTaskTimeoutParameter().getStrategy()); taskDefinition.setTimeoutNotifyStrategy(taskNode.getTaskTimeoutParameter().getStrategy());
taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval()); taskDefinition.setTimeout(taskNode.getTaskTimeoutParameter().getInterval());
taskDefinition.setResourceIds(getResourceIds(taskDefinition)); taskDefinition.setResourceIds(getResourceIds(taskDefinition));
} }
@ -2207,7 +2207,7 @@ public class ProcessService {
public String getResourceIds(TaskDefinition taskDefinition) { public String getResourceIds(TaskDefinition taskDefinition) {
Set<Integer> resourceIds = null; Set<Integer> resourceIds = null;
// TODO modify taskDefinition.getTaskType() // TODO modify taskDefinition.getTaskType()
AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp(), taskDefinition.getTaskParams()); AbstractParameters params = TaskParametersUtils.getParameters(taskDefinition.getTaskType().getDescp().toUpperCase(), taskDefinition.getTaskParams());
if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) { if (params != null && CollectionUtils.isNotEmpty(params.getResourceFilesList())) {
resourceIds = params.getResourceFilesList(). resourceIds = params.getResourceFilesList().
@ -2240,9 +2240,9 @@ public class ProcessService {
ProcessData processData, Project project, ProcessData processData, Project project,
String desc, String locations, String connects) { String desc, String locations, String connects) {
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(); ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog();
int version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionLog.getCode()); Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode);
processDefinitionLog.setCode(processDefinitionCode); processDefinitionLog.setCode(processDefinitionCode);
processDefinitionLog.setVersion(version); processDefinitionLog.setVersion(version == null || version == 0 ? 1 : version);
processDefinitionLog.setName(processDefinitionName); processDefinitionLog.setName(processDefinitionName);
processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setProjectCode(project.getCode()); processDefinitionLog.setProjectCode(project.getCode());
@ -2252,9 +2252,10 @@ public class ProcessService {
processDefinitionLog.setTimeout(processData.getTimeout()); processDefinitionLog.setTimeout(processData.getTimeout());
processDefinitionLog.setTenantId(processData.getTenantId()); processDefinitionLog.setTenantId(processData.getTenantId());
processDefinitionLog.setOperator(operator.getId()); processDefinitionLog.setOperator(operator.getId());
Date updateTime = new Date(); Date now = new Date();
processDefinitionLog.setOperateTime(updateTime); processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(updateTime); processDefinitionLog.setUpdateTime(now);
processDefinitionLog.setCreateTime(now);
//custom global params //custom global params
List<Property> globalParamsList = new ArrayList<>(); List<Property> globalParamsList = new ArrayList<>();
@ -2443,7 +2444,7 @@ public class ProcessService {
v.setCode(taskDefinitionLog.getCode()); v.setCode(taskDefinitionLog.getCode());
v.setName(taskDefinitionLog.getName()); v.setName(taskDefinitionLog.getName());
v.setDesc(taskDefinitionLog.getDescription()); v.setDesc(taskDefinitionLog.getDescription());
v.setType(taskDefinitionLog.getTaskType().getDescp()); v.setType(taskDefinitionLog.getTaskType().getDescp().toUpperCase());
v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : "NORMAL"); v.setRunFlag(taskDefinitionLog.getFlag() == Flag.YES ? Constants.FLOWNODE_RUN_FLAG_FORBIDDEN : "NORMAL");
v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes()); v.setMaxRetryTimes(taskDefinitionLog.getFailRetryTimes());
v.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); v.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
@ -2451,7 +2452,7 @@ public class ProcessService {
v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority()); v.setTaskInstancePriority(taskDefinitionLog.getTaskPriority());
v.setWorkerGroup(taskDefinitionLog.getWorkerGroup()); v.setWorkerGroup(taskDefinitionLog.getWorkerGroup());
v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN, v.setTimeout(JSONUtils.toJsonString(new TaskTimeoutParameter(taskDefinitionLog.getTimeoutFlag() == TimeoutFlag.OPEN,
taskDefinitionLog.getTaskTimeoutStrategy(), taskDefinitionLog.getTimeoutNotifyStrategy(),
taskDefinitionLog.getTimeout()))); taskDefinitionLog.getTimeout())));
// TODO name will be remove // TODO name will be remove
v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName())); v.getPreTaskNodeList().forEach(task -> task.setName(taskDefinitionLogMap.get(task.getCode()).getName()));

Loading…
Cancel
Save