Browse Source

add processDefinition releaseWorkflowAndSchedule (#7044)

3.0.0/version-upgrade
JinYong Li 3 years ago committed by GitHub
parent
commit
1e9973299f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
  3. 105
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

24
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -813,28 +813,4 @@ public class ProcessDefinitionController extends BaseController {
@RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) { @RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) {
return returnDataList(processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState)); return returnDataList(processDefinitionService.releaseWorkflowAndSchedule(loginUser, projectCode, code, releaseState));
} }
/**
* delete process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @return update result code
*/
@ApiOperation(value = "deleteWorkflowAndSchedule", notes = "DELETE_WORKFLOW_SCHEDULE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROCESS_DEFINITION_NAME", required = true, type = "Long"),
@ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789")
})
@DeleteMapping(value = "/{code}/delete-workflow")
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_PROCESS_DEFINE_BY_CODE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result deleteWorkflowAndSchedule(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code", required = true) long code) {
return returnDataList(processDefinitionService.deleteWorkflowAndSchedule(loginUser, projectCode, code));
}
} }

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java

@ -397,17 +397,5 @@ public interface ProcessDefinitionService {
long projectCode, long projectCode,
long code, long code,
ReleaseState releaseState); ReleaseState releaseState);
/**
* delete process definition and schedule
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @return update result code
*/
Map<String, Object> deleteWorkflowAndSchedule(User loginUser,
long projectCode,
long code);
} }

105
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -75,6 +75,7 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
@ -719,23 +720,28 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
} }
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
switch (releaseState) { switch (releaseState) {
case ONLINE: case ONLINE:
List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code);
if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
processDefinition.setReleaseState(releaseState); processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition); processDefinitionMapper.updateById(processDefinition);
break; break;
case OFFLINE: case OFFLINE:
processDefinition.setReleaseState(releaseState); processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition); int updateProcess = processDefinitionMapper.updateById(processDefinition);
List<Schedule> scheduleList = scheduleMapper.selectAllByProcessDefineArray( Schedule schedule = scheduleMapper.queryByProcessDefinitionCode(code);
new long[]{processDefinition.getCode()} if (updateProcess > 0 && schedule != null) {
); logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, schedule.getId(), code);
if (updateProcess > 0 && scheduleList.size() == 1) {
Schedule schedule = scheduleList.get(0);
logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code);
// set status // set status
schedule.setReleaseState(ReleaseState.OFFLINE); schedule.setReleaseState(releaseState);
int updateSchedule = scheduleMapper.updateById(schedule); int updateSchedule = scheduleMapper.updateById(schedule);
if (updateSchedule == 0) { if (updateSchedule == 0) {
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR); putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
@ -1585,7 +1591,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
if (scheduleJson == null || scheduleJson.trim().isEmpty()) { if (StringUtils.isBlank(scheduleJson)) {
return result; return result;
} }
@ -1613,14 +1619,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
long projectCode, long projectCode,
long processDefinitionCode, long processDefinitionCode,
String scheduleJson) { String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (schedule == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
// set default value // set default value
FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy(); FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType(); WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority(); Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup(); String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam(); ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime()); param.setStartTime(schedule.getStartTime());
@ -1711,7 +1722,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
if (scheduleJson == null || scheduleJson.trim().isEmpty()) { if (StringUtils.isBlank(scheduleJson)) {
return result; return result;
} }
// update dag schedule // update dag schedule
@ -1746,14 +1757,19 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
long projectCode, long projectCode,
long processDefinitionCode, long processDefinitionCode,
String scheduleJson) { String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class); Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (schedule == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
// set default value // set default value
FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy(); FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType(); WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority(); Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId(); int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup(); String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
Long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode(); long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam(); ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime()); param.setStartTime(schedule.getStartTime());
@ -1783,21 +1799,62 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @param releaseState releaseState * @param releaseState releaseState
* @return update result code * @return update result code
*/ */
@Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, ReleaseState releaseState) { public Map<String, Object> releaseWorkflowAndSchedule(User loginUser, long projectCode, long code, ReleaseState releaseState) {
return null; Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check state
if (null == releaseState) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
} }
/** ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
* delete process definition and schedule if (processDefinition == null) {
* putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
* @param loginUser login user return result;
* @param projectCode project code }
* @param code process definition code Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(code);
* @return update result code if (scheduleObj == null) {
*/ putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, "processDefinitionCode:" + code);
@Override return result;
public Map<String, Object> deleteWorkflowAndSchedule(User loginUser, long projectCode, long code) { }
return null; switch (releaseState) {
case ONLINE:
List<ProcessTaskRelation> relationList = processService.findRelationByCode(projectCode, code);
if (CollectionUtils.isEmpty(relationList)) {
putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
return result;
}
processDefinition.setReleaseState(releaseState);
processDefinitionMapper.updateById(processDefinition);
scheduleMapper.updateById(scheduleObj);
break;
case OFFLINE:
processDefinition.setReleaseState(releaseState);
int updateProcess = processDefinitionMapper.updateById(processDefinition);
if (updateProcess > 0) {
logger.info("set schedule offline, project code: {}, schedule id: {}, process definition code: {}", projectCode, scheduleObj.getId(), code);
// set status
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
int updateSchedule = scheduleMapper.updateById(scheduleObj);
if (updateSchedule == 0) {
putMsg(result, Status.OFFLINE_SCHEDULE_ERROR);
throw new ServiceException(Status.OFFLINE_SCHEDULE_ERROR);
}
schedulerService.deleteSchedule(project.getId(), scheduleObj.getId());
}
break;
default:
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, RELEASESTATE);
return result;
}
putMsg(result, Status.SUCCESS);
return result;
} }
} }

Loading…
Cancel
Save