Browse Source

[feat] Support set execute type to pydolphinscheduler (#12871)

Up to now, we can only submit workflow with parallel mode. this patch give users ability specific execute type to workflow
3.2.0-release
Jay Chung 2 years ago committed by GitHub
parent
commit
87a88e3662
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

11
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -245,12 +245,13 @@ public class PythonGateway {
String taskRelationJson, String taskRelationJson,
String taskDefinitionJson, String taskDefinitionJson,
String otherParamsJson, String otherParamsJson,
ProcessExecutionTypeEnum executionType) { String executionType) {
User user = usersService.queryUser(userName); User user = usersService.queryUser(userName);
Project project = projectMapper.queryByName(projectName); Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode(); long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType);
long processDefinitionCode; long processDefinitionCode;
// create or update process definition // create or update process definition
if (processDefinition != null) { if (processDefinition != null) {
@ -258,13 +259,15 @@ public class PythonGateway {
// make sure process definition offline which could edit // make sure process definition offline which could edit
processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode,
ReleaseState.OFFLINE); ReleaseState.OFFLINE);
Map<String, Object> result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionService.updateProcessDefinition(user, projectCode, name,
processDefinitionCode, description, globalParams, processDefinitionCode, description, globalParams,
null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType); null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
executionTypeEnum);
} else { } else {
Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name, Map<String, Object> result = processDefinitionService.createProcessDefinition(user, projectCode, name,
description, globalParams, description, globalParams,
null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType); null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson,
executionTypeEnum);
processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST);
processDefinitionCode = processDefinition.getCode(); processDefinitionCode = processDefinition.getCode();
} }

Loading…
Cancel
Save