Browse Source

[Improvement][API][num-1] save schedule when workflow is offline in interface 'createEmptyProcessDefinition' (#7047)

* [Improvement] save schedule when workflow is offline

* [Improvement] rollback transactional when creating/updating schedule failed.

* [Improvement] resolve merge conflict

* [Improvement] do not set projectName and processDefinitionName when saving schedule

* [Improvement] do not set projectName and processDefinitionName when saving schedule

Co-authored-by: edward-yang <yangjianh210@gmail.com>
3.0.0/version-upgrade
EdwardYang 3 years ago committed by GitHub
parent
commit
88cd37ff0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 75
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

75
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1596,9 +1596,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
// save dag schedule
Map<String, Object> scheduleResult = createDagSchedule(loginUser, projectCode, processDefinitionCode, scheduleJson);
Map<String, Object> scheduleResult = createDagSchedule(loginUser, project, processDefinition, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
return scheduleResult;
Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
throw new ServiceException(scheduleResultStatus);
}
return result;
}
@ -1616,40 +1618,48 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
}
private Map<String, Object> createDagSchedule(User loginUser,
long projectCode,
long processDefinitionCode,
Project project,
ProcessDefinition processDefinition,
String scheduleJson) {
Map<String, Object> result = new HashMap<>();
Schedule schedule = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (schedule == null) {
Schedule scheduleObj = JSONUtils.parseObject(scheduleJson, Schedule.class);
if (scheduleObj == null) {
putMsg(result, Status.DATA_IS_NOT_VALID, scheduleJson);
throw new ServiceException(Status.DATA_IS_NOT_VALID);
}
// set default value
FailureStrategy failureStrategy = schedule.getFailureStrategy() == null ? FailureStrategy.CONTINUE : schedule.getFailureStrategy();
WarningType warningType = schedule.getWarningType() == null ? WarningType.NONE : schedule.getWarningType();
Priority processInstancePriority = schedule.getProcessInstancePriority() == null ? Priority.MEDIUM : schedule.getProcessInstancePriority();
int warningGroupId = schedule.getWarningGroupId() == 0 ? 1 : schedule.getWarningGroupId();
String workerGroup = schedule.getWorkerGroup() == null ? "default" : schedule.getWorkerGroup();
long environmentCode = schedule.getEnvironmentCode() == null ? -1 : schedule.getEnvironmentCode();
ScheduleParam param = new ScheduleParam();
param.setStartTime(schedule.getStartTime());
param.setEndTime(schedule.getEndTime());
param.setCrontab(schedule.getCrontab());
param.setTimezoneId(schedule.getTimezoneId());
Date now = new Date();
scheduleObj.setProcessDefinitionCode(processDefinition.getCode());
if (DateUtils.differSec(scheduleObj.getStartTime(), scheduleObj.getEndTime()) == 0) {
logger.warn("The start time must not be the same as the end");
putMsg(result, Status.SCHEDULE_START_TIME_END_TIME_SAME);
return result;
}
if (!org.quartz.CronExpression.isValidExpression(scheduleObj.getCrontab())) {
logger.error("{} verify failure", scheduleObj.getCrontab());
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, scheduleObj.getCrontab());
return result;
}
scheduleObj.setWarningType(scheduleObj.getWarningType() == null ? WarningType.NONE : scheduleObj.getWarningType());
scheduleObj.setWarningGroupId(scheduleObj.getWarningGroupId() == 0 ? 1 : scheduleObj.getWarningGroupId());
scheduleObj.setFailureStrategy(scheduleObj.getFailureStrategy() == null ? FailureStrategy.CONTINUE : scheduleObj.getFailureStrategy());
scheduleObj.setCreateTime(now);
scheduleObj.setUpdateTime(now);
scheduleObj.setUserId(loginUser.getId());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setProcessInstancePriority(scheduleObj.getProcessInstancePriority() == null ? Priority.MEDIUM : scheduleObj.getProcessInstancePriority());
scheduleObj.setWorkerGroup(scheduleObj.getWorkerGroup() == null ? "default" : scheduleObj.getWorkerGroup());
scheduleObj.setEnvironmentCode(scheduleObj.getEnvironmentCode() == null ? -1 : scheduleObj.getEnvironmentCode());
scheduleMapper.insert(scheduleObj);
/**
* updateProcessInstance receivers and cc by process definition id
*/
processDefinition.setWarningGroupId(scheduleObj.getWarningGroupId());
processDefinitionMapper.updateById(processDefinition);
return schedulerService.insertSchedule(
loginUser,
projectCode,
processDefinitionCode,
JSONUtils.toJsonString(param),
warningType,
warningGroupId,
failureStrategy,
processInstancePriority,
workerGroup,
environmentCode);
putMsg(result, Status.SUCCESS);
result.put("scheduleId", scheduleObj.getId());
return result;
}
/**
@ -1668,6 +1678,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
* @return update result code
*/
@Override
@Transactional(rollbackFor = RuntimeException.class)
public Map<String, Object> updateProcessDefinitionBasicInfo(User loginUser,
long projectCode,
String name,
@ -1728,7 +1739,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
// update dag schedule
Map<String, Object> scheduleResult = updateDagSchedule(loginUser, projectCode, code, scheduleJson);
if (scheduleResult.get(Constants.STATUS) != Status.SUCCESS) {
return scheduleResult;
Status scheduleResultStatus = (Status) scheduleResult.get(Constants.STATUS);
putMsg(result, scheduleResultStatus);
throw new ServiceException(scheduleResultStatus);
}
return result;
}

Loading…
Cancel
Save