Browse Source

cherry-pick when delete workflow, delete related task #12681

3.1.3-release
jackfanwan 2 years ago committed by zhuangchong
parent
commit
47c06ae041
  1. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  2. 96
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  3. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  4. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -874,6 +874,26 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result;
}
}
List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper
.queryByProcessCode(project.getCode(), processDefinition.getCode());
if (CollectionUtils.isNotEmpty(processTaskRelations)) {
Set<Long> taskCodeList = new HashSet<>(processTaskRelations.size() * 2);
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPreTaskCode());
}
if (processTaskRelation.getPostTaskCode() != 0) {
taskCodeList.add(processTaskRelation.getPostTaskCode());
}
}
if (CollectionUtils.isNotEmpty(taskCodeList)) {
int i = taskDefinitionMapper.deleteByBatchCodes(new ArrayList<>(taskCodeList));
if (i != taskCodeList.size()) {
logger.error("Delete task definition error, processDefinitionCode:{}.", code);
throw new ServiceException(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR);
}
}
}
int delete = processDefinitionMapper.deleteById(processDefinition.getId());
if (delete == 0) {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);

96
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -57,9 +57,11 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.model.PageListingResult;
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionDao;
@ -89,6 +91,7 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
@ -120,7 +123,13 @@ public class ProcessDefinitionServiceTest {
private ProcessDefinitionServiceImpl processDefinitionService;
@Mock
private ProcessDefinitionMapper processDefineMapper;
private ProcessDefinitionMapper processDefinitionMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ProcessDefinitionLogMapper processDefinitionLogMapper;
@Mock
private ProcessDefinitionDao processDefinitionDao;
@ -155,6 +164,29 @@ public class ProcessDefinitionServiceTest {
@Mock
private WorkFlowLineageService workFlowLineageService;
protected User user;
protected Exception exception;
protected final static long projectCode = 1L;
protected final static long projectCodeOther = 2L;
protected final static long processDefinitionCode = 11L;
protected final static String name = "testProcessDefinitionName";
protected final static String description = "this is a description";
protected final static String releaseState = "ONLINE";
protected final static int warningGroupId = 1;
protected final static int timeout = 60;
protected final static String executionType = "PARALLEL";
protected final static String tenantCode = "tenant";
@BeforeEach
public void before() {
User loginUser = new User();
loginUser.setId(1);
loginUser.setTenantId(2);
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setUserName("admin");
user = loginUser;
}
@Test
public void testQueryProcessDefinitionList() {
long projectCode = 1L;
@ -180,7 +212,7 @@ public class ProcessDefinitionServiceTest {
.thenReturn(result);
List<ProcessDefinition> resourceList = new ArrayList<>();
resourceList.add(getProcessDefinition());
Mockito.when(processDefineMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Mockito.when(processDefinitionMapper.queryAllDefinitionList(project.getCode())).thenReturn(resourceList);
Map<String, Object> checkSuccessRes =
processDefinitionService.queryProcessDefinitionList(loginUser, projectCode);
Assert.assertEquals(Status.SUCCESS, checkSuccessRes.get(Constants.STATUS));
@ -265,7 +297,7 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotexitRes.get(Constants.STATUS));
// instance exit
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
@ -300,14 +332,14 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
.thenReturn(result);
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test_def")).thenReturn(null);
Map<String, Object> instanceNotExitRes =
processDefinitionService.queryProcessDefinitionByName(loginUser, projectCode, "test_def");
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
// instance exit
Mockito.when(processDefineMapper.queryByDefineName(project.getCode(), "test"))
Mockito.when(processDefinitionMapper.queryByDefineName(project.getCode(), "test"))
.thenReturn(getProcessDefinition());
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION))
@ -356,7 +388,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition);
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, "46", 1L);
@ -391,7 +423,7 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition);
Set<Long> definitionCodes =
Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, Boolean.TRUE, Boolean.TRUE)).thenReturn(2);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L))
.thenReturn(getProcessTaskRelation(projectCode));
@ -424,7 +456,7 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_DEFINITION_DELETE))
.thenReturn(result);
Mockito.when(processDefineMapper.queryByCode(1L)).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByCode(1L)).thenReturn(null);
Map<String, Object> instanceNotExitRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 1L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, instanceNotExitRes.get(Constants.STATUS));
@ -435,7 +467,7 @@ public class ProcessDefinitionServiceTest {
.thenReturn(result);
// user no auth
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> userNoAuthRes =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoAuthRes.get(Constants.STATUS));
@ -444,7 +476,7 @@ public class ProcessDefinitionServiceTest {
loginUser.setUserType(UserType.ADMIN_USER);
putMsg(result, Status.SUCCESS, projectCode);
processDefinition.setReleaseState(ReleaseState.ONLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Throwable exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L));
String formatter =
@ -453,11 +485,11 @@ public class ProcessDefinitionServiceTest {
// scheduler list elements > 1
processDefinition.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(scheduleMapper.deleteById(46)).thenReturn(1);
Mockito.when(processDefineMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processDefinitionMapper.deleteById(processDefinition.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
@ -491,17 +523,25 @@ public class ProcessDefinitionServiceTest {
// delete success
schedule.setReleaseState(ReleaseState.OFFLINE);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(processTaskRelationMapper.queryByProcessCode(1, 11))
.thenReturn(getProcessTaskRelation(projectCode));
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(2);
Mockito.when(processDefinitionMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()))
.thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(getSchedule());
Mockito.when(workFlowLineageService.queryTaskDepOnProcess(project.getCode(), processDefinition.getCode()))
.thenReturn(Collections.emptySet());
putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> deleteSuccess =
processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, deleteSuccess.get(Constants.STATUS));
Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L, processDefinitionCode));
// delete fail
Mockito.when(taskDefinitionMapper.deleteByBatchCodes(Arrays.asList(100L, 200L))).thenReturn(1);
exception = Assertions.assertThrows(ServiceException.class,
() -> processDefinitionService.deleteProcessDefinitionByCode(user, 46L, processDefinitionCode));
Assertions.assertEquals(Status.DELETE_TASK_DEFINE_BY_CODE_ERROR.getCode(),
((ServiceException) exception).getCode());
}
@Test
@ -525,7 +565,7 @@ public class ProcessDefinitionServiceTest {
// project check auth success, processs definition online
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(getProcessDefinition());
List<ProcessTaskRelation> processTaskRelationList = new ArrayList<>();
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setProjectCode(projectCode);
@ -570,13 +610,13 @@ public class ProcessDefinitionServiceTest {
// project check auth success, process not exist
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf")).thenReturn(null);
Map<String, Object> processNotExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf", 0);
Assert.assertEquals(Status.SUCCESS, processNotExistRes.get(Constants.STATUS));
// process exist
Mockito.when(processDefineMapper.verifyByDefineName(project.getCode(), "test_pdf"))
Mockito.when(processDefinitionMapper.verifyByDefineName(project.getCode(), "test_pdf"))
.thenReturn(getProcessDefinition());
Map<String, Object> processExistRes = processDefinitionService.verifyProcessDefinitionName(loginUser,
projectCode, "test_pdf", 0);
@ -610,7 +650,7 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, null)).thenReturn(result);
// process definition not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(null);
Map<String, Object> processDefinitionNullRes =
processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionNullRes.get(Constants.STATUS));
@ -619,7 +659,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Map<String, Object> dataNotValidRes =
processDefinitionService.getTaskNodeListByDefinitionCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SUCCESS, dataNotValidRes.get(Constants.STATUS));
@ -643,7 +683,7 @@ public class ProcessDefinitionServiceTest {
String defineCodes = "46";
Set<Long> defineCodeSet = Lists.newArrayList(defineCodes.split(Constants.COMMA)).stream().map(Long::parseLong)
.collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(null);
Mockito.when(processDefinitionMapper.queryByCodes(defineCodeSet)).thenReturn(null);
Map<String, Object> processNotExistRes =
processDefinitionService.getNodeListMapByDefinitionCodes(loginUser, projectCode, defineCodes);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST, processNotExistRes.get(Constants.STATUS));
@ -653,7 +693,7 @@ public class ProcessDefinitionServiceTest {
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryByCodes(defineCodeSet)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryByCodes(defineCodeSet)).thenReturn(processDefinitionList);
Mockito.when(processService.genDagData(Mockito.any())).thenReturn(new DagData(processDefinition, null, null));
Project project1 = getProject(projectCode);
List<Project> projects = new ArrayList<>();
@ -680,7 +720,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(processDefinition);
Mockito.when(processDefineMapper.queryAllDefinitionList(projectCode)).thenReturn(processDefinitionList);
Mockito.when(processDefinitionMapper.queryAllDefinitionList(projectCode)).thenReturn(processDefinitionList);
Map<String, Object> successRes =
processDefinitionService.queryAllProcessDefinitionByProjectCode(loginUser, projectCode);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
@ -709,7 +749,7 @@ public class ProcessDefinitionServiceTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectMapper.queryByCode(1)).thenReturn(project1);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project1, 1, WORKFLOW_TREE_VIEW)).thenReturn(result);
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new DAG<>());
Map<String, Object> taskNullRes =
processDefinitionService.viewTree(loginUser, processDefinition.getProjectCode(), 46, 10);
@ -727,7 +767,7 @@ public class ProcessDefinitionServiceTest {
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processDefinitionMapper.queryByCode(46L)).thenReturn(processDefinition);
Project project1 = getProject(1);
Map<String, Object> result = new HashMap<>();
@ -897,6 +937,8 @@ public class ProcessDefinitionServiceTest {
processTaskRelation.setProjectCode(projectCode);
processTaskRelation.setProcessDefinitionCode(46L);
processTaskRelation.setProcessDefinitionVersion(1);
processTaskRelation.setPreTaskCode(100);
processTaskRelation.setPostTaskCode(200);
processTaskRelations.add(processTaskRelation);
return processTaskRelations;
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -131,4 +131,12 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @return task definition list
*/
List<TaskDefinition> queryByCodeList(@Param("codes") Collection<Long> codes);
/**
* batch delete task by task code
*
* @param taskCodeList task code list
* @return deleted row count
*/
int deleteByBatchCodes(@Param("taskCodeList") List<Long> taskCodeList);
}

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -86,6 +86,14 @@
delete from t_ds_task_definition
where code = #{code}
</delete>
<delete id="deleteByBatchCodes">
delete from t_ds_task_definition where code in
<foreach collection="taskCodeList" item="taskCode" open="(" separator="," close=")">
#{taskCode}
</foreach>
</delete>
<insert id="batchInsert">
insert into t_ds_task_definition (code, name, version, description, project_code, user_id,
task_type, task_params, flag, task_priority, worker_group, environment_code, fail_retry_times, fail_retry_interval,

Loading…
Cancel
Save