Browse Source

when delete workflow, delete related task (#12681)

* when delete workflow, delete related task

Co-authored-by: fanwanlong <fanwanlong@kezaihui.com>
3.2.0-release
jackfanwan 2 years ago committed by GitHub
parent
commit
7d0e2cbbb9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  4. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1077,6 +1077,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId()); throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId());
} }
} }
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(project.getCode(), processDefinition.getCode());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
Set<Long> taskCodeList = new HashSet<>(processTaskRelations.size() * 2);
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPreTaskCode());
}
if (processTaskRelation.getPostTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPostTaskCode());
}
}
if (CollectionUtils.isNotEmpty(taskCodeList)) {
int i = taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(taskCodeList));
if (i != taskCodeList.size()) {
logger.error("Delete task definition error, processDefinitionCode:{}.", code);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
}
}
int delete = processDefinitionMapper.deleteById(processDefinition.getId()); int delete = processDefinitionMapper.deleteById(processDefinition.getId());
if (delete == 0) { if (delete == 0) {
logger.error("Delete process definition error, processDefinitionCode:{}.", code); logger.error("Delete process definition error, processDefinitionCode:{}.", code);

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
@ -78,6 +79,7 @@ import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
@ -127,6 +129,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
@Mock @Mock
private ProcessDefinitionMapper processDefinitionMapper; private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock @Mock
private ProcessDefinitionLogMapper processDefinitionLogMapper; private ProcessDefinitionLogMapper processDefinitionLogMapper;
@ -499,6 +504,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
// delete success // delete success
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processTaskRelationMapper.queryByProcessCode(1, 11))
.thenReturn(getProcessTaskRelation());
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(2);
Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1); Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())) Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
@ -507,6 +515,13 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet()); .thenReturn(Collections.emptySet());
Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
// delete fail
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(1);
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L));
Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR.getCode(),
((ServiceException) exception).getCode());
} }
@Test @Test
@ -1121,6 +1136,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool {
processTaskRelation.setProjectCode(projectCode); processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(46L); processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setProcessDefinitionVersion(1); processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelation.setPreTaskCode(100);
processTaskRelation.setPostTaskCode(200);
processTaskRelations.add(processTaskRelation); processTaskRelations.add(processTaskRelation);
return processTaskRelations; return processTaskRelations;
} }

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -141,4 +141,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
*/ */
IPage<TaskDefinition> filterTaskDefinition(IPage<TaskDefinition> page, IPage<TaskDefinition> filterTaskDefinition(IPage<TaskDefinition> page,
@Param("task") TaskDefinition taskDefinition); @Param("task") TaskDefinition taskDefinition);
/**
* batch delete task by task code
*
* @param taskCodeList task code list
* @return deleted row count
*/
int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList);
} }

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -86,6 +86,14 @@
delete from t_ds_task_definition delete from t_ds_task_definition
where code = #{code} where code = #{code}
</delete> </delete>
<delete id="deleteByBatchCodes">
delete from t_ds_task_definition where code in
<foreach collection="taskCodeList" item="taskCode" open="(" separator="," close=")">
#{taskCode}
</foreach>
</delete>
<insert id="batchInsert"> <insert id="batchInsert">
insert into t_ds_task_definition (code, name, version, description, project_code, user_id, insert into t_ds_task_definition (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,

Loading…
Cancel
Save