Browse Source

[python] Fix change exists pd attribute user error (#9140)

* [python] Fix change exists pd attribute user error

* Remove attribute from class ProcessDefinition, cause it should to
  object User instead of object ProcessDefinition.
* Grant project to user if attribute user change for exists ProcessDefinition

close: #8751

* Add py.test conftest.py for package integration
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
5289b09817
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-python/pydolphinscheduler/UPDATING.md
  2. 5
      dolphinscheduler-python/pydolphinscheduler/pytest.ini
  3. 22
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py
  4. 2
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py
  5. 21
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py
  6. 3
      dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py
  7. 46
      dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py
  8. 7
      dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py
  9. 50
      dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py
  10. 19
      dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py
  11. 42
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

1
dolphinscheduler-python/pydolphinscheduler/UPDATING.md

@ -24,5 +24,6 @@ It started after version 2.0.5 released
## dev
* Remove parameter `queue` from class `ProcessDefinition` to avoid confuse user when it change but not work
* Change `yaml_parser.py` method `to_string` to magic method `__str__` make it more pythonic.
* Use package ``ruamel.yaml`` replace ``pyyaml`` for write yaml file with comment.

5
dolphinscheduler-python/pydolphinscheduler/pytest.ini

@ -14,11 +14,8 @@
# limitations under the License.
[pytest]
# Do not test test_java_gateway.py due to we can not mock java gateway for now
addopts = --ignore=tests/test_java_gateway.py
# add path here to skip pytest scan it
norecursedirs =
tests/testing
# Integration test run seperated which do not calculate coverage
# Integration test run seperated which do not calculate coverage, it will run in `tox -e integrate-test`
tests/integration

22
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

@ -55,6 +55,14 @@ class ProcessDefinition(Base):
"""process definition object, will define process definition attribute, task, relation.
TODO: maybe we should rename this class, currently use DS object name.
:param user: The user for current process definition. Will create a new one if it do not exists. If your
parameter ``project`` already exists but project's create do not belongs to ``user``, will grant
``project`` to ``user`` automatically.
:param project: The project for current process definition. You could see the workflow in this project
thought Web UI after it :func:`submit` or :func:`run`. It will create a new project belongs to
``user`` if it does not exists. And when ``project`` exists but project's create do not belongs
to ``user``, will grant `project` to ``user`` automatically.
"""
# key attribute for identify ProcessDefinition object
@ -91,7 +99,6 @@ class ProcessDefinition(Base):
user: Optional[str] = configuration.WORKFLOW_USER,
project: Optional[str] = configuration.WORKFLOW_PROJECT,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
queue: Optional[str] = configuration.WORKFLOW_QUEUE,
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
timeout: Optional[int] = 0,
release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
@ -105,7 +112,6 @@ class ProcessDefinition(Base):
self._user = user
self._project = project
self._tenant = tenant
self._queue = queue
self.worker_group = worker_group
self.timeout = timeout
self.release_state = release_state
@ -148,15 +154,7 @@ class ProcessDefinition(Base):
For now we just get from python side but not from java gateway side, so it may not correct.
"""
return User(
self._user,
configuration.USER_PASSWORD,
configuration.USER_EMAIL,
configuration.USER_PHONE,
self._tenant,
self._queue,
configuration.USER_STATE,
)
return User(name=self._user, tenant=self._tenant)
@staticmethod
def _parse_datetime(val: Any) -> Any:
@ -331,8 +329,6 @@ class ProcessDefinition(Base):
:class:`pydolphinscheduler.constants.ProcessDefinitionDefault`.
"""
# TODO used metaclass for more pythonic
self.tenant.create_if_not_exists(self._queue)
# model User have to create after Tenant created
self.user.create_if_not_exists()
# Project model need User object exists
self.project.create_if_not_exists(self._user)

2
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py

@ -37,6 +37,6 @@ class Project(BaseSide):
def create_if_not_exists(self, user=configuration.USER_NAME) -> None:
"""Create Project if not exists."""
gateway = launch_gateway()
gateway.entry_point.createProject(user, self.name, self.description)
gateway.entry_point.createOrGrantProject(user, self.name, self.description)
# TODO recover result checker
# gateway_result_checker(result, None)

21
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py

@ -19,8 +19,10 @@
from typing import Optional
from pydolphinscheduler.core import configuration
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway
from pydolphinscheduler.side.tenant import Tenant
class User(BaseSide):
@ -39,12 +41,12 @@ class User(BaseSide):
def __init__(
self,
name: str,
password: str,
email: str,
phone: str,
tenant: str,
queue: Optional[str] = None,
status: Optional[int] = 1,
password: Optional[str] = configuration.USER_PASSWORD,
email: Optional[str] = configuration.USER_EMAIL,
phone: Optional[str] = configuration.USER_PHONE,
tenant: Optional[str] = configuration.WORKFLOW_TENANT,
queue: Optional[str] = configuration.WORKFLOW_QUEUE,
status: Optional[int] = configuration.USER_STATE,
):
super().__init__(name)
self.password = password
@ -54,8 +56,15 @@ class User(BaseSide):
self.queue = queue
self.status = status
def create_tenant_if_not_exists(self) -> None:
"""Create tenant object."""
tenant = Tenant(name=self.tenant, queue=self.queue)
tenant.create_if_not_exists(self.queue)
def create_if_not_exists(self, **kwargs):
"""Create User if not exists."""
# Should make sure queue already exists.
self.create_tenant_if_not_exists()
gateway = launch_gateway()
gateway.entry_point.createUser(
self.name,

3
dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py

@ -309,15 +309,12 @@ def test_process_definition_simple_separate():
"user_attrs",
[
{"tenant": "tenant_specific"},
{"queue": "queue_specific"},
{"tenant": "tenant_specific", "queue": "queue_specific"},
],
)
def test_set_process_definition_user_attr(user_attrs):
"""Test user with correct attributes if we specific assigned to process definition object."""
default_value = {
"tenant": configuration.WORKFLOW_TENANT,
"queue": configuration.WORKFLOW_QUEUE,
}
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, **user_attrs) as pd:
user = pd.user

46
dolphinscheduler-python/pydolphinscheduler/tests/integration/conftest.py

@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""py.test conftest.py file for package integration test."""
import pytest
from tests.testing.docker_wrapper import DockerWrapper
@pytest.fixture(scope="package", autouse=True)
def docker_setup_teardown():
"""Fixture for whole package tests, Set up and teardown docker env.
Fixture in file named ``conftest.py`` with ``scope=package`` could be auto import in the
whole package, and with attribute ``autouse=True`` will be auto-use for each test cases.
.. seealso::
For more information about conftest.py see:
https://docs.pytest.org/en/latest/example/simple.html#package-directory-level-fixtures-setups
"""
docker_wrapper = DockerWrapper(
image="apache/dolphinscheduler-standalone-server:ci",
container_name="ci-dolphinscheduler-standalone-server",
)
ports = {"25333/tcp": 25333}
container = docker_wrapper.run_until_log(
log="Started StandaloneServer in", tty=True, ports=ports
)
assert container is not None
yield
docker_wrapper.remove_container()

7
dolphinscheduler-python/pydolphinscheduler/tests/test_java_gateway.py → dolphinscheduler-python/pydolphinscheduler/tests/integration/test_java_gateway.py

@ -31,9 +31,10 @@ def test_gateway_connect():
def test_jvm_simple():
"""Test use JVM build-in object and operator from java gateway."""
gateway = JavaGateway()
smaller = gateway.jvm.java.lang.Integer.MIN_VALUE
bigger = gateway.jvm.java.lang.Integer.MAX_VALUE
assert bigger > smaller
smallest = gateway.jvm.java.lang.Integer.MIN_VALUE
biggest = gateway.jvm.java.lang.Integer.MAX_VALUE
assert smallest is not None and biggest is not None
assert biggest > smallest
def test_python_client_java_import_single():

50
dolphinscheduler-python/pydolphinscheduler/tests/integration/test_process_definition.py

@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test process definition in integration."""
from typing import Dict
import pytest
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell
PROCESS_DEFINITION_NAME = "test_change_exists_attr_pd"
TASK_NAME = f"task_{PROCESS_DEFINITION_NAME}"
@pytest.mark.parametrize(
"pre, post",
[
(
{
"user": "pre_user",
},
{
"user": "post_user",
},
)
],
)
def test_change_process_definition_attr(pre: Dict, post: Dict):
"""Test whether process definition success when specific attribute change."""
assert pre.keys() == post.keys(), "Not equal keys for pre and post attribute."
for attrs in [pre, post]:
with ProcessDefinition(name=PROCESS_DEFINITION_NAME, **attrs) as pd:
Shell(name=TASK_NAME, command="echo 1")
pd.submit()

19
dolphinscheduler-python/pydolphinscheduler/tests/integration/test_submit_examples.py

@ -22,26 +22,9 @@ from pathlib import Path
import pytest
from tests.testing.constants import ignore_exec_examples
from tests.testing.docker_wrapper import DockerWrapper
from tests.testing.path import path_example
@pytest.fixture(scope="module")
def setup_docker():
"""Set up and teardown docker env for fixture."""
docker_wrapper = DockerWrapper(
image="apache/dolphinscheduler-standalone-server:ci",
container_name="ci-dolphinscheduler-standalone-server",
)
ports = {"25333/tcp": 25333}
container = docker_wrapper.run_until_log(
log="Started StandaloneServer in", tty=True, ports=ports
)
assert container is not None
yield
docker_wrapper.remove_container()
@pytest.mark.parametrize(
"example_path",
[
@ -50,7 +33,7 @@ def setup_docker():
if path.is_file() and path.stem not in ignore_exec_examples
],
)
def test_exec_white_list_example(setup_docker, example_path: Path):
def test_exec_white_list_example(example_path: Path):
"""Test execute examples and submit DAG to PythonGatewayServer."""
try:
exec(example_path.read_text())

42
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -52,6 +53,7 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.server.config.PythonGatewayConfig;
@ -59,6 +61,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -142,6 +145,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
@Autowired
private PythonGatewayConfig pythonGatewayConfig;
@Autowired
private ProjectUserMapper projectUserMapper;
@Value("${spring.jackson.time-zone:UTC}")
private String timezone;
@ -231,8 +237,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
String taskDefinitionJson,
ProcessExecutionTypeEnum executionType) {
User user = usersService.queryUser(userName);
Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST);
Project project = projectMapper.queryByName(projectName);
long projectCode = project.getCode();
ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name);
long processDefinitionCode;
// create or update process definition
@ -349,9 +356,38 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
}
// side object
public Map<String, Object> createProject(String userName, String name, String desc) {
/*
Grant project's permission to user. Use when project's created user not current but
Python API use it to change process definition.
*/
private Integer grantProjectToUser(Project project, User user) {
Date now = new Date();
ProjectUser projectUser = new ProjectUser();
projectUser.setUserId(user.getId());
projectUser.setProjectId(project.getId());
projectUser.setPerm(Constants.AUTHORIZE_WRITABLE_PERM);
projectUser.setCreateTime(now);
projectUser.setUpdateTime(now);
return projectUserMapper.insert(projectUser);
}
/*
Grant or create project. Create a new project if project do not exists, and grant the project
permission to user if project exists but without permission to this user.
*/
public void createOrGrantProject(String userName, String name, String desc) {
User user = usersService.queryUser(userName);
return projectService.createProject(user, name, desc);
Project project;
project = projectMapper.queryByName(name);
if (project == null) {
projectService.createProject(user, name, desc);
} else if (project.getUserId() != user.getId()) {
ProjectUser projectUser = projectUserMapper.queryProjectRelation(project.getId(), user.getId());
if (projectUser == null) {
grantProjectToUser(project, user);
}
}
}
public Map<String, Object> createQueue(String name, String queueName) {

Loading…
Cancel
Save