Browse Source

[Fix-15036] [API] Fix task definition edit doesn't work (#14801)

* [Bug] [task definition] 修复任务定义修改,工作流无法生效问题

* [Bug] [API] Fix task check apply

* [BUG][API] fix updateTaskDefinition method ut

* [BUG][API] fix updateTaskDefinition method code specification

* [BUG][API] fix one task corresponds to multiple workflow problems

* [BUG][API] queryProcessTaskRelationByTaskCodeAndTaskVersion method adjust

* [BUG][API] fix updateTaskDefinition method update

---------

Co-authored-by: David Zollo <dailidong66@gmail.com>
Co-authored-by: 旺阳 <wang@lqwang.net>
Co-authored-by: xiangzihao <460888207@qq.com>
augit-log
小可耐 7 months ago committed by GitHub
parent
commit
ae847ba331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  2. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  3. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  4. 18
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

68
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils.CodeGenerateException;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.Project;
@ -58,6 +59,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskMainInfo;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
@ -141,6 +143,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessDefinitionService processDefinitionService;
@Autowired
private ProcessDefinitionLogMapper processDefinitionLogMapper;
/**
* create task definition
*
@ -781,20 +786,59 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
projectCode, taskCode, taskDefinitionToUpdate.getVersion());
// update process task relation
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByTaskCode(taskDefinitionToUpdate.getCode());
.queryProcessTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(),
taskDefinition.getVersion());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
Map<Long, List<ProcessTaskRelation>> processTaskRelationGroupList = processTaskRelations.stream()
.collect(Collectors.groupingBy(ProcessTaskRelation::getProcessDefinitionCode));
for (Map.Entry<Long, List<ProcessTaskRelation>> processTaskRelationMap : processTaskRelationGroupList
.entrySet()) {
Long processDefinitionCode = processTaskRelationMap.getKey();
int processDefinitionVersion =
processDefinitionLogMapper.queryMaxVersionForDefinition(processDefinitionCode)
+ 1;
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMap.getValue();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
if (taskCode == processTaskRelation.getPreTaskCode()) {
processTaskRelation.setPreTaskVersion(version);
} else if (taskCode == processTaskRelation.getPostTaskCode()) {
processTaskRelation.setPostTaskVersion(version);
}
processTaskRelation.setProcessDefinitionVersion(processDefinitionVersion);
int updateProcessDefinitionVersionCount =
processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (updateProcessDefinitionVersionCount != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
}
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setId(null);
processTaskRelationLog.setOperateTime(now);
int insertProcessTaskRelationLogCount = processTaskRelationLogDao.insert(processTaskRelationLog);
if (insertProcessTaskRelationLogCount != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_LOG_ERROR);
}
}
int count = processTaskRelationMapper.updateProcessTaskRelationTaskVersion(processTaskRelation);
if (count != 1) {
log.error("batch update process task relation error, projectCode:{}, taskDefinitionCode:{}.",
projectCode, taskCode);
putMsg(result, Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
throw new ServiceException(Status.PROCESS_TASK_RELATION_BATCH_UPDATE_ERROR);
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
processDefinition.setVersion(processDefinitionVersion);
processDefinition.setUpdateTime(now);
processDefinition.setUserId(loginUser.getId());
// update process definition
int updateProcessDefinitionCount = processDefinitionMapper.updateById(processDefinition);
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setId(null);
processDefinitionLog.setOperator(loginUser.getId());
int insertProcessDefinitionLogCount = processDefinitionLogMapper.insert(processDefinitionLog);
if ((updateProcessDefinitionCount & insertProcessDefinitionLogCount) != 1) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
}
}

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -129,6 +129,9 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessTaskRelationLogDao processTaskRelationLogDao;
@Mock
private ProcessDefinitionLogMapper processDefinitionLogMapper;
private static final String TASK_PARAMETER =
"{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"echo 1\",\"conditionResult\":{\"successNode\":[\"\"],\"failedNode\":[\"\"]},\"dependence\":{}}";;
private static final long PROJECT_CODE = 1L;
@ -188,9 +191,14 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(processTaskRelationLogDao.insert(Mockito.any(ProcessTaskRelationLog.class))).thenReturn(1);
Mockito.when(processDefinitionMapper.queryByCode(2L)).thenReturn(new ProcessDefinition());
Mockito.when(processDefinitionMapper.updateById(Mockito.any(ProcessDefinition.class))).thenReturn(1);
Mockito.when(processDefinitionLogMapper.insert(Mockito.any(ProcessDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1);
Mockito.when(taskPluginManager.checkTaskParameters(Mockito.any())).thenReturn(true);
Mockito.when(processTaskRelationMapper.queryByTaskCode(3)).thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper.queryProcessTaskRelationByTaskCodeAndTaskVersion(TASK_CODE, 0))
.thenReturn(getProcessTaskRelationList2());
Mockito.when(processTaskRelationMapper
.updateProcessTaskRelationTaskVersion(Mockito.any(ProcessTaskRelation.class))).thenReturn(1);
result = taskDefinitionService.updateTaskDefinition(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson);

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -225,4 +225,14 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
void deleteByWorkflowDefinitionCode(@Param("workflowDefinitionCode") long workflowDefinitionCode,
@Param("workflowDefinitionVersion") int workflowDefinitionVersion);
/**
* process task relation by taskCode and postTaskVersion
*
* @param taskCode taskCode
* @param postTaskVersion postTaskVersion
* @return ProcessTaskRelation
*/
List<ProcessTaskRelation> queryProcessTaskRelationByTaskCodeAndTaskVersion(@Param("taskCode") long taskCode,
@Param("postTaskVersion") long postTaskVersion);
}

18
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -231,7 +231,8 @@
<update id="updateProcessTaskRelationTaskVersion">
update t_ds_process_task_relation
set pre_task_version=#{processTaskRelation.preTaskVersion},
post_task_version=#{processTaskRelation.postTaskVersion}
post_task_version=#{processTaskRelation.postTaskVersion},
process_definition_version=#{processTaskRelation.processDefinitionVersion}
where id = #{processTaskRelation.id}
</update>
@ -240,4 +241,19 @@
from t_ds_process_task_relation
where process_definition_code = #{workflowDefinitionCode} and process_definition_version = #{workflowDefinitionVersion}
</delete>
<select id="queryProcessTaskRelationByTaskCodeAndTaskVersion" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
<include refid="baseSql"/>
from t_ds_process_task_relation
WHERE process_definition_code in (
SELECT
process_definition_code
FROM
t_ds_process_task_relation
WHERE
post_task_code = #{taskCode}
and post_task_version = #{postTaskVersion}
)
</select>
</mapper>

Loading…
Cancel
Save