diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py index e5769f518d..1586757698 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py @@ -26,6 +26,7 @@ from pydolphinscheduler.constants import ( ProcessDefinitionReleaseState, ) from pydolphinscheduler.core.base import Base +from pydolphinscheduler.exceptions import PyDSParamException, PyDSTaskNoFoundException from pydolphinscheduler.java_gateway import launch_gateway from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.utils.date import MAX_DATETIME, conv_from_str, conv_to_schedule @@ -166,7 +167,7 @@ class ProcessDefinition(Base): elif isinstance(val, str): return conv_from_str(val) else: - raise ValueError("Do not support value type %s for now", type(val)) + raise PyDSParamException("Do not support value type %s for now", type(val)) @property def start_time(self) -> Any: @@ -271,7 +272,7 @@ class ProcessDefinition(Base): def get_task(self, code: str) -> "Task": # noqa: F821 """Get task object from process definition by given code.""" if code not in self.tasks: - raise ValueError( + raise PyDSTaskNoFoundException( "Task with code %s can not found in process definition %", (code, self.name), ) @@ -294,7 +295,7 @@ class ProcessDefinition(Base): """ tasks = self.get_tasks_by_name(name) if not tasks: - raise ValueError(f"Can not find task with name {name}.") + raise PyDSTaskNoFoundException(f"Can not find task with name {name}.") return tasks.pop() def run(self): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index b6aa516b9f..d22e5c8a41 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -17,6 +17,7 @@ """DolphinScheduler ObjectJsonBase, TaskParams and Task object.""" +import logging from typing import Dict, List, Optional, Sequence, Set, Tuple, Union from pydolphinscheduler.constants import ( @@ -162,6 +163,11 @@ class Task(Base): and self.code not in self.process_definition.tasks ): self.process_definition.add_task(self) + else: + logging.warning( + "Task code %d already in process definition, prohibit re-add task.", + self.code, + ) @property def process_definition(self) -> Optional[ProcessDefinition]: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py new file mode 100644 index 0000000000..2acd79ddb9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Exceptions for pydolphinscheduler.""" + + +class PyDSBaseException(Exception): + """Base exception for pydolphinscheduler.""" + + pass + + +class PyDSParamException(PyDSBaseException): + """Exception for pydolphinscheduler parameter verify error.""" + + pass + + +class PyDSTaskNoFoundException(PyDSBaseException): + """Exception for pydolphinscheduler workflow task no found error.""" + + pass + + +class PyDSJavaGatewayException(PyDSBaseException): + """Exception for pydolphinscheduler Java gateway error.""" + + pass diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py index f40bb363a6..d0b4c054d8 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py @@ -23,6 +23,7 @@ from py4j.java_collections import JavaMap from py4j.java_gateway import GatewayParameters, JavaGateway from pydolphinscheduler.constants import JavaGatewayDefault +from pydolphinscheduler.exceptions import PyDSJavaGatewayException def launch_gateway() -> JavaGateway: @@ -45,10 +46,10 @@ def gateway_result_checker( result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString() != JavaGatewayDefault.RESULT_STATUS_SUCCESS ): - raise RuntimeError("Failed when try to got result for java gateway") + raise PyDSJavaGatewayException("Failed when try to got result for java gateway") if ( msg_check is not None and result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD] != msg_check ): - raise ValueError("Get result state not success.") + raise PyDSJavaGatewayException("Get result state not success.") return result diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py index 5b0e76549d..445142e5b9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py @@ -21,6 +21,7 @@ from typing import Optional from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.exceptions import PyDSParamException class HttpMethod: @@ -67,11 +68,13 @@ class HttpTaskParams(TaskParams): super().__init__(*args, **kwargs) self.url = url if not hasattr(HttpMethod, http_method): - raise ValueError("Parameter http_method %s not support.", http_method) + raise PyDSParamException( + "Parameter http_method %s not support.", http_method + ) self.http_method = http_method self.http_params = http_params or [] if not hasattr(HttpCheckCondition, http_check_condition): - raise ValueError( + raise PyDSParamException( "Parameter http_check_condition %s not support.", http_check_condition ) self.http_check_condition = http_check_condition @@ -79,7 +82,7 @@ class HttpTaskParams(TaskParams): http_check_condition != HttpCheckCondition.STATUS_CODE_DEFAULT and condition is None ): - raise ValueError( + raise PyDSParamException( "Parameter condition must provider if http_check_condition not equal to STATUS_CODE_DEFAULT" ) self.condition = condition diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py index ee392fb4f3..9a7149a520 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/python.py @@ -23,6 +23,7 @@ from typing import Any from pydolphinscheduler.constants import TaskType from pydolphinscheduler.core.task import Task, TaskParams +from pydolphinscheduler.exceptions import PyDSParamException class PythonTaskParams(TaskParams): @@ -43,5 +44,7 @@ class Python(Task): py_function = inspect.getsource(code) task_params = PythonTaskParams(raw_script=py_function) else: - raise ValueError("Parameter code do not support % for now.", type(code)) + raise PyDSParamException( + "Parameter code do not support % for now.", type(code) + ) super().__init__(name, TaskType.PYTHON, task_params, *args, **kwargs) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py index 0359b290f2..4c1597425d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/core/test_process_definition.py @@ -29,6 +29,7 @@ from pydolphinscheduler.constants import ( ) from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.core.task import TaskParams +from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.side import Project, Tenant, User from pydolphinscheduler.utils.date import conv_to_schedule from tests.testing.task import Task @@ -147,7 +148,7 @@ def test__parse_datetime(val, expect): def test__parse_datetime_not_support_type(val: Any): """Test process definition function _parse_datetime not support type error.""" with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd: - with pytest.raises(ValueError): + with pytest.raises(PyDSParamException, match="Do not support value type.*?"): pd._parse_datetime(val) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py index 84af83fe3a..7c01517c94 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_http.py @@ -21,6 +21,7 @@ from unittest.mock import patch import pytest +from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.tasks.http import ( Http, HttpCheckCondition, @@ -64,7 +65,7 @@ def test_attr_exists(class_name, attrs): def test_http_task_param_not_support_param(param): """Test HttpTaskParams not support parameter.""" url = "https://www.apache.org" - with pytest.raises(ValueError, match="Parameter .*?"): + with pytest.raises(PyDSParamException, match="Parameter .*?"): HttpTaskParams(url, **param) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py index 0fdd0cca5c..f9e7f04678 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_python.py @@ -22,6 +22,7 @@ from unittest.mock import patch import pytest +from pydolphinscheduler.exceptions import PyDSParamException from pydolphinscheduler.tasks.python import Python, PythonTaskParams @@ -60,7 +61,9 @@ def test_python_task_not_support_code(script_code): "pydolphinscheduler.core.task.Task.gen_code_and_version", return_value=(code, version), ): - with pytest.raises(ValueError, match="Parameter code do not support .*?"): + with pytest.raises( + PyDSParamException, match="Parameter code do not support .*?" + ): Python(name, script_code) diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py index bb204c9294..b9f8ce5ff3 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/utils/test_date.py @@ -72,5 +72,7 @@ def test_conv_from_str_success(src: str, expect: datetime) -> None: ) def test_conv_from_str_not_impl(src: str) -> None: """Test function conv_from_str fail case.""" - with pytest.raises(NotImplementedError): + with pytest.raises( + NotImplementedError, match=".*? could not be convert to datetime for now." + ): conv_from_str(src)