From 6ed16056808bca20ef562363de93193d4885df88 Mon Sep 17 00:00:00 2001 From: Jay Chung Date: Wed, 16 Nov 2022 21:23:39 +0800 Subject: [PATCH] [chore][python] Change name from process definition to workflow (#12918) only change its name in python gateway server code, incluing * Function name: all related to process definition * Parameter name and comment related ref: apache/dolphinscheduler-sdk-python#22 (cherry picked from commit f20c9b3102503a1306d5fa3504ddce56a76d58ab) --- .../api/python/PythonGateway.java | 170 +++++++++--------- 1 file changed, 90 insertions(+), 80 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index b79eaf307e..7fb225268c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -82,7 +82,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import py4j.GatewayServer; @Component public class PythonGateway { @@ -183,8 +182,10 @@ public class PythonGateway { return result; } - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); - // In the case project exists, but current process definition still not created, we should also return the init version of it + ProcessDefinition processDefinition = + processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + // In the case project exists, but current workflow still not created, we should also return the init + // version of it if (processDefinition == null) { result.put("code", CodeGenerateUtils.getInstance().genCode()); result.put("version", 0L); @@ -203,20 +204,20 @@ public class PythonGateway { } /** - * create or update process definition. - * If process definition do not exists in Project=`projectCode` would create a new one - * If process definition already exists in Project=`projectCode` would update it + * create or update workflow. + * If workflow do not exists in Project=`projectCode` would create a new one + * If workflow already exists in Project=`projectCode` would update it * - * @param userName user name who create or update process definition - * @param projectName project name which process definition belongs to - * @param name process definition name + * @param userName user name who create or update workflow + * @param projectName project name which workflow belongs to + * @param name workflow name * @param description description * @param globalParams global params - * @param schedule schedule for process definition, will not set schedule if null, + * @param schedule schedule for workflow, will not set schedule if null, * and if would always fresh exists schedule if not null * @param warningType warning type * @param warningGroupId warning group id - * @param timeout timeout for process definition working, if running time longer than timeout, + * @param timeout timeout for workflow working, if running time longer than timeout, * task will mark as fail * @param workerGroup run task in which worker group * @param tenantCode tenantCode @@ -225,33 +226,33 @@ public class PythonGateway { * @param otherParamsJson otherParamsJson handle other params * @return create result code */ - public Long createOrUpdateProcessDefinition(String userName, - String projectName, - String name, - String description, - String globalParams, - String schedule, - String warningType, - int warningGroupId, - int timeout, - String workerGroup, - String tenantCode, - int releaseState, - String taskRelationJson, - String taskDefinitionJson, - String otherParamsJson, - String executionType) { + public Long createOrUpdateWorkflow(String userName, + String projectName, + String name, + String description, + String globalParams, + String schedule, + String warningType, + int warningGroupId, + int timeout, + String workerGroup, + String tenantCode, + int releaseState, + String taskRelationJson, + String taskDefinitionJson, + String otherParamsJson, + String executionType) { User user = usersService.queryUser(userName); Project project = projectMapper.queryByName(projectName); long projectCode = project.getCode(); - ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); + ProcessDefinition processDefinition = getWorkflow(user, projectCode, name); ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType); long processDefinitionCode; - // create or update process definition + // create or update workflow if (processDefinition != null) { processDefinitionCode = processDefinition.getCode(); - // make sure process definition offline which could edit + // make sure workflow offline which could edit processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); processDefinitionService.updateProcessDefinition(user, projectCode, name, @@ -267,7 +268,7 @@ public class PythonGateway { processDefinitionCode = processDefinition.getCode(); } - // Fresh process definition schedule + // Fresh workflow schedule if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); } @@ -276,21 +277,23 @@ public class PythonGateway { } /** - * get process definition + * get workflow * * @param user user who create or update schedule - * @param projectCode project which process definition belongs to - * @param processDefinitionName process definition name + * @param projectCode project which workflow belongs to + * @param workflowName workflow name */ - private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) { - Map verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0); + private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) { + Map verifyProcessDefinitionExists = + processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0); Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); ProcessDefinition processDefinition = null; if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { - processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName); } else if (verifyStatus != Status.SUCCESS) { - String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; + String msg = + "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST."; logger.error(msg); throw new RuntimeException(msg); } @@ -299,13 +302,13 @@ public class PythonGateway { } /** - * create or update process definition schedule. + * create or update workflow schedule. * It would always use latest schedule define in workflow-as-code, and set schedule online when * it's not null * * @param user user who create or update schedule - * @param projectCode project which process definition belongs to - * @param processDefinitionCode process definition code + * @param projectCode project which workflow belongs to + * @param workflowCode workflow code * @param schedule schedule expression * @param workerGroup work group * @param warningType warning type @@ -313,43 +316,47 @@ public class PythonGateway { */ private void createOrUpdateSchedule(User user, long projectCode, - long processDefinitionCode, + long workflowCode, String schedule, String workerGroup, String warningType, int warningGroupId) { - Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode); // create or update schedule int scheduleId; if (scheduleObj == null) { - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.ONLINE); - Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, schedule, WarningType.valueOf(warningType), + processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, + ReleaseState.ONLINE); + Map result = schedulerService.insertSchedule(user, projectCode, workflowCode, + schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); scheduleId = (int) result.get("scheduleId"); } else { scheduleId = scheduleObj.getId(); - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); + processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, + ReleaseState.OFFLINE); schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); } schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); } - public void execProcessInstance(String userName, - String projectName, - String processDefinitionName, - String cronTime, - String workerGroup, - String warningType, - int warningGroupId, - Integer timeout - ) { + public void execWorkflowInstance(String userName, + String projectName, + String workflowName, + String cronTime, + String workerGroup, + String warningType, + Integer warningGroupId, + Integer timeout) { User user = usersService.queryUser(userName); Project project = projectMapper.queryByName(projectName); - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + ProcessDefinition processDefinition = + processDefinitionMapper.queryByDefineName(project.getCode(), workflowName); - // make sure process definition online - processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE); + // make sure workflow online + processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), + ReleaseState.ONLINE); executorService.execProcessInstance(user, project.getCode(), @@ -375,8 +382,8 @@ public class PythonGateway { // side object /* - Grant project's permission to user. Use when project's created user not current but - Python API use it to change process definition. + * Grant project's permission to user. Use when project's created user not current but Python API use it to change + * workflow. */ private Integer grantProjectToUser(Project project, User user) { Date now = new Date(); @@ -492,29 +499,31 @@ public class PythonGateway { } /** - * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code. - * Useful in Python API create subProcess task which need processDefinition information. + * Get workflow object by given workflow name. It returns map contain workflow id, name, code. + * Useful in Python API create subProcess task which need workflow information. * * @param userName user who create or update schedule - * @param projectName project name which process definition belongs to - * @param processDefinitionName process definition name + * @param projectName project name which workflow belongs to + * @param workflowName workflow name */ - public Map getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) { + public Map getWorkflowInfo(String userName, String projectName, + String workflowName) { Map result = new HashMap<>(); User user = usersService.queryUser(userName); Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); long projectCode = project.getCode(); - ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName); - // get process definition info + ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName); + // get workflow info if (processDefinition != null) { - // make sure process definition online - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE); + // make sure workflow online + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), + ReleaseState.ONLINE); result.put("id", processDefinition.getId()); result.put("name", processDefinition.getName()); result.put("code", processDefinition.getCode()); } else { - String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + String msg = String.format("Can not find valid workflow by name %s", workflowName); logger.error(msg); throw new IllegalArgumentException(msg); } @@ -523,14 +532,14 @@ public class PythonGateway { } /** - * Get project, process definition, task code. - * Useful in Python API create dependent task which need processDefinition information. + * Get project, workflow, task code. + * Useful in Python API create dependent task which need workflow information. * - * @param projectName project name which process definition belongs to - * @param processDefinitionName process definition name + * @param projectName project name which workflow belongs to + * @param workflowName workflow name * @param taskName task name */ - public Map getDependentInfo(String projectName, String processDefinitionName, String taskName) { + public Map getDependentInfo(String projectName, String workflowName, String taskName) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -542,9 +551,10 @@ public class PythonGateway { long projectCode = project.getCode(); result.put("projectCode", projectCode); - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + ProcessDefinition processDefinition = + processDefinitionMapper.queryByDefineName(projectCode, workflowName); if (processDefinition == null) { - String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + String msg = String.format("Can not find valid workflow by name %s", workflowName); logger.error(msg); throw new IllegalArgumentException(msg); } @@ -558,8 +568,8 @@ public class PythonGateway { } /** - * Get resource by given program type and full name. It return map contain resource id, name. - * Useful in Python API create flink or spark task which need processDefinition information. + * Get resource by given program type and full name. It returns map contain resource id, name. + * Useful in Python API create flink or spark task which need workflow information. * * @param programType program type one of SCALA, JAVA and PYTHON * @param fullName full name of the resource @@ -602,7 +612,7 @@ public class PythonGateway { /** * Get resource by given resource type and full name. It return map contain resource id, name. - * Useful in Python API create task which need processDefinition information. + * Useful in Python API create task which need workflow information. * * @param userName user who query resource * @param fullName full name of the resource