Browse Source

[fix-12675]edit workflow related task, workflow's task version change (#12692)

* edit workflow related task, workflow's task version change

* Update dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

Co-authored-by: fanwanlong <fanwanlong@kezaihui.com>
Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.2.0-release
jackfanwan 2 years ago committed by GitHub
parent
commit
f249f2b257
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  3. 32
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  4. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  5. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -371,6 +371,8 @@ public enum Status {
"批量删除工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
"批量创建工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
"批量修改工作流任务关系错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -765,6 +765,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
logger.info(
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
// update process task relation
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByTaskCode(taskDefinitionToUpdate.getCode());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
}
int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (count != 1) {
logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
}
}
}
return taskDefinitionToUpdate;
}

32
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -170,8 +170,19 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// failure
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
@Test
@ -531,4 +542,25 @@ public class TaskDefinitionServiceImplTest {
processTaskRelationList.add(processTaskRelation);
return processTaskRelationList;
}
private List<ProcessTaskRelation> getProcessTaskRelationList2() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(PROJECT_CODE);
processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation.setPreTaskCode(TASK_CODE);
processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
processTaskRelationList.add(processTaskRelation);
ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
processTaskRelation2.setProjectCode(PROJECT_CODE);
processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
processTaskRelation2.setPostTaskCode(TASK_CODE);
processTaskRelationList.add(processTaskRelation2);
return processTaskRelationList;
}
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -211,4 +211,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
@Param("relation") ProcessTaskRelation processTaskRelation);
/**
* batch update process task relation version
*
* @param processTaskRelationList process task relation list
* @return update num
*/
int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
}

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -221,4 +221,12 @@
</where>
order by update_time desc, id asc
</select>
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set
pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
where
id = #{processTaskRelation.id}
</update>
</mapper>

Loading…
Cancel
Save