From f480b8adf8f86b7bf1d6727385c8fad8307a452b Mon Sep 17 00:00:00 2001 From: Devosend Date: Fri, 3 Dec 2021 10:26:45 +0800 Subject: [PATCH] [Feature-6930] [Python]Add workflow as code task type sub_process (#7022) * [python] add subProcess task * refactor python gateway server and task for subprocess * add function comment for getProcessDefinition * change process_definition usage of Subprocess task --- .../src/pydolphinscheduler/constants.py | 1 + .../src/pydolphinscheduler/exceptions.py | 4 + .../pydolphinscheduler/tasks/sub_process.py | 76 +++++++++++++++ .../tests/tasks/test_sub_process.py | 97 +++++++++++++++++++ .../server/PythonGatewayServer.java | 78 ++++++++++++--- 5 files changed, 240 insertions(+), 16 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index ca0f368e0a..72034598a8 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -71,6 +71,7 @@ class TaskType(str): HTTP = "HTTP" PYTHON = "PYTHON" SQL = "SQL" + SUB_PROCESS = "SUB_PROCESS" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py index 2acd79ddb9..745ef3e99b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py @@ -40,3 +40,7 @@ class PyDSJavaGatewayException(PyDSBaseException): """Exception for pydolphinscheduler Java gateway error.""" pass + + +class PyDSProcessDefinitionNotAssignException(PyDSBaseException): + """Exception for pydolphinscheduler process definition not assign error.""" diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py new file mode 100644 index 0000000000..1bf0bd1136 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task sub_process.""" + +from typing import Dict + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.process_definition import ProcessDefinitionContext +from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException +from pydolphinscheduler.java_gateway import launch_gateway + + +class SubProcessTaskParams(TaskParams): + """Parameter only for Sub Process task type.""" + + def __init__(self, process_definition_code, *args, **kwargs): + super().__init__(*args, **kwargs) + self.process_definition_code = process_definition_code + + +class SubProcess(Task): + """Task SubProcess object, declare behavior for SubProcess task to dolphinscheduler.""" + + def __init__(self, name: str, process_definition_name: str, *args, **kwargs): + self._process_definition_name = process_definition_name + self._process_definition_info = {} + # TODO: Optimize the way of obtaining process_definition + self.process_definition = kwargs.get( + "process_definition", ProcessDefinitionContext.get() + ) + if not self.process_definition: + raise PyDSProcessDefinitionNotAssignException( + "ProcessDefinition must be provider when SubProcess initialization." + ) + + task_params = SubProcessTaskParams( + process_definition_code=self.get_process_definition_code(), + ) + super().__init__(name, TaskType.SUB_PROCESS, task_params, *args, **kwargs) + + def get_process_definition_code(self) -> str: + """Get process definition code, a wrapper for :func:`get_process_definition_info`.""" + return self.get_process_definition_info(self._process_definition_name).get( + "code" + ) + + def get_process_definition_info(self, process_definition_name: str) -> Dict: + """Get process definition info from java gateway, contains process definition id, name, code.""" + if self._process_definition_info: + return self._process_definition_info + else: + gateway = launch_gateway() + self._process_definition_info = ( + gateway.entry_point.getProcessDefinitionInfo( + self.process_definition.user.name, + self.process_definition.project.name, + process_definition_name, + ) + ) + return self._process_definition_info diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py new file mode 100644 index 0000000000..4a5388a086 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task sub_process.""" + + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams + +TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition" +TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320" +TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition" + + +@pytest.mark.parametrize( + "name, value", + [ + ("local_params", "local_params"), + ("resource_list", "resource_list"), + ("dependence", "dependence"), + ("wait_start_timeout", "wait_start_timeout"), + ("condition_result", "condition_result"), + ], +) +def test_sub_process_task_params_attr_setter(name, value): + """Test sub_process task parameters.""" + process_definition_code = "3643589832320" + sub_process_task_params = SubProcessTaskParams(process_definition_code) + assert process_definition_code == sub_process_task_params.process_definition_code + setattr(sub_process_task_params, name, value) + assert value == getattr(sub_process_task_params, name) + + +@patch( + "pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info", + return_value=( + { + "id": 1, + "name": TEST_SUB_PROCESS_DEFINITION_NAME, + "code": TEST_SUB_PROCESS_DEFINITION_CODE, + } + ), +) +def test_sub_process_to_dict(mock_process_definition): + """Test task sub_process function to_dict.""" + code = 123 + version = 1 + name = "test_sub_process_to_dict" + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "SUB_PROCESS", + "taskParams": { + "resourceList": [], + "localParams": [], + "processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE, + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME): + sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME) + assert sub_process.to_dict() == expect diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 531c55b1c1..46eb717339 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -79,7 +79,7 @@ import py4j.GatewayServer; }) public class PythonGatewayServer extends SpringBootServletInitializer { private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class); - + private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE; private static final int DEFAULT_WARNING_GROUP_ID = 0; private static final FailureStrategy DEFAULT_FAILURE_STRATEGY = FailureStrategy.CONTINUE; @@ -212,29 +212,22 @@ public class PythonGatewayServer extends SpringBootServletInitializer { User user = usersService.queryUser(userName); Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); long projectCode = project.getCode(); - Map verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name); - Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); - + ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); long processDefinitionCode; // create or update process definition - if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { - ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name); + if (processDefinition != null) { processDefinitionCode = processDefinition.getCode(); // make sure process definition offline which could edit processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); Map result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); - } else if (verifyStatus == Status.SUCCESS) { + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); + } else { Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, - locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson,executionType); - ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson, executionType); + processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); processDefinitionCode = processDefinition.getCode(); - } else { - String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; - LOGGER.error(msg); - throw new RuntimeException(msg); } - + // Fresh process definition schedule if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup); @@ -243,6 +236,28 @@ public class PythonGatewayServer extends SpringBootServletInitializer { return processDefinitionCode; } + /** + * get process definition + * @param user user who create or update schedule + * @param projectCode project which process definition belongs to + * @param processDefinitionName process definition name + */ + private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) { + Map verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName); + Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); + + ProcessDefinition processDefinition = null; + if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { + processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + } else if (verifyStatus != Status.SUCCESS) { + String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; + LOGGER.error(msg); + throw new RuntimeException(msg); + } + + return processDefinition; + } + /** * create or update process definition schedule. * It would always use latest schedule define in workflow-as-code, and set schedule online when @@ -369,7 +384,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer { * Get datasource by given datasource name. It return map contain datasource id, type, name. * Useful in Python API create sql task which need datasource information. * - * @param datasourceName user who create or update schedule + * @param datasourceName user who create or update schedule */ public Map getDatasourceInfo(String datasourceName) { Map result = new HashMap<>(); @@ -391,6 +406,37 @@ public class PythonGatewayServer extends SpringBootServletInitializer { return result; } + /** + * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code. + * Useful in Python API create subProcess task which need processDefinition information. + * + * @param userName user who create or update schedule + * @param projectName project name which process definition belongs to + * @param processDefinitionName process definition name + */ + public Map getProcessDefinitionInfo(String userName, String projectName, String processDefinitionName) { + Map result = new HashMap<>(); + + User user = usersService.queryUser(userName); + Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); + long projectCode = project.getCode(); + ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName); + // get process definition info + if (processDefinition != null) { + // make sure process definition online + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE); + result.put("id", processDefinition.getId()); + result.put("name", processDefinition.getName()); + result.put("code", processDefinition.getCode()); + } else { + String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + + return result; + } + @PostConstruct public void run() { GatewayServer server = new GatewayServer(this);