Browse Source

[Fix-6248] [API] fix processDefinition save/update/delete/move/import transaction (#6285)

* fix processDefinition save/update/delete/move/import transaction

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
7a321505cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 178
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 3
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  4. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  5. 19
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  6. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  7. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  8. 12
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -264,7 +264,7 @@ public enum Status {
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"), TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
DELETE_PROCESS_TASK_RELATION_ERROR(50032, "delete process task relation error", "删除工作流任务关系错误"), CREATE_PROCESS_TASK_RELATION_ERROR(50032, "create process task relation error", "创建工作流任务关系错误"),
PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"),
PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"),
PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"), PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"),
@ -286,7 +286,6 @@ public enum Status {
QUERY_DATABASE_STATE_ERROR(70001, "query database state error", "查询数据库状态错误"), QUERY_DATABASE_STATE_ERROR(70001, "query database state error", "查询数据库状态错误"),
QUERY_ZOOKEEPER_STATE_ERROR(70002, "query zookeeper state error", "查询zookeeper状态错误"), QUERY_ZOOKEEPER_STATE_ERROR(70002, "query zookeeper state error", "查询zookeeper状态错误"),
CREATE_ACCESS_TOKEN_ERROR(70010, "create access token error", "创建访问token错误"), CREATE_ACCESS_TOKEN_ERROR(70010, "create access token error", "创建访问token错误"),
GENERATE_TOKEN_ERROR(70011, "generate token error", "生成token错误"), GENERATE_TOKEN_ERROR(70011, "generate token error", "生成token错误"),
QUERY_ACCESSTOKEN_LIST_PAGING_ERROR(70012, "query access token list paging error", "分页查询访问token列表错误"), QUERY_ACCESSTOKEN_LIST_PAGING_ERROR(70012, "query access token list paging error", "分页查询访问token列表错误"),

178
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -201,16 +201,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson); Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
return result; return checkTaskDefinitions;
} }
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) {
return checkRelationJson; return checkRelationJson;
} }
int tenantId = -1; int tenantId = -1;
if (!Constants.DEFAULT.equals(tenantCode)) { if (!Constants.DEFAULT.equals(tenantCode)) {
Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); Tenant tenant = tenantMapper.queryByTenantCode(tenantCode);
@ -220,60 +219,66 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
tenantId = tenant.getId(); tenantId = tenant.getId();
} }
long processDefinitionCode; long processDefinitionCode;
try { try {
processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); processDefinitionCode = SnowFlakeUtils.getInstance().nextId();
} catch (SnowFlakeException e) { } catch (SnowFlakeException e) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS);
return result; return result;
} }
ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description, ProcessDefinition processDefinition = new ProcessDefinition(projectCode, name, processDefinitionCode, description,
globalParams, locations, timeout, loginUser.getId(), tenantId); globalParams, locations, timeout, loginUser.getId(), tenantId);
return createDagDefine(loginUser, taskRelationList, processDefinition, taskDefinitionLogs);
return createProcessDefine(loginUser, result, taskRelationList, processDefinition, taskDefinitionLogs);
} }
private void createTaskDefinition(Map<String, Object> result, private Map<String, Object> createDagDefine(User loginUser,
User loginUser, List<ProcessTaskRelationLog> taskRelationList,
long projectCode, ProcessDefinition processDefinition,
List<TaskDefinitionLog> taskDefinitionLogs, List<TaskDefinitionLog> taskDefinitionLogs) {
String taskDefinitionJson) { Map<String, Object> result = new HashMap<>();
if (taskDefinitionLogs.isEmpty()) { int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs);
logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson); if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson); logger.info("The task has not changed, so skip");
return;
} }
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { if (saveTaskResult == Constants.DEFINITION_FAILURE) {
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) { putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName()); throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return;
}
} }
if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
if (insertVersion == 0) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else { } else {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
} }
return result;
} }
private Map<String, Object> createProcessDefine(User loginUser, private Map<String, Object> checkTaskDefinitionList(List<TaskDefinitionLog> taskDefinitionLogs, String taskDefinitionJson) {
Map<String, Object> result, Map<String, Object> result = new HashMap<>();
List<ProcessTaskRelationLog> taskRelationList, try {
ProcessDefinition processDefinition, if (taskDefinitionLogs.isEmpty()) {
List<TaskDefinitionLog> taskDefinitionLogs) { logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
if (insertVersion > 0) { return result;
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR);
} }
} else { for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
putMsg(result, Status.CREATE_PROCESS_DEFINITION_ERROR); if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionLog)) {
logger.error("task definition {} parameter invalid", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
return result;
}
}
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
result.put(Constants.MSG, e.getMessage());
} }
return result; return result;
} }
@ -455,8 +460,8 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param taskDefinitionJson taskDefinitionJson * @param taskDefinitionJson taskDefinitionJson
* @return update result code * @return update result code
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> updateProcessDefinition(User loginUser, public Map<String, Object> updateProcessDefinition(User loginUser,
long projectCode, long projectCode,
String name, String name,
@ -476,9 +481,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
createTaskDefinition(result, loginUser, projectCode, taskDefinitionLogs, taskDefinitionJson); Map<String, Object> checkTaskDefinitions = checkTaskDefinitionList(taskDefinitionLogs, taskDefinitionJson);
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (checkTaskDefinitions.get(Constants.STATUS) != Status.SUCCESS) {
return result; return checkTaskDefinitions;
} }
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs); Map<String, Object> checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson, taskDefinitionLogs);
@ -517,15 +522,23 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class); ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId); processDefinition.set(projectCode, name, description, globalParams, locations, timeout, tenantId);
return updateProcessDefine(loginUser, result, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs); return updateDagDefine(loginUser, taskRelationList, processDefinition, processDefinitionDeepCopy, taskDefinitionLogs);
} }
private Map<String, Object> updateProcessDefine(User loginUser, private Map<String, Object> updateDagDefine(User loginUser,
Map<String, Object> result, List<ProcessTaskRelationLog> taskRelationList,
List<ProcessTaskRelationLog> taskRelationList, ProcessDefinition processDefinition,
ProcessDefinition processDefinition, ProcessDefinition processDefinitionDeepCopy,
ProcessDefinition processDefinitionDeepCopy, List<TaskDefinitionLog> taskDefinitionLogs) {
List<TaskDefinitionLog> taskDefinitionLogs) { Map<String, Object> result = new HashMap<>();
int saveTaskResult = processService.saveTaskDefine(loginUser, processDefinition.getProjectCode(), taskDefinitionLogs);
if (saveTaskResult == Constants.EXIT_CODE_SUCCESS) {
logger.info("The task has not changed, so skip");
}
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
int insertVersion; int insertVersion;
if (processDefinition.equals(processDefinitionDeepCopy)) { if (processDefinition.equals(processDefinitionDeepCopy)) {
insertVersion = processDefinitionDeepCopy.getVersion(); insertVersion = processDefinitionDeepCopy.getVersion();
@ -533,17 +546,18 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true); insertVersion = processService.saveProcessDefine(loginUser, processDefinition, true);
} }
if (insertVersion > 0) { if (insertVersion == 0) {
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
if (insertResult == Constants.EXIT_CODE_SUCCESS) { }
putMsg(result, Status.SUCCESS); int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
result.put(Constants.DATA_LIST, processDefinition); processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
} else { if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.SUCCESS);
} result.put(Constants.DATA_LIST, processDefinition);
} else { } else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
} }
return result; return result;
} }
@ -631,12 +645,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
int delete = processDefinitionMapper.deleteById(processDefinition.getId()); int delete = processDefinitionMapper.deleteById(processDefinition.getId());
processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode()); int deleteRelation = processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode());
if (delete > 0) { if ((delete & deleteRelation) == 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR); putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR);
} }
putMsg(result, Status.SUCCESS);
return result; return result;
} }
@ -863,7 +877,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList); int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogList);
if ((logInsert & insert) == 0) { if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return false; throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
} }
List<ProcessTaskRelation> taskRelationList = dagDataSchedule.getProcessTaskRelationList(); List<ProcessTaskRelation> taskRelationList = dagDataSchedule.getProcessTaskRelationList();
@ -876,12 +890,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
taskRelationLogList.add(processTaskRelationLog); taskRelationLogList.add(processTaskRelationLog);
} }
Map<String, Object> createProcessResult = createProcessDefine(loginUser, result, taskRelationLogList, processDefinition, null); Map<String, Object> createDagResult = createDagDefine(loginUser, taskRelationLogList, processDefinition, Lists.newArrayList());
if (Status.SUCCESS.equals(createProcessResult.get(Constants.STATUS))) { if (Status.SUCCESS.equals(createDagResult.get(Constants.STATUS))) {
putMsg(createProcessResult, Status.SUCCESS); putMsg(createDagResult, Status.SUCCESS);
} else { } else {
result.putAll(createProcessResult); result.putAll(createDagResult);
return false; throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
} }
Schedule schedule = dagDataSchedule.getSchedule(); Schedule schedule = dagDataSchedule.getSchedule();
@ -894,7 +908,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
int scheduleInsert = scheduleMapper.insert(schedule); int scheduleInsert = scheduleMapper.insert(schedule);
if (0 == scheduleInsert) { if (0 == scheduleInsert) {
putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR); putMsg(result, Status.IMPORT_PROCESS_DEFINE_ERROR);
return false; throw new ServiceException(Status.IMPORT_PROCESS_DEFINE_ERROR);
} }
} }
return true; return true;
@ -1250,6 +1264,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param targetProjectCode targetProjectCode * @param targetProjectCode targetProjectCode
*/ */
@Override @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> batchMoveProcessDefinition(User loginUser, public Map<String, Object> batchMoveProcessDefinition(User loginUser,
long projectCode, long projectCode,
String codes, String codes,
@ -1313,9 +1328,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
processDefinition.setProjectCode(targetProjectCode); processDefinition.setProjectCode(targetProjectCode);
if (isCopy) { if (isCopy) {
processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp()); processDefinition.setName(processDefinition.getName() + "_copy_" + DateUtils.getCurrentTimeStamp());
createProcessDefine(loginUser, result, taskRelationList, processDefinition, Lists.newArrayList()); try {
result.putAll(createDagDefine(loginUser, taskRelationList, processDefinition, Lists.newArrayList()));
} catch (Exception e) {
putMsg(result, Status.COPY_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.COPY_PROCESS_DEFINITION_ERROR);
}
} else { } else {
updateProcessDefine(loginUser, result, taskRelationList, processDefinition, null, Lists.newArrayList()); try {
result.putAll(updateDagDefine(loginUser, taskRelationList, processDefinition, null, Lists.newArrayList()));
} catch (Exception e) {
putMsg(result, Status.MOVE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.MOVE_PROCESS_DEFINITION_ERROR);
}
} }
if (result.get(Constants.STATUS) != Status.SUCCESS) { if (result.get(Constants.STATUS) != Status.SUCCESS) {
failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]"); failedProcessList.add(processDefinition.getCode() + "[" + processDefinition.getName() + "]");
@ -1333,6 +1358,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return switch process definition version result code * @return switch process definition version result code
*/ */
@Override @Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) { public Map<String, Object> switchProcessDefinitionVersion(User loginUser, long projectCode, long code, int version) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
//check user access for project //check user access for project
@ -1354,10 +1380,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog); int switchVersion = processService.switchVersion(processDefinition, processDefinitionLog);
if (switchVersion > 0) { if (switchVersion > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR); putMsg(result, Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
throw new ServiceException(Status.SWITCH_PROCESS_DEFINITION_VERSION_ERROR);
} }
putMsg(result, Status.SUCCESS);
return result; return result;
} }
@ -1424,7 +1450,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param projectCode project code * @param projectCode project code
* @param code process definition code * @param code process definition code
* @param version version number * @param version version number
* @return delele result code * @return delete result code
*/ */
@Override @Override
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)

