diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index e2a79e819d..be0294799e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -264,7 +264,7 @@ public enum Status { BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"), - DELETE_PROCESS_TASK_RELATION_ERROR(50032, "delete process task relation error", "删除工作流任务关系错误"), + CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"), PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"), @@ -286,7 +286,6 @@ public enum Status { QUERY_DATABASE_STATE_ERROR(70001, "query database state error", "查询数据库状态错误"), QUERY_ZOOKEEPER_STATE_ERROR(70002, "query zookeeper state error", "查询zookeeper状态错误"), - CREATE_ACCESS_TOKEN_ERROR(70010, "create access token error", "创建访问token错误"), GENERATE_TOKEN_ERROR(70011, "generate token error", "生成token错误"), QUERY_ACCESSTOKEN_LIST_PAGING_ERROR(70012, "query access token list paging error", "分页查询访问token列表错误"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index f9c0952dce..fe126ce521 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -201,16 +201,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; + Map checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson); + if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) { + return checkTaskDefinitions; } List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); Map checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { return checkRelationJson; } - int tenantId = -1; if (!Constants.DEFAULT.equals(tenantCode)) { Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); @@ -220,60 +219,66 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } tenantId = tenant.getId(); } - long processDefinitionCode; try { processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); } catch (SnowFlakeException e) { - putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); + putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS); return result; } ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, globalParams, locations, timeout, loginUser.getId(), tenantId); - - return createProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs); + return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs); } - private void createTaskDefinition(Map result, - User loginUser, - long projectCode, - List taskDefinitionLogs, - String taskDefinitionJson) { - if (taskDefinitionLogs.isEmpty()) { - logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); - return; + private Map createDagDefine(User loginUser, + List taskRelationList, + ProcessDefinition processDefinition, + List taskDefinitionLogs) { + Map result = new HashMap<>(); + int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); + if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { + logger.info("The task has not changed, so skip"); } - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { - logger.error("task definition {} parameter invalid", taskDefinitionLog.getName()); - putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); - return; - } + if (saveTaskResult == Constants.DEFINITION_FAILURE) { + putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } - if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { + int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); + if (insertVersion == 0) { + putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); } else { - putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR); + throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR); } + return result; } - private Map createProcessDefine(User loginUser, - Map result, - List taskRelationList, - ProcessDefinition processDefinition, - List taskDefinitionLogs) { - int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); - if (insertVersion > 0) { - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); + private Map checkTaskDefinitionList(List taskDefinitionLogs, String taskDefinitionJson) { + Map result = new HashMap<>(); + try { + if (taskDefinitionLogs.isEmpty()) { + logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson); + putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); + return result; } - } else { - putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { + logger.error("task definition {} parameter invalid", taskDefinitionLog.getName()); + putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName()); + return result; + } + } + putMsg(result, Status.SUCCESS); + } catch (Exception e) { + result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); + result.put(Constants.MSG, e.getMessage()); } return result; } @@ -455,8 +460,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param taskDefinitionJson taskDefinitionJson * @return update result code */ - @Transactional(rollbackFor = RuntimeException.class) @Override + @Transactional(rollbackFor = RuntimeException.class) public Map updateProcessDefinition(User loginUser, long projectCode, String name, @@ -476,9 +481,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } List taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); - createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson); - if (result.get(Constants.STATUS) != Status.SUCCESS) { - return result; + Map checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson); + if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) { + return checkTaskDefinitions; } List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); Map checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); @@ -517,15 +522,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId); - return updateProcessDefine(loginUser, result, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs); + return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs); } - private Map updateProcessDefine(User loginUser, - Map result, - List taskRelationList, - ProcessDefinition processDefinition, - ProcessDefinition processDefinitionDeepCopy, - List taskDefinitionLogs) { + private Map updateDagDefine(User loginUser, + List taskRelationList, + ProcessDefinition processDefinition, + ProcessDefinition processDefinitionDeepCopy, + List taskDefinitionLogs) { + Map result = new HashMap<>(); + int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs); + if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) { + logger.info("The task has not changed, so skip"); + } + if (saveTaskResult == Constants.DEFINITION_FAILURE) { + putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); + } int insertVersion; if (processDefinition.equals(processDefinitionDeepCopy)) { insertVersion = processDefinitionDeepCopy.getVersion(); @@ -533,17 +546,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setUpdateTime(new Date()); insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); } - if (insertVersion > 0) { - int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), - processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); - if (insertResult == Constants.EXIT_CODE_SUCCESS) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); - } else { - putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); - } + if (insertVersion == 0) { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); + } + int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), + processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); + if (insertResult == Constants.EXIT_CODE_SUCCESS) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); } else { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR); } return result; } @@ -631,12 +645,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } int delete = processDefinitionMapper.deleteById(processDefinition.getId()); - processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); - if (delete > 0) { - putMsg(result, Status.SUCCESS); - } else { + int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); + if ((delete & deleteRelation) == 0) { putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); + throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); } + putMsg(result, Status.SUCCESS); return result; } @@ -863,7 +877,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList); if ((logInsert & insert) == 0) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); - return false; + throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR); } List taskRelationList = dagDataSchedule.getProcessTaskRelationList(); @@ -876,12 +890,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); taskRelationLogList.add(processTaskRelationLog); } - Map createProcessResult = createProcessDefine(loginUser, result, taskRelationLogList, processDefinition, null); - if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) { - putMsg(createProcessResult, Status.SUCCESS); + Map createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList()); + if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) { + putMsg(createDagResult, Status.SUCCESS); } else { - result.putAll(createProcessResult); - return false; + result.putAll(createDagResult); + throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); } Schedule schedule = dagDataSchedule.getSchedule(); @@ -894,7 +908,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro int scheduleInsert = scheduleMapper.insert(schedule); if (0 == scheduleInsert) { putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); - return false; + throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR); } } return true; @@ -1250,6 +1264,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param targetProjectCode targetProjectCode */ @Override + @Transactional(rollbackFor = RuntimeException.class) public Map batchMoveProcessDefinition(User loginUser, long projectCode, String codes, @@ -1313,9 +1328,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processDefinition.setProjectCode(targetProjectCode); if (isCopy) { processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); - createProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList()); + try { + result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, Lists.newArrayList())); + } catch (Exception e) { + putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR); + } } else { - updateProcessDefine(loginUser, result, taskRelationList, processDefinition, null, Lists.newArrayList()); + try { + result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, Lists.newArrayList())); + } catch (Exception e) { + putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR); + throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR); + } } if (result.get(Constants.STATUS) != Status.SUCCESS) { failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]"); @@ -1333,6 +1358,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @return switch process definition version result code */ @Override + @Transactional(rollbackFor = RuntimeException.class) public Map switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) { Project project = projectMapper.queryByCode(projectCode); //check user access for project @@ -1354,10 +1380,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); if (switchVersion > 0) { - putMsg(result, Status.SUCCESS); - } else { putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); + throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); } + putMsg(result, Status.SUCCESS); return result; } @@ -1424,7 +1450,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param projectCode project code * @param code process definition code * @param version version number - * @return delele result code + * @return delete result code */ @Override @Transactional(rollbackFor = RuntimeException.class) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index f88e20eb44..be0cfffe15 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -449,7 +449,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce return result; } } - if (!processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); + if (saveTaskResult == Constants.DEFINITION_FAILURE) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index c35099f4f6..adc9961f25 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -125,15 +125,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe return result; } } - if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { - Map resData = new HashMap<>(); - resData.put("total", taskDefinitionLogs.size()); - resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, resData); - } else { + int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); + if (saveTaskResult == Constants.DEFINITION_FAILURE) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); + return result; } + Map resData = new HashMap<>(); + resData.put("total", taskDefinitionLogs.size()); + resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ",")); + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, resData); return result; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index cac9c360bc..44ea0b63f1 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -289,10 +289,10 @@ public class ProcessDefinitionServiceTest { processDefinitionList.add(definition); Set definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); - + Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); Map map3 = processDefinitionService.batchCopyProcessDefinition( loginUser, projectCode, "46", 1L); - Assert.assertEquals(Status.COPY_PROCESS_DEFINITION_ERROR, map3.get(Constants.STATUS)); + Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS)); } @Test @@ -316,16 +316,18 @@ public class ProcessDefinitionServiceTest { Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2)).thenReturn(result); ProcessDefinition definition = getProcessDefinition(); + definition.setVersion(1); List processDefinitionList = new ArrayList<>(); processDefinitionList.add(definition); Set definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); + Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode)); putMsg(result, Status.SUCCESS); Map successRes = processDefinitionService.batchMoveProcessDefinition( loginUser, projectCode, "46", projectCode2); - Assert.assertEquals(Status.MOVE_PROCESS_DEFINITION_ERROR, successRes.get(Constants.STATUS)); + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } @Test @@ -393,19 +395,12 @@ public class ProcessDefinitionServiceTest { Map schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS)); - //delete fail + //delete success schedules.clear(); schedule.setReleaseState(ReleaseState.OFFLINE); schedules.add(schedule); - putMsg(result, Status.SUCCESS, projectCode); - Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); - Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); - Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0); - Map deleteFail = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); - Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, deleteFail.get(Constants.STATUS)); - - //delete success Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); + Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); putMsg(result, Status.SUCCESS, projectCode); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 630769344b..0e93f66e9d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -419,7 +419,7 @@ public class ProcessInstanceServiceTest { putMsg(result, Status.SUCCESS, projectCode); Map processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root"); - Assert.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS)); + Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS)); //success when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index 18d42144e5..d4e635ba93 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -96,7 +96,7 @@ public class TaskDefinitionServiceImplTest { + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; List taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class); - Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(true); + Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(1); Map relation = taskDefinitionService .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index d02f005fa1..c8619b9785 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2151,7 +2151,7 @@ public class ProcessService { return StringUtils.join(resourceIds, ","); } - public boolean saveTaskDefine(User operator, long projectCode, List taskDefinitionLogs) { + public int saveTaskDefine(User operator, long projectCode, List taskDefinitionLogs) { Date now = new Date(); List newTaskDefinitionLogs = new ArrayList<>(); List updateTaskDefinitionLogs = new ArrayList<>(); @@ -2183,7 +2183,7 @@ public class ProcessService { taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId()); } catch (SnowFlakeException e) { logger.error("Task code get error, ", e); - return false; + return Constants.DEFINITION_FAILURE; } } newTaskDefinitionLogs.add(taskDefinitionLog); @@ -2196,17 +2196,15 @@ public class ProcessService { int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); taskDefinitionToUpdate.setId(task.getId()); int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate); - if ((update & insert) != 1) { - return false; - } + return update & insert; } } if (!newTaskDefinitionLogs.isEmpty()) { int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); - return (logInsert & insert) != 0; + return logInsert & insert; } - return true; + return Constants.EXIT_CODE_SUCCESS; } /**