Browse Source

[Feature][JsonSplit] Fix logical bug in saving process definition (#5009)

* modify checkDAGRing and ProcessService method

* modify process instance for project home page

* fix save process bug

* Fix logical bug in saving process definition

Co-authored-by: JinyLeeChina <297062848@qq.com>
pull/3/MERGE
JinyLeeChina 4 years ago committed by GitHub
parent
commit
c010ac1f1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java
  3. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  4. 5
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  5. 50
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -190,6 +190,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
try { try {
long processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); long processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
processDefinition.setCode(processDefinitionCode); processDefinition.setCode(processDefinitionCode);
processDefinition.setVersion(1);
} catch (SnowFlakeException e) { } catch (SnowFlakeException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION); putMsg(result, Status.CREATE_PROCESS_DEFINITION);
return result; return result;

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessTaskRelation.java

@ -115,7 +115,9 @@ public class ProcessTaskRelation {
long projectCode, long projectCode,
long processDefinitionCode, long processDefinitionCode,
long preTaskCode, long preTaskCode,
int preTaskVersion,
long postTaskCode, long postTaskCode,
int postTaskVersion,
ConditionType conditionType, ConditionType conditionType,
String conditionParams, String conditionParams,
Date createTime, Date createTime,
@ -125,7 +127,9 @@ public class ProcessTaskRelation {
this.projectCode = projectCode; this.projectCode = projectCode;
this.processDefinitionCode = processDefinitionCode; this.processDefinitionCode = processDefinitionCode;
this.preTaskCode = preTaskCode; this.preTaskCode = preTaskCode;
this.preTaskVersion = preTaskVersion;
this.postTaskCode = postTaskCode; this.postTaskCode = postTaskCode;
this.postTaskVersion = postTaskVersion;
this.conditionType = conditionType; this.conditionType = conditionType;
this.conditionParams = conditionParams; this.conditionParams = conditionParams;
this.createTime = createTime; this.createTime = createTime;

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -43,12 +43,11 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
@Param("taskDefinitionName") String name); @Param("taskDefinitionName") String name);
/** /**
* query task definition log list * query max version for definition
* *
* @param taskDefinitionCode taskDefinitionCode * @param taskDefinitionCode taskDefinitionCode
* @return task definition log list
*/ */
List<TaskDefinitionLog> queryByDefinitionCode(@Param("taskDefinitionCode") long taskDefinitionCode); Integer queryMaxVersionForDefinition(@Param("taskDefinitionCode") long taskDefinitionCode);
/** /**
* query task definition log * query task definition log

5
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -34,9 +34,8 @@
WHERE p.code = #{projectCode} WHERE p.code = #{projectCode}
and td.name = #{taskDefinitionName} and td.name = #{taskDefinitionName}
</select> </select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog"> <select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select select max(version)
<include refid="baseSql"/>
from t_ds_task_definition_log from t_ds_task_definition_log
WHERE code = #{taskDefinitionCode} WHERE code = #{taskDefinitionCode}
</select> </select>

50
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2126,7 +2126,7 @@ public class ProcessService {
/** /**
* switch process definition version to process definition log version * switch process definition version to process definition log version
*/ */
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
if (null == processDefinition || null == processDefinitionLog) { if (null == processDefinition || null == processDefinitionLog) {
return Constants.EXIT_CODE_FAILURE; return Constants.EXIT_CODE_FAILURE;
} }
@ -2135,13 +2135,23 @@ public class ProcessService {
processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); processDefinitionLog.setReleaseState(ReleaseState.OFFLINE);
processDefinitionLog.setFlag(Flag.YES); processDefinitionLog.setFlag(Flag.YES);
int switchResult = 0; int result;
if (0 == processDefinition.getId()) { if (0 == processDefinition.getId()) {
switchResult = processDefineMapper.insert(processDefinitionLog); result = processDefineMapper.insert(processDefinitionLog);
} else { } else {
switchResult = processDefineMapper.updateById(processDefinitionLog); result = processDefineMapper.updateById(processDefinitionLog);
}
return result;
}
/**
* switch process definition version to process definition log version
*/
public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) {
int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog);
if (switchResult != Constants.EXIT_CODE_FAILURE) {
switchProcessTaskRelationVersion(processDefinition);
} }
switchProcessTaskRelationVersion(processDefinition);
return switchResult; return switchResult;
} }
@ -2161,16 +2171,11 @@ public class ProcessService {
*/ */
public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) { public int updateTaskDefinition(User operator, Long projectCode, TaskNode taskNode, TaskDefinition taskDefinition) {
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper.queryByDefinitionCode(taskDefinition.getCode()); Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskDefinition.getCode());
int version = taskDefinitionLogs
.stream()
.map(TaskDefinitionLog::getVersion)
.max((x, y) -> x > y ? x : y)
.orElse(0) + 1;
Date now = new Date(); Date now = new Date();
taskDefinition.setProjectCode(projectCode); taskDefinition.setProjectCode(projectCode);
taskDefinition.setUserId(operator.getId()); taskDefinition.setUserId(operator.getId());
taskDefinition.setVersion(version); taskDefinition.setVersion(version == null || version == 0 ? 1 : version);
taskDefinition.setUpdateTime(now); taskDefinition.setUpdateTime(now);
setTaskFromTaskNode(taskNode, taskDefinition); setTaskFromTaskNode(taskNode, taskDefinition);
int update = taskDefinitionMapper.updateById(taskDefinition); int update = taskDefinitionMapper.updateById(taskDefinition);
@ -2230,7 +2235,7 @@ public class ProcessService {
createTaskAndRelation(operator, project.getCode(), processDefinition, processData); createTaskAndRelation(operator, project.getCode(), processDefinition, processData);
ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(),
name, processData, project, desc, locations, connects); name, processData, project, desc, locations, connects);
return switchVersion(processDefinition, processDefinitionLog); return processDefinitionToDB(processDefinition, processDefinitionLog);
} }
/** /**
@ -2280,13 +2285,12 @@ public class ProcessService {
ProcessDefinition processDefinition, ProcessDefinition processDefinition,
ProcessData processData) { ProcessData processData) {
List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks(); List<TaskNode> taskNodeList = (processData.getTasks() == null) ? new ArrayList<>() : processData.getTasks();
Map<String, Long> taskNameAndCode = new HashMap<>(); Map<String, TaskDefinition> taskNameAndCode = new HashMap<>();
for (TaskNode taskNode : taskNodeList) { for (TaskNode taskNode : taskNodeList) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName()); TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(projectCode, taskNode.getName());
if (taskDefinition == null) { if (taskDefinition == null) {
long code;
try { try {
code = SnowFlakeUtils.getInstance().nextId(); long code = SnowFlakeUtils.getInstance().nextId();
taskDefinition = new TaskDefinition(); taskDefinition = new TaskDefinition();
taskDefinition.setCode(code); taskDefinition.setCode(code);
} catch (SnowFlakeException e) { } catch (SnowFlakeException e) {
@ -2299,7 +2303,7 @@ public class ProcessService {
} }
updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); updateTaskDefinition(operator, projectCode, taskNode, taskDefinition);
} }
taskNameAndCode.put(taskNode.getName(), taskDefinition.getCode()); taskNameAndCode.put(taskNode.getName(), taskDefinition);
} }
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode()); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinition.getCode());
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
@ -2315,8 +2319,10 @@ public class ProcessService {
processDefinition.getVersion(), processDefinition.getVersion(),
projectCode, projectCode,
processDefinition.getCode(), processDefinition.getCode(),
taskNameAndCode.get(preTaskName), taskNameAndCode.get(preTaskName).getCode(),
taskNameAndCode.get(taskNode.getName()), taskNameAndCode.get(preTaskName).getVersion(),
taskNameAndCode.get(taskNode.getName()).getCode(),
taskNameAndCode.get(taskNode.getName()).getVersion(),
ConditionType.of("none"), ConditionType.of("none"),
taskNode.getConditionResult(), taskNode.getConditionResult(),
now, now,
@ -2327,8 +2333,10 @@ public class ProcessService {
processDefinition.getVersion(), processDefinition.getVersion(),
projectCode, projectCode,
processDefinition.getCode(), processDefinition.getCode(),
0L, 0L, // this isn't previous task node, set zero
taskNameAndCode.get(taskNode.getName()), 0,
taskNameAndCode.get(taskNode.getName()).getCode(),
taskNameAndCode.get(taskNode.getName()).getVersion(),
ConditionType.of("none"), ConditionType.of("none"),
taskNode.getConditionResult(), taskNode.getConditionResult(),
now, now,

Loading…
Cancel
Save