3
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -449,7 +449,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
return result; return result;
} }
} }
if (!processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result; return result;
} }

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -125,15 +125,16 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result; return result;
} }
} }
if (processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs)) { int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
Map<String, Object> resData = new HashMap<>(); if (saveTaskResult == Constants.DEFINITION_FAILURE) {
resData.put("total", taskDefinitionLogs.size());
resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ","));
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
} else {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result;
} }
Map<String, Object> resData = new HashMap<>();
resData.put("total", taskDefinitionLogs.size());
resData.put("code", StringUtils.join(taskDefinitionLogs.stream().map(TaskDefinition::getCode).collect(Collectors.toList()), ","));
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData);
return result; return result;
} }

19
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -289,10 +289,10 @@ public class ProcessDefinitionServiceTest {
processDefinitionList.add(definition); processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2);
Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition( Map<String, Object> map3 = processDefinitionService.batchCopyProcessDefinition(
loginUser, projectCode, "46", 1L); loginUser, projectCode, "46", 1L);
Assert.assertEquals(Status.COPY_PROCESS_DEFINITION_ERROR, map3.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, map3.get(Constants.STATUS));
} }
@Test @Test
@ -316,16 +316,18 @@ public class ProcessDefinitionServiceTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project2, projectCode2)).thenReturn(result);
ProcessDefinition definition = getProcessDefinition(); ProcessDefinition definition = getProcessDefinition();
definition.setVersion(1);
List<ProcessDefinition> processDefinitionList = new ArrayList<>(); List<ProcessDefinition> processDefinitionList = new ArrayList<>();
processDefinitionList.add(definition); processDefinitionList.add(definition);
Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet()); Set<Long> definitionCodes = Arrays.stream("46".split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList); Mockito.when(processDefineMapper.queryByCodes(definitionCodes)).thenReturn(processDefinitionList);
Mockito.when(processService.saveProcessDefine(loginUser, definition, true)).thenReturn(2);
Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode)); Mockito.when(processTaskRelationMapper.queryByProcessCode(projectCode, 46L)).thenReturn(getProcessTaskRelation(projectCode));
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition( Map<String, Object> successRes = processDefinitionService.batchMoveProcessDefinition(
loginUser, projectCode, "46", projectCode2); loginUser, projectCode, "46", projectCode2);
Assert.assertEquals(Status.MOVE_PROCESS_DEFINITION_ERROR, successRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
} }
@Test @Test
@ -393,19 +395,12 @@ public class ProcessDefinitionServiceTest {
Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L); Map<String, Object> schedulerOnlineRes = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS)); Assert.assertEquals(Status.SCHEDULE_CRON_STATE_ONLINE, schedulerOnlineRes.get(Constants.STATUS));
//delete fail //delete success
schedules.clear(); schedules.clear();
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
schedules.add(schedule); schedules.add(schedule);
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(0);
Map<String, Object> deleteFail = processDefinitionService.deleteProcessDefinitionByCode(loginUser, projectCode, 46L);
Assert.assertEquals(Status.DELETE_PROCESS_DEFINE_BY_CODE_ERROR, deleteFail.get(Constants.STATUS));
//delete success
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -419,7 +419,7 @@ public class ProcessInstanceServiceTest {
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root"); shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root");
Assert.assertEquals(Status.CREATE_TASK_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS)); Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS));
//success //success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -96,7 +96,7 @@ public class TaskDefinitionServiceImplTest {
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0," + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]"; + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinitionLog.class);
Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(true); Mockito.when(processService.saveTaskDefine(loginUser, projectCode, taskDefinitions)).thenReturn(1);
Map<String, Object> relation = taskDefinitionService Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson); .createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));

12
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2151,7 +2151,7 @@ public class ProcessService {
return StringUtils.join(resourceIds, ","); return StringUtils.join(resourceIds, ",");
} }
public boolean saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) { public int saveTaskDefine(User operator, long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) {
Date now = new Date(); Date now = new Date();
List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>(); List<TaskDefinitionLog> newTaskDefinitionLogs = new ArrayList<>();
List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>(); List<TaskDefinitionLog> updateTaskDefinitionLogs = new ArrayList<>();
@ -2183,7 +2183,7 @@ public class ProcessService {
taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId()); taskDefinitionLog.setCode(SnowFlakeUtils.getInstance().nextId());
} catch (SnowFlakeException e) { } catch (SnowFlakeException e) {
logger.error("Task code get error, ", e); logger.error("Task code get error, ", e);
return false; return Constants.DEFINITION_FAILURE;
} }
} }
newTaskDefinitionLogs.add(taskDefinitionLog); newTaskDefinitionLogs.add(taskDefinitionLog);
@ -2196,17 +2196,15 @@ public class ProcessService {
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
taskDefinitionToUpdate.setId(task.getId()); taskDefinitionToUpdate.setId(task.getId());
int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate); int update = taskDefinitionMapper.updateById(taskDefinitionToUpdate);
if ((update & insert) != 1) { return update & insert;
return false;
}
} }
} }
if (!newTaskDefinitionLogs.isEmpty()) { if (!newTaskDefinitionLogs.isEmpty()) {
int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs); int insert = taskDefinitionMapper.batchInsert(newTaskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs); int logInsert = taskDefinitionLogMapper.batchInsert(newTaskDefinitionLogs);
return (logInsert & insert) != 0; return logInsert & insert;
} }
return true; return Constants.EXIT_CODE_SUCCESS;
} }
/** /**

Loading…
Cancel
Save