Browse Source

feat: Python add online_schedule for workflow control schedule state (#13551)

related: apache/dolphinscheduler-sdk-python#73
3.1.5-release
Jay Chung 2 years ago
parent
commit
9d2140a651
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -215,6 +215,7 @@ public class PythonGateway {
* @param globalParams global params * @param globalParams global params
* @param schedule schedule for workflow, will not set schedule if null, * @param schedule schedule for workflow, will not set schedule if null,
* and if would always fresh exists schedule if not null * and if would always fresh exists schedule if not null
* @param onlineSchedule Whether set the workflow's schedule to online state
* @param warningType warning type * @param warningType warning type
* @param warningGroupId warning group id * @param warningGroupId warning group id
* @param timeout timeout for workflow working, if running time longer than timeout, * @param timeout timeout for workflow working, if running time longer than timeout,
@ -231,6 +232,7 @@ public class PythonGateway {
String description, String description,
String globalParams, String globalParams,
String schedule, String schedule,
boolean onlineSchedule,
String warningType, String warningType,
int warningGroupId, int warningGroupId,
int timeout, int timeout,
@ -272,7 +274,9 @@ public class PythonGateway {
// Fresh workflow schedule // Fresh workflow schedule
if (schedule != null) { if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, onlineSchedule, workerGroup,
warningType,
warningGroupId);
} }
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.getEnum(releaseState)); processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.getEnum(releaseState));
return processDefinitionCode; return processDefinitionCode;
@ -312,6 +316,7 @@ public class PythonGateway {
* @param projectCode project which workflow belongs to * @param projectCode project which workflow belongs to
* @param workflowCode workflow code * @param workflowCode workflow code
* @param schedule schedule expression * @param schedule schedule expression
* @param onlineSchedule Whether set the workflow's schedule to online state
* @param workerGroup work group * @param workerGroup work group
* @param warningType warning type * @param warningType warning type
* @param warningGroupId warning group id * @param warningGroupId warning group id
@ -320,6 +325,7 @@ public class PythonGateway {
long projectCode, long projectCode,
long workflowCode, long workflowCode,
String schedule, String schedule,
boolean onlineSchedule,
String workerGroup, String workerGroup,
String warningType, String warningType,
int warningGroupId) { int warningGroupId) {
@ -340,9 +346,11 @@ public class PythonGateway {
schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType),
warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE);
} }
// Always set workflow online to set schedule online if (onlineSchedule) {
processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE); // set workflow online to make sure we can set schedule online
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE);
schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE);
}
} }
public void execWorkflowInstance(String userName, public void execWorkflowInstance(String userName,

Loading…
Cancel
Save