Browse Source

cherry-pick [fix-12675]edit workflow related task, workflow's task version change

3.1.1-release
jackfanwan 2 years ago committed by zhuangchong
parent
commit
c7f990791e
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  3. 41
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  4. 17
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  5. 31
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -311,6 +311,22 @@ public enum Status {
NOT_SUPPORT_UPDATE_TASK_DEFINITION(50056, "task state does not support modification", "当前任务不支持修改"),
BATCH_EXECUTE_PROCESS_INSTANCE_ERROR(50058, "change process instance status error: {0}", "修改工作实例状态错误: {0}"),
START_TASK_INSTANCE_ERROR(50059, "start task instance error", "运行任务流实例错误"),
DELETE_PROCESS_DEFINE_ERROR(50060, "delete process definition [{0}] error: {1}", "删除工作流定义[{0}]错误: {1}"),
CREATE_TASK_DEFINITION_LOG_ERROR(50061, "create task definition log {0} error", "创建任务操作记录 {0} 错误"),
DELETE_TASK_DEFINE_BY_CODE_MSG_ERROR(50062, "delete task definition {0} error", "删除任务定义 {0} 错误"),
TASK_DEFINITION_NOT_CHANGE(50063, "task definition {0} do not change", "任务定义 {0} 没有变化"),
TASK_DEFINITION_NOT_EXISTS(50064, "task definition {0} do not exists", "任务定义 {0} 不存在"),
UPDATE_UPSTREAM_TASK_PROCESS_RELATION_ERROR(50065, "update task upstream relation error", "更新任务上游关系错误"),
CREATE_PROCESS_TASK_RELATION_LOG_ERROR(50066, "create process task relation log {0}-{1} error",
"创建任务关系日志 {0}-{1} 错误"),
PROCESS_TASK_RELATION_NOT_EXPECT(50067, "process task relation number not expect, expect {0} but get {1}",
"工作流任务关系数量不符合预期,预期 {0} 但是实际 {1}"),
PROCESS_TASK_RELATION_BATCH_DELETE_ERROR(50068, "batch delete process task relation {0} error",
"批量删除工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_CREATE_ERROR(50069, "batch create process task relation {0} error",
"批量创建工作流任务关系 {0} 错误"),
PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR(50070, "batch update process task relation error",
"批量修改工作流任务关系错误"),
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -534,6 +534,25 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
logger.info(
"Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.",
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
// update process task relation
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByTaskCode(taskDefinitionToUpdate.getCode());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
}
int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (count != 1) {
logger.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
}
}
}
return taskDefinitionToUpdate;
}

41
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -26,10 +26,7 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -140,8 +137,19 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
result = taskDefinitionService.updateTaskDefinition(loginUser, projectCode, taskCode, taskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
// failure
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(2);
exception = Assertions.assertThrows(ServiceException.class,
() -> taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson));
Assertions.assertEquals(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
@Test
@ -336,4 +344,25 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> failResult = taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode, ReleaseState.getEnum(2));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS));
}
private List<ProcessTaskRelation> getProcessTaskRelationList2() {
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(PROJECT_CODE);
processTaskRelation.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation.setPreTaskCode(TASK_CODE);
processTaskRelation.setPostTaskCode(TASK_CODE + 1L);
processTaskRelationList.add(processTaskRelation);
ProcessTaskRelation processTaskRelation2 = new ProcessTaskRelation();
processTaskRelation2.setProjectCode(PROJECT_CODE);
processTaskRelation2.setProcessDefinitionCode(PROCESS_DEFINITION_CODE);
processTaskRelation2.setPreTaskCode(TASK_CODE - 1);
processTaskRelation2.setPostTaskCode(TASK_CODE);
processTaskRelationList.add(processTaskRelation2);
return processTaskRelationList;
}
}

17
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@ -197,5 +198,21 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/
List<ProcessTaskRelation> queryDownstreamByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* Filter process task relation
*
* @param page page
* @param processTaskRelation process definition object
* @return process task relation IPage
*/
IPage<ProcessTaskRelation> filterProcessTaskRelation(IPage<ProcessTaskRelation> page,
@Param("relation") ProcessTaskRelation processTaskRelation);
/**
* batch update process task relation version
*
* @param processTaskRelationList process task relation list
* @return update num
*/
int updateProcessTaskRelationTaskVersion(@Param("processTaskRelation") ProcessTaskRelation processTaskRelationList);
}

31
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -200,4 +200,35 @@
and post_task_code = #{postTaskCode}
</if>
</select>
<select id="filterProcessTaskRelation"
parameterType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
SELECT
<include refid="baseSql"/>
FROM t_ds_process_task_relation
<where>
<if test=" relation.projectCode != 0">
and project_code = #{relation.projectCode}
</if>
<if test=" relation.processDefinitionCode != 0">
and process_definition_code = #{relation.processDefinitionCode}
</if>
<if test=" relation.preTaskCode != 0">
and pre_task_code = #{relation.preTaskCode}
</if>
<if test=" relation.postTaskCode != 0">
and post_task_code = #{relation.postTaskCode}
</if>
</where>
order by update_time desc, id asc
</select>
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set
pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
where
id = #{processTaskRelation.id}
</update>
</mapper>

Loading…
Cancel
Save