Browse Source

[Fix-6248] [API] fix processDefinnition release/processInstance update/taskDefinition transaction (#6288)

* fix processDefinition save/update/delete/move/import transaction

* fix ut

* fix processDefinnition release/processInstance update/taskDefinition transaction

* fix processDefinnition release/processInstance update/taskDefinition transaction

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinYong Li 3 years ago committed by GitHub
parent
commit
d7160874e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 18
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 37
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  5. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  6. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
  7. 3
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -26,7 +26,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_DEFINI
import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.IMPORT_PROCESS_DEFINE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.IMPORT_PROCESS_DEFINE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_VERSIONS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_DEFINITION_VERSIONS_ERROR;
@ -388,7 +388,7 @@ public class ProcessDefinitionController extends BaseController {
}) })
@GetMapping(value = "/{code}") @GetMapping(value = "/{code}")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) @ApiException(QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result queryProcessDefinitionByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@ -411,7 +411,7 @@ public class ProcessDefinitionController extends BaseController {
}) })
@GetMapping(value = "/query-by-name") @GetMapping(value = "/query-by-name")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR) @ApiException(QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser") @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<ProcessDefinition> queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result<ProcessDefinition> queryProcessDefinitionByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode, @ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -136,7 +136,7 @@ public enum Status {
VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR(10106, "verify process definition name unique error", "工作流定义名称验证错误"), VERIFY_PROCESS_DEFINITION_NAME_UNIQUE_ERROR(10106, "verify process definition name unique error", "工作流定义名称验证错误"),
UPDATE_PROCESS_DEFINITION_ERROR(10107, "update process definition error", "更新工作流定义错误"), UPDATE_PROCESS_DEFINITION_ERROR(10107, "update process definition error", "更新工作流定义错误"),
RELEASE_PROCESS_DEFINITION_ERROR(10108, "release process definition error", "上线工作流错误"), RELEASE_PROCESS_DEFINITION_ERROR(10108, "release process definition error", "上线工作流错误"),
QUERY_DATAIL_OF_PROCESS_DEFINITION_ERROR(10109, "query datail of process definition error", "查询工作流详细信息错误"), QUERY_DETAIL_OF_PROCESS_DEFINITION_ERROR(10109, "query detail of process definition error", "查询工作流详细信息错误"),
QUERY_PROCESS_DEFINITION_LIST(10110, "query process definition list", "查询工作流列表错误"), QUERY_PROCESS_DEFINITION_LIST(10110, "query process definition list", "查询工作流列表错误"),
ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR(10111, "encapsulation treeview structure error", "查询工作流树形图数据错误"), ENCAPSULATION_TREEVIEW_STRUCTURE_ERROR(10111, "encapsulation treeview structure error", "查询工作流树形图数据错误"),
GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR(10112, "get tasks list by process definition id error", "查询工作流定义节点信息错误"), GET_TASKS_LIST_BY_PROCESS_DEFINITION_ID_ERROR(10112, "get tasks list by process definition id error", "查询工作流定义节点信息错误"),

18
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -637,7 +637,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} else if (schedules.size() == 1) { } else if (schedules.size() == 1) {
Schedule schedule = schedules.get(0); Schedule schedule = schedules.get(0);
if (schedule.getReleaseState() == ReleaseState.OFFLINE) { if (schedule.getReleaseState() == ReleaseState.OFFLINE) {
scheduleMapper.deleteById(schedule.getId()); int delete = scheduleMapper.deleteById(schedule.getId());
if (delete == 0) {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
} else if (schedule.getReleaseState() == ReleaseState.ONLINE) { } else if (schedule.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId()); putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE, schedule.getId());
return result; return result;
@ -702,16 +706,20 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
break; break;
case OFFLINE: case OFFLINE:
processDefinition.setReleaseState(releaseState); processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition); int updateProcess = processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray( List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray(
new long[]{processDefinition.getCode()} new long[]{processDefinition.getCode()}
); );
if (updateProcess > 0 && scheduleList.size() == 1) {
for (Schedule schedule : scheduleList) { Schedule schedule = scheduleList.get(0);
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code); logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code);
// set status // set status
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
scheduleMapper.updateById(schedule); int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) {
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}
schedulerService.deleteSchedule(project.getId(), schedule.getId()); schedulerService.deleteSchedule(project.getId(), schedule.getId());
} }
break; break;

37
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -27,6 +27,7 @@ import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto; import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task; import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
@ -413,7 +414,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
* @param tenantCode tenantCode * @param tenantCode tenantCode
* @return update result code * @return update result code
*/ */
@Transactional @Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId, String taskRelationJson, public Map<String, Object> updateProcessInstance(User loginUser, long projectCode, Integer processInstanceId, String taskRelationJson,
String taskDefinitionJson, String scheduleTime, Boolean syncDefine, String globalParams, String taskDefinitionJson, String scheduleTime, Boolean syncDefine, String globalParams,
@ -451,8 +452,8 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
} }
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
if (saveTaskResult == Constants.DEFINITION_FAILURE) { if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
return result; throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()); ProcessDefinition processDefinition = processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); List<ProcessTaskRelationLog> taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class);
@ -470,32 +471,37 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
} }
tenantId = tenant.getId(); tenantId = tenant.getId();
} }
ProcessDefinition processDefinitionDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinition), ProcessDefinition.class);
processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId); processDefinition.set(projectCode, processDefinition.getName(), processDefinition.getDescription(), globalParams, locations, timeout, tenantId);
processDefinition.setUpdateTime(new Date()); processDefinition.setUpdateTime(new Date());
int insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false); int insertVersion;
if (insertVersion > 0) { if (processDefinition.equals(processDefinitionDeepCopy)) {
insertVersion = processDefinitionDeepCopy.getVersion();
} else {
processDefinition.setUpdateTime(new Date());
insertVersion = processService.saveProcessDefine(loginUser, processDefinition, false);
}
if (insertVersion == 0) {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(), int insertResult = processService.saveTaskRelation(loginUser, processDefinition.getProjectCode(),
processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs); processDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs);
if (insertResult > 0) { if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition); result.put(Constants.DATA_LIST, processDefinition);
} else { } else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
return result; throw new ServiceException(Status.UPDATE_PROCESS_DEFINITION_ERROR);
}
} else {
putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR);
return result;
} }
processInstance.setProcessDefinitionVersion(insertVersion); processInstance.setProcessDefinitionVersion(insertVersion);
} }
int update = processService.updateProcessInstance(processInstance); int update = processService.updateProcessInstance(processInstance);
if (update > 0) { if (update == 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR); putMsg(result, Status.UPDATE_PROCESS_INSTANCE_ERROR);
throw new ServiceException(Status.UPDATE_PROCESS_INSTANCE_ERROR);
} }
putMsg(result, Status.SUCCESS);
return result; return result;
} }
@ -593,6 +599,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
} else { } else {
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR); putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
throw new ServiceException(Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
} }
return result; return result;

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.CheckUtils;
@ -128,7 +129,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs); int saveTaskResult = processService.saveTaskDefine(loginUser, projectCode, taskDefinitionLogs);
if (saveTaskResult == Constants.DEFINITION_FAILURE) { if (saveTaskResult == Constants.DEFINITION_FAILURE) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result; throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
} }
Map<String, Object> resData = new HashMap<>(); Map<String, Object> resData = new HashMap<>();
resData.put("total", taskDefinitionLogs.size()); resData.put("total", taskDefinitionLogs.size());
@ -171,7 +172,6 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
* @param projectCode project code * @param projectCode project code
* @param taskCode task code * @param taskCode task code
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) { public Map<String, Object> deleteTaskDefinitionByCode(User loginUser, long projectCode, long taskCode) {
Project project = projectMapper.queryByCode(projectCode); Project project = projectMapper.queryByCode(projectCode);
@ -255,8 +255,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
taskDefinitionToUpdate.setCreateTime(now); taskDefinitionToUpdate.setCreateTime(now);
int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate); int insert = taskDefinitionLogMapper.insert(taskDefinitionToUpdate);
if ((update & insert) != 1) { if ((update & insert) != 1) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
return result; throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
} }
result.put(Constants.DATA_LIST, taskCode); result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update); putMsg(result, Status.SUCCESS, update);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java

@ -400,6 +400,7 @@ public class ProcessDefinitionServiceTest {
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(ReleaseState.OFFLINE);
schedules.add(schedule); schedules.add(schedule);
Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1); Mockito.when(processDefineMapper.deleteById(46)).thenReturn(1);
Mockito.when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1); Mockito.when(processTaskRelationMapper.deleteByCode(project.getCode(), processDefinition.getCode())).thenReturn(1);
Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules); Mockito.when(scheduleMapper.queryByProcessDefinitionCode(46L)).thenReturn(schedules);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);

3
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -415,11 +415,12 @@ public class ProcessInstanceServiceTest {
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant); when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant); when(processService.getTenantForProcess(Mockito.anyInt(), Mockito.anyInt())).thenReturn(tenant);
when(processService.updateProcessInstance(processInstance)).thenReturn(1); when(processService.updateProcessInstance(processInstance)).thenReturn(1);
when(processService.saveProcessDefine(loginUser, processDefinition, false)).thenReturn(1);
when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result); when(processDefinitionService.checkProcessNodeList(shellJson)).thenReturn(result);
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1, Map<String, Object> processInstanceFinishRes = processInstanceService.updateProcessInstance(loginUser, projectCode, 1,
shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root"); shellJson, taskJson,"2020-02-21 00:00:00", true, "", "", 0, "root");
Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, processInstanceFinishRes.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS, processInstanceFinishRes.get(Constants.STATUS));
//success //success
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition); when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);

Loading…
Cancel
Save