From 696d8ae7f1357d3c10d781ade4c67d5a53cb2e27 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Sat, 12 Nov 2022 10:31:08 +0800 Subject: [PATCH] [feat] Support set execute type to pydolphinscheduler (#12871) Up to now, we can only submit workflow with parallel mode. this patch give users ability specific execute type to workflow (cherry picked from commit 87a88e36627017607c73cfc66a92be08d1da22ee) --- .../api/python/PythonGateway.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 00a817a569..d9f0c78674 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -234,28 +234,34 @@ public class PythonGateway { String taskRelationJson, String taskDefinitionJson, String otherParamsJson, - ProcessExecutionTypeEnum executionType) { + String executionType) { User user = usersService.queryUser(userName); Project project = projectMapper.queryByName(projectName); long projectCode = project.getCode(); ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); + ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType); long processDefinitionCode; // create or update process definition if (processDefinition != null) { processDefinitionCode = processDefinition.getCode(); // make sure process definition offline which could edit - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); - Map result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, - null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType); + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, + ReleaseState.OFFLINE); + processDefinitionService.updateProcessDefinition(user, projectCode, name, + processDefinitionCode, description, globalParams, + null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, + executionTypeEnum); } else { - Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, - null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, executionType); + Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, + description, globalParams, + null, timeout, tenantCode, taskRelationJson, taskDefinitionJson, otherParamsJson, + executionTypeEnum); processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); processDefinitionCode = processDefinition.getCode(); } - // Fresh process definition schedule + // Fresh process definition schedule if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); }