diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 44c2a21a2d..5897807a0f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -1077,6 +1077,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro throw new ServiceException(Status.SCHEDULE_STATE_ONLINE, scheduleObj.getId()); } } + List processTaskRelations = processTaskRelationMapper + .queryByProcessCode(project.getCode(), processDefinition.getCode()); + if (CollectionUtils.isNotEmpty(processTaskRelations)) { + Set taskCodeList = new HashSet<>(processTaskRelations.size() * 2); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() != 0) { + taskCodeList.add(processTaskRelation.getPreTaskCode()); + } + if (processTaskRelation.getPostTaskCode() != 0) { + taskCodeList.add(processTaskRelation.getPostTaskCode()); + } + } + if (CollectionUtils.isNotEmpty(taskCodeList)) { + int i = taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(taskCodeList)); + if (i != taskCodeList.size()) { + logger.error("Delete task definition error, processDefinitionCode:{}.", code); + throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR); + } + } + } int delete = processDefinitionMapper.deleteById(processDefinition.getId()); if (delete == 0) { logger.error("Delete process definition error, processDefinitionCode:{}.", code); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8722558d45..8b600696b7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -65,6 +65,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.model.PageListingResult; import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao; @@ -78,6 +79,7 @@ import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -127,6 +129,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { @Mock private ProcessDefinitionMapper processDefinitionMapper; + @Mock + private TaskDefinitionMapper taskDefinitionMapper; + @Mock private ProcessDefinitionLogMapper processDefinitionLogMapper; @@ -499,6 +504,9 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { // delete success schedule.setReleaseState(ReleaseState.OFFLINE); + Mockito.when(processTaskRelationMapper.queryByProcessCode(1, 11)) + .thenReturn(getProcessTaskRelation()); + Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(2); Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1); Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())) @@ -507,6 +515,13 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode())) .thenReturn(Collections.emptySet()); Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); + + // delete fail + Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(1); + exception = Assertions.assertThrows(ServiceException.class, + () -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L)); + Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR.getCode(), + ((ServiceException) exception).getCode()); } @Test @@ -1121,6 +1136,8 @@ public class ProcessDefinitionServiceTest extends BaseServiceTestTool { processTaskRelation.setProjectCode(projectCode); processTaskRelation.setProcessDefinitionCode(46L); processTaskRelation.setProcessDefinitionVersion(1); + processTaskRelation.setPreTaskCode(100); + processTaskRelation.setPostTaskCode(200); processTaskRelations.add(processTaskRelation); return processTaskRelations; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java index 629cd9bf7b..17243de049 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java @@ -141,4 +141,12 @@ public interface TaskDefinitionMapper extends BaseMapper { */ IPage filterTaskDefinition(IPage page, @Param("task") TaskDefinition taskDefinition); + + /** + * batch delete task by task code + * + * @param taskCodeList task code list + * @return deleted row count + */ + int deleteByBatchCodes(@Param("taskCodeList") List taskCodeList); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml index 2c6a5cbe8a..46c6dfcbcf 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml @@ -86,6 +86,14 @@ delete from t_ds_task_definition where code = #{code} + + + delete from t_ds_task_definition where code in + + #{taskCode} + + + insert into t_ds_task_definition (code, name, version, description, project_code, user_id, task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,