From 5289b09817396046489b28857ad54a85c5ffddb2 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Thu, 24 Mar 2022 15:24:02 +0800 Subject: [PATCH] [python] Fix change exists pd attribute user error (#9140) * [python] Fix change exists pd attribute user error * Remove attribute from class ProcessDefinition, cause it should to object User instead of object ProcessDefinition. * Grant project to user if attribute user change for exists ProcessDefinition close: #8751 * Add py.test conftest.py for package integration --- .../pydolphinscheduler/UPDATING.md | 1 + .../pydolphinscheduler/pytest.ini | 5 +- .../core/process_definition.py | 22 ++++---- .../src/pydolphinscheduler/side/project.py | 2 +- .../src/pydolphinscheduler/side/user.py | 21 +++++--- .../tests/core/test_process_definition.py | 3 -- .../tests/integration/conftest.py | 46 +++++++++++++++++ .../{ => integration}/test_java_gateway.py | 7 +-- .../integration/test_process_definition.py | 50 +++++++++++++++++++ .../tests/integration/test_submit_examples.py | 19 +------ .../server/PythonGatewayServer.java | 42 ++++++++++++++-- 11 files changed, 167 insertions(+), 51 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py rename dolphinscheduler-python/pydolphinscheduler/tests/{ => integration}/test_java_gateway.py (90%) create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py diff --git a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md index b5d69cd9d7..cf45c097fb 100644 --- a/dolphinscheduler-python/pydolphinscheduler/UPDATING.md +++ b/dolphinscheduler-python/pydolphinscheduler/UPDATING.md @@ -24,5 +24,6 @@ It started after version 2.0.5 released ## dev +* Remove parameter `queue` from class `ProcessDefinition` to avoid confuse user when it change but not work * Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic. * Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment. diff --git a/dolphinscheduler-python/pydolphinscheduler/pytest.ini b/dolphinscheduler-python/pydolphinscheduler/pytest.ini index 9ed8ccf2b6..b1aa850346 100644 --- a/dolphinscheduler-python/pydolphinscheduler/pytest.ini +++ b/dolphinscheduler-python/pydolphinscheduler/pytest.ini @@ -14,11 +14,8 @@ # limitations under the License. [pytest] -# Do not test test_java_gateway.py due to we can not mock java gateway for now -addopts = --ignore=tests/test_java_gateway.py - # add path here to skip pytest scan it norecursedirs = tests/testing - # Integration test run seperated which do not calculate coverage + # Integration test run seperated which do not calculate coverage, it will run in `tox -e integrate-test` tests/integration diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index dd2b83a632..7615226de6 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -55,6 +55,14 @@ class ProcessDefinition(Base): """process definition object, will define process definition attribute, task, relation. TODO: maybe we should rename this class, currently use DS object name. + + :param user: The user for current process definition. Will create a new one if it do not exists. If your + parameter ``project`` already exists but project's create do not belongs to ``user``, will grant + ``project`` to ``user`` automatically. + :param project: The project for current process definition. You could see the workflow in this project + thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to + ``user`` if it does not exists. And when ``project`` exists but project's create do not belongs + to ``user``, will grant `project` to ``user`` automatically. """ # key attribute for identify ProcessDefinition object @@ -91,7 +99,6 @@ class ProcessDefinition(Base): user: Optional[str] = configuration.WORKFLOW_USER, project: Optional[str] = configuration.WORKFLOW_PROJECT, tenant: Optional[str] = configuration.WORKFLOW_TENANT, - queue: Optional[str] = configuration.WORKFLOW_QUEUE, worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP, timeout: Optional[int] = 0, release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE, @@ -105,7 +112,6 @@ class ProcessDefinition(Base): self._user = user self._project = project self._tenant = tenant - self._queue = queue self.worker_group = worker_group self.timeout = timeout self.release_state = release_state @@ -148,15 +154,7 @@ class ProcessDefinition(Base): For now we just get from python side but not from java gateway side, so it may not correct. """ - return User( - self._user, - configuration.USER_PASSWORD, - configuration.USER_EMAIL, - configuration.USER_PHONE, - self._tenant, - self._queue, - configuration.USER_STATE, - ) + return User(name=self._user, tenant=self._tenant) @staticmethod def _parse_datetime(val: Any) -> Any: @@ -331,8 +329,6 @@ class ProcessDefinition(Base): :class:`pydolphinscheduler.constants.ProcessDefinitionDefault`. """ # TODO used metaclass for more pythonic - self.tenant.create_if_not_exists(self._queue) - # model User have to create after Tenant created self.user.create_if_not_exists() # Project model need User object exists self.project.create_if_not_exists(self._user) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py index b568cb4abf..750e3b8793 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py @@ -37,6 +37,6 @@ class Project(BaseSide): def create_if_not_exists(self, user=configuration.USER_NAME) -> None: """Create Project if not exists.""" gateway = launch_gateway() - gateway.entry_point.createProject(user, self.name, self.description) + gateway.entry_point.createOrGrantProject(user, self.name, self.description) # TODO recover result checker # gateway_result_checker(result, None) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py index cd0145aea7..510e3a8b36 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py @@ -19,8 +19,10 @@ from typing import Optional +from pydolphinscheduler.core import configuration from pydolphinscheduler.core.base_side import BaseSide from pydolphinscheduler.java_gateway import launch_gateway +from pydolphinscheduler.side.tenant import Tenant class User(BaseSide): @@ -39,12 +41,12 @@ class User(BaseSide): def __init__( self, name: str, - password: str, - email: str, - phone: str, - tenant: str, - queue: Optional[str] = None, - status: Optional[int] = 1, + password: Optional[str] = configuration.USER_PASSWORD, + email: Optional[str] = configuration.USER_EMAIL, + phone: Optional[str] = configuration.USER_PHONE, + tenant: Optional[str] = configuration.WORKFLOW_TENANT, + queue: Optional[str] = configuration.WORKFLOW_QUEUE, + status: Optional[int] = configuration.USER_STATE, ): super().__init__(name) self.password = password @@ -54,8 +56,15 @@ class User(BaseSide): self.queue = queue self.status = status + def create_tenant_if_not_exists(self) -> None: + """Create tenant object.""" + tenant = Tenant(name=self.tenant, queue=self.queue) + tenant.create_if_not_exists(self.queue) + def create_if_not_exists(self, **kwargs): """Create User if not exists.""" + # Should make sure queue already exists. + self.create_tenant_if_not_exists() gateway = launch_gateway() gateway.entry_point.createUser( self.name, diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 655b7fd4a0..e311be2ec1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -309,15 +309,12 @@ def test_process_definition_simple_separate(): "user_attrs", [ {"tenant": "tenant_specific"}, - {"queue": "queue_specific"}, - {"tenant": "tenant_specific", "queue": "queue_specific"}, ], ) def test_set_process_definition_user_attr(user_attrs): """Test user with correct attributes if we specific assigned to process definition object.""" default_value = { "tenant": configuration.WORKFLOW_TENANT, - "queue": configuration.WORKFLOW_QUEUE, } with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd: user = pd.user diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py new file mode 100644 index 0000000000..a9cd352710 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""py.test conftest.py file for package integration test.""" + +import pytest + +from tests.testing.docker_wrapper import DockerWrapper + + +@pytest.fixture(scope="package", autouse=True) +def docker_setup_teardown(): + """Fixture for whole package tests, Set up and teardown docker env. + + Fixture in file named ``conftest.py`` with ``scope=package`` could be auto import in the + whole package, and with attribute ``autouse=True`` will be auto-use for each test cases. + + .. seealso:: + For more information about conftest.py see: + https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups + """ + docker_wrapper = DockerWrapper( + image="apache/dolphinscheduler-standalone-server:ci", + container_name="ci-dolphinscheduler-standalone-server", + ) + ports = {"25333/tcp": 25333} + container = docker_wrapper.run_until_log( + log="Started StandaloneServer in", tty=True, ports=ports + ) + assert container is not None + yield + docker_wrapper.remove_container() diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py similarity index 90% rename from dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py rename to dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py index 3c8831e16d..8b7c5ff845 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py @@ -31,9 +31,10 @@ def test_gateway_connect(): def test_jvm_simple(): """Test use JVM build-in object and operator from java gateway.""" gateway = JavaGateway() - smaller = gateway.jvm.java.lang.Integer.MIN_VALUE - bigger = gateway.jvm.java.lang.Integer.MAX_VALUE - assert bigger > smaller + smallest = gateway.jvm.java.lang.Integer.MIN_VALUE + biggest = gateway.jvm.java.lang.Integer.MAX_VALUE + assert smallest is not None and biggest is not None + assert biggest > smallest def test_python_client_java_import_single(): diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py new file mode 100644 index 0000000000..1672bde530 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test process definition in integration.""" + +from typing import Dict + +import pytest + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.shell import Shell + +PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd" +TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}" + + +@pytest.mark.parametrize( + "pre, post", + [ + ( + { + "user": "pre_user", + }, + { + "user": "post_user", + }, + ) + ], +) +def test_change_process_definition_attr(pre: Dict, post: Dict): + """Test whether process definition success when specific attribute change.""" + assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute." + for attrs in [pre, post]: + with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd: + Shell(name=TASK_NAME, command="echo 1") + pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py index 423c6c3a6f..0964e1b975 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py @@ -22,26 +22,9 @@ from pathlib import Path import pytest from tests.testing.constants import ignore_exec_examples -from tests.testing.docker_wrapper import DockerWrapper from tests.testing.path import path_example -@pytest.fixture(scope="module") -def setup_docker(): - """Set up and teardown docker env for fixture.""" - docker_wrapper = DockerWrapper( - image="apache/dolphinscheduler-standalone-server:ci", - container_name="ci-dolphinscheduler-standalone-server", - ) - ports = {"25333/tcp": 25333} - container = docker_wrapper.run_until_log( - log="Started StandaloneServer in", tty=True, ports=ports - ) - assert container is not None - yield - docker_wrapper.remove_container() - - @pytest.mark.parametrize( "example_path", [ @@ -50,7 +33,7 @@ def setup_docker(): if path.is_file() and path.stem not in ignore_exec_examples ], ) -def test_exec_white_list_example(setup_docker, example_path: Path): +def test_exec_white_list_example(example_path: Path): """Test execute examples and submit DAG to PythonGatewayServer.""" try: exec(example_path.read_text()) diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index 9fa1366e69..218cf614b3 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; @@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.server.config.PythonGatewayConfig; @@ -59,6 +61,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -142,6 +145,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer { @Autowired private PythonGatewayConfig pythonGatewayConfig; + @Autowired + private ProjectUserMapper projectUserMapper; + @Value("${spring.jackson.time-zone:UTC}") private String timezone; @@ -231,8 +237,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer { String taskDefinitionJson, ProcessExecutionTypeEnum executionType) { User user = usersService.queryUser(userName); - Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); + Project project = projectMapper.queryByName(projectName); long projectCode = project.getCode(); + ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); long processDefinitionCode; // create or update process definition @@ -349,9 +356,38 @@ public class PythonGatewayServer extends SpringBootServletInitializer { } // side object - public Map createProject(String userName, String name, String desc) { + /* + Grant project's permission to user. Use when project's created user not current but + Python API use it to change process definition. + */ + private Integer grantProjectToUser(Project project, User user) { + Date now = new Date(); + ProjectUser projectUser = new ProjectUser(); + projectUser.setUserId(user.getId()); + projectUser.setProjectId(project.getId()); + projectUser.setPerm(Constants.AUTHORIZE_WRITABLE_PERM); + projectUser.setCreateTime(now); + projectUser.setUpdateTime(now); + return projectUserMapper.insert(projectUser); + } + + /* + Grant or create project. Create a new project if project do not exists, and grant the project + permission to user if project exists but without permission to this user. + */ + public void createOrGrantProject(String userName, String name, String desc) { User user = usersService.queryUser(userName); - return projectService.createProject(user, name, desc); + + Project project; + project = projectMapper.queryByName(name); + if (project == null) { + projectService.createProject(user, name, desc); + } else if (project.getUserId() != user.getId()) { + ProjectUser projectUser = projectUserMapper.queryProjectRelation(project.getId(), user.getId()); + if (projectUser == null) { + grantProjectToUser(project, user); + } + } } public Map createQueue(String name, String queueName) {