From 0dce68edd774af4656d44d6ba0ccf82ea6fe318a Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Fri, 19 Nov 2021 16:37:22 +0800 Subject: [PATCH] [python] Add task type python http (#6906) * [python] Add task type python http * Fix unittest error * Fix UT error --- .../src/pydolphinscheduler/constants.py | 2 + .../src/pydolphinscheduler/core/task.py | 6 +- .../src/pydolphinscheduler/tasks/http.py | 115 ++++++++++++++++++ .../src/pydolphinscheduler/tasks/python.py | 47 +++++++ .../src/pydolphinscheduler/tasks/shell.py | 16 ++- .../tests/core/test_process_definition.py | 2 +- .../tests/core/test_task.py | 30 ++--- .../tests/tasks/test_http.py | 112 +++++++++++++++++ .../tests/tasks/test_python.py | 115 ++++++++++++++++++ .../tests/tasks/test_shell.py | 23 +++- 10 files changed, 435 insertions(+), 33 deletions(-) create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 315a98c0b5..c27f9bd958 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -68,6 +68,8 @@ class TaskType(str): """Constants for task type, it will also show you which kind we support up to now.""" SHELL = "SHELL" + HTTP = "HTTP" + PYTHON = "PYTHON" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index c8eb54ad43..b6aa516b9f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -65,15 +65,15 @@ class TaskParams(ObjectJsonBase): def __init__( self, - raw_script: str, local_params: Optional[List] = None, resource_list: Optional[List] = None, dependence: Optional[Dict] = None, wait_start_timeout: Optional[Dict] = None, condition_result: Optional[Dict] = None, + *args, + **kwargs, ): - super().__init__() - self.raw_script = raw_script + super().__init__(*args, **kwargs) self.local_params = local_params or [] self.resource_list = resource_list or [] self.dependence = dependence or {} diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py new file mode 100644 index 0000000000..5b0e76549d --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task shell.""" + +from typing import Optional + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.task import Task, TaskParams + + +class HttpMethod: + """Constant of HTTP method.""" + + GET = "GET" + POST = "POST" + HEAD = "HEAD" + PUT = "PUT" + DELETE = "DELETE" + + +class HttpCheckCondition: + """Constant of HTTP check condition. + + For now it contain four value: + - STATUS_CODE_DEFAULT: when http response code equal to 200, mark as success. + - STATUS_CODE_CUSTOM: when http response code equal to the code user define, mark as success. + - BODY_CONTAINS: when http response body contain text user define, mark as success. + - BODY_NOT_CONTAINS: when http response body do not contain text user define, mark as success. + """ + + STATUS_CODE_DEFAULT = "STATUS_CODE_DEFAULT" + STATUS_CODE_CUSTOM = "STATUS_CODE_CUSTOM" + BODY_CONTAINS = "BODY_CONTAINS" + BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS" + + +class HttpTaskParams(TaskParams): + """Parameter only for Http task types.""" + + def __init__( + self, + url: str, + http_method: Optional[str] = HttpMethod.GET, + http_params: Optional[str] = None, + http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT, + condition: Optional[str] = None, + connect_timeout: Optional[int] = 60000, + socket_timeout: Optional[int] = 60000, + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self.url = url + if not hasattr(HttpMethod, http_method): + raise ValueError("Parameter http_method %s not support.", http_method) + self.http_method = http_method + self.http_params = http_params or [] + if not hasattr(HttpCheckCondition, http_check_condition): + raise ValueError( + "Parameter http_check_condition %s not support.", http_check_condition + ) + self.http_check_condition = http_check_condition + if ( + http_check_condition != HttpCheckCondition.STATUS_CODE_DEFAULT + and condition is None + ): + raise ValueError( + "Parameter condition must provider if http_check_condition not equal to STATUS_CODE_DEFAULT" + ) + self.condition = condition + self.connect_timeout = connect_timeout + self.socket_timeout = socket_timeout + + +class Http(Task): + """Task HTTP object, declare behavior for HTTP task to dolphinscheduler.""" + + def __init__( + self, + name: str, + url: str, + http_method: Optional[str] = HttpMethod.GET, + http_params: Optional[str] = None, + http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT, + condition: Optional[str] = None, + connect_timeout: Optional[int] = 60000, + socket_timeout: Optional[int] = 60000, + *args, + **kwargs + ): + task_params = HttpTaskParams( + url=url, + http_method=http_method, + http_params=http_params, + http_check_condition=http_check_condition, + condition=condition, + connect_timeout=connect_timeout, + socket_timeout=socket_timeout, + ) + super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py new file mode 100644 index 0000000000..ee392fb4f3 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task Python.""" + +import inspect +import types +from typing import Any + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.task import Task, TaskParams + + +class PythonTaskParams(TaskParams): + """Parameter only for Python task types.""" + + def __init__(self, raw_script: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.raw_script = raw_script + + +class Python(Task): + """Task Python object, declare behavior for Python task to dolphinscheduler.""" + + def __init__(self, name: str, code: Any, *args, **kwargs): + if isinstance(code, str): + task_params = PythonTaskParams(raw_script=code) + elif isinstance(code, types.FunctionType): + py_function = inspect.getsource(code) + task_params = PythonTaskParams(raw_script=py_function) + else: + raise ValueError("Parameter code do not support % for now.", type(code)) + super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py index 825902fce3..25e82f54f1 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py @@ -21,6 +21,14 @@ from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task, TaskParams +class ShellTaskParams(TaskParams): + """Parameter only for shell task types.""" + + def __init__(self, raw_script: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.raw_script = raw_script + + class Shell(Task): """Task shell object, declare behavior for shell task to dolphinscheduler. @@ -29,8 +37,6 @@ class Shell(Task): task.name assign to `task_shell` """ - def __init__( - self, name: str, command: str, task_type: str = TaskType.SHELL, *args, **kwargs - ): - task_params = TaskParams(raw_script=command) - super().__init__(name, task_type, task_params, *args, **kwargs) + def __init__(self, name: str, command: str, *args, **kwargs): + task_params = ShellTaskParams(raw_script=command) + super().__init__(name, TaskType.SHELL, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index a678649041..ae048c29dd 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -175,7 +175,7 @@ def test_process_definition_simple(): expect_tasks_num = 5 with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: for i in range(expect_tasks_num): - task_params = TaskParams(raw_script=f"test-raw-script-{i}") + task_params = TaskParams() curr_task = Task( name=f"task-{i}", task_type=f"type-{i}", task_params=task_params ) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py index 63d6496672..6d09820543 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_task.py @@ -27,16 +27,14 @@ from tests.testing.task import Task as testTask def test_task_params_to_dict(): """Test TaskParams object function to_dict.""" - raw_script = "test_task_params_to_dict" expect = { "resourceList": [], "localParams": [], - "rawScript": raw_script, "dependence": {}, "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT, "waitStartTimeout": {}, } - task_param = TaskParams(raw_script=raw_script) + task_param = TaskParams() assert task_param.to_dict() == expect @@ -65,7 +63,6 @@ def test_task_to_dict(): version = 1 name = "test_task_to_dict" task_type = "test_task_to_dict_type" - raw_script = "test_task_params_to_dict" expect = { "code": code, "name": name, @@ -76,7 +73,6 @@ def test_task_to_dict(): "taskParams": { "resourceList": [], "localParams": [], - "rawScript": raw_script, "dependence": {}, "conditionResult": {"successNode": [""], "failedNode": [""]}, "waitStartTimeout": {}, @@ -94,7 +90,7 @@ def test_task_to_dict(): "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(code, version), ): - task = Task(name=name, task_type=task_type, task_params=TaskParams(raw_script)) + task = Task(name=name, task_type=task_type, task_params=TaskParams()) assert task.to_dict() == expect @@ -104,13 +100,8 @@ def test_two_tasks_shift(shift: str): Here we test both `>>` and `<<` bit operator. """ - raw_script = "script" - upstream = testTask( - name="upstream", task_type=shift, task_params=TaskParams(raw_script) - ) - downstream = testTask( - name="downstream", task_type=shift, task_params=TaskParams(raw_script) - ) + upstream = testTask(name="upstream", task_type=shift, task_params=TaskParams()) + downstream = testTask(name="downstream", task_type=shift, task_params=TaskParams()) if shift == "<<": downstream << upstream elif shift == ">>": @@ -146,17 +137,10 @@ def test_tasks_list_shift(dep_expr: str, flag: str): "downstream": "upstream", } task_type = "dep_task_and_tasks" - raw_script = "script" - task = testTask( - name="upstream", task_type=task_type, task_params=TaskParams(raw_script) - ) + task = testTask(name="upstream", task_type=task_type, task_params=TaskParams()) tasks = [ - testTask( - name="downstream1", task_type=task_type, task_params=TaskParams(raw_script) - ), - testTask( - name="downstream2", task_type=task_type, task_params=TaskParams(raw_script) - ), + testTask(name="downstream1", task_type=task_type, task_params=TaskParams()), + testTask(name="downstream2", task_type=task_type, task_params=TaskParams()), ] # Use build-in function eval to simply test case and reduce duplicate code diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py new file mode 100644 index 0000000000..84af83fe3a --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task HTTP.""" + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.http import ( + Http, + HttpCheckCondition, + HttpMethod, + HttpTaskParams, +) + + +@pytest.mark.parametrize( + "class_name, attrs", + [ + (HttpMethod, ("GET", "POST", "HEAD", "PUT", "DELETE")), + ( + HttpCheckCondition, + ( + "STATUS_CODE_DEFAULT", + "STATUS_CODE_CUSTOM", + "BODY_CONTAINS", + "BODY_NOT_CONTAINS", + ), + ), + ], +) +def test_attr_exists(class_name, attrs): + """Test weather class HttpMethod and HttpCheckCondition contain specific attribute.""" + assert all(hasattr(class_name, attr) for attr in attrs) + + +@pytest.mark.parametrize( + "param", + [ + {"http_method": "http_method"}, + {"http_check_condition": "http_check_condition"}, + {"http_check_condition": HttpCheckCondition.STATUS_CODE_CUSTOM}, + { + "http_check_condition": HttpCheckCondition.STATUS_CODE_CUSTOM, + "condition": None, + }, + ], +) +def test_http_task_param_not_support_param(param): + """Test HttpTaskParams not support parameter.""" + url = "https://www.apache.org" + with pytest.raises(ValueError, match="Parameter .*?"): + HttpTaskParams(url, **param) + + +def test_http_to_dict(): + """Test task HTTP function to_dict.""" + code = 123 + version = 1 + name = "test_http_to_dict" + url = "https://www.apache.org" + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "HTTP", + "taskParams": { + "localParams": [], + "httpParams": [], + "url": url, + "httpMethod": "GET", + "httpCheckCondition": "STATUS_CODE_DEFAULT", + "condition": None, + "connectTimeout": 60000, + "socketTimeout": 60000, + "dependence": {}, + "resourceList": [], + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + http = Http(name, url) + assert http.to_dict() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py new file mode 100644 index 0000000000..0fdd0cca5c --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -0,0 +1,115 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task python.""" + + +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.tasks.python import Python, PythonTaskParams + + +@pytest.mark.parametrize( + "name, value", + [ + ("local_params", "local_params"), + ("resource_list", "resource_list"), + ("dependence", "dependence"), + ("wait_start_timeout", "wait_start_timeout"), + ("condition_result", "condition_result"), + ], +) +def test_python_task_params_attr_setter(name, value): + """Test python task parameters.""" + command = 'print("hello world.")' + python_task_params = PythonTaskParams(command) + assert command == python_task_params.raw_script + setattr(python_task_params, name, value) + assert value == getattr(python_task_params, name) + + +@pytest.mark.parametrize( + "script_code", + [ + 123, + ("print", "hello world"), + ], +) +def test_python_task_not_support_code(script_code): + """Test python task parameters.""" + name = "not_support_code_type" + code = 123 + version = 1 + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + with pytest.raises(ValueError, match="Parameter code do not support .*?"): + Python(name, script_code) + + +def foo(): # noqa: D103 + print("hello world.") + + +@pytest.mark.parametrize( + "name, script_code, raw", + [ + ("string_define", 'print("hello world.")', 'print("hello world.")'), + ( + "function_define", + foo, + 'def foo(): # noqa: D103\n print("hello world.")\n', + ), + ], +) +def test_python_to_dict(name, script_code, raw): + """Test task python function to_dict.""" + code = 123 + version = 1 + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "PYTHON", + "taskParams": { + "resourceList": [], + "localParams": [], + "rawScript": raw, + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + shell = Python(name, script_code) + assert shell.to_dict() == expect diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py index f5f5cfa373..56fae1c51f 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_shell.py @@ -20,7 +20,28 @@ from unittest.mock import patch -from pydolphinscheduler.tasks.shell import Shell +import pytest + +from pydolphinscheduler.tasks.shell import Shell, ShellTaskParams + + +@pytest.mark.parametrize( + "name, value", + [ + ("local_params", "local_params"), + ("resource_list", "resource_list"), + ("dependence", "dependence"), + ("wait_start_timeout", "wait_start_timeout"), + ("condition_result", "condition_result"), + ], +) +def test_shell_task_params_attr_setter(name, value): + """Test shell task parameters.""" + raw_script = "echo shell task parameter" + shell_task_params = ShellTaskParams(raw_script) + assert raw_script == shell_task_params.raw_script + setattr(shell_task_params, name, value) + assert value == getattr(shell_task_params, name) def test_shell_to_dict():