diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py new file mode 100644 index 0000000000..255d599781 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +r""" +A example workflow for task dependent. + +This example will create two workflows named `task_dependent` and `task_dependent_external`. +`task_dependent` is true workflow define and run task dependent, while `task_dependent_external` +define outside workflow and task from dependent. + +After this script submit, we would get workflow as below: + +task_dependent_external: + +task_1 +task_2 +task_3 + +task_dependent: + +task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2). 
+""" +from constants import ProcessDefinitionDefault + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or +from pydolphinscheduler.tasks.shell import Shell + +with ProcessDefinition( + name="task_dependent_external", + tenant="tenant_exists", +) as pd: + task_1 = Shell(name="task_1", command="echo task 1") + task_2 = Shell(name="task_2", command="echo task 2") + task_3 = Shell(name="task_3", command="echo task 3") + pd.submit() + +with ProcessDefinition( + name="task_dependent", + tenant="tenant_exists", +) as pd: + task = Dependent( + name="task_dependent", + dependence=And( + Or( + DependentItem( + project_name=ProcessDefinitionDefault.PROJECT, + process_definition_name="task_dependent_external", + dependent_task_name="task_1", + ), + DependentItem( + project_name=ProcessDefinitionDefault.PROJECT, + process_definition_name="task_dependent_external", + dependent_task_name="task_2", + ), + ) + ), + ) + pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index c2899abd26..c2d2e7f254 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -73,6 +73,7 @@ class TaskType(str): SQL = "SQL" SUB_PROCESS = "SUB_PROCESS" PROCEDURE = "PROCEDURE" + DEPENDENT = "DEPENDENT" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py new file mode 100644 index 0000000000..760ccab92d --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py @@ -0,0 +1,277 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or 
# more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Task dependent."""

from typing import Dict, Optional, Tuple

from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway

# Sentinel task name/code meaning "depend on every task of the workflow".
DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"


class DependentDate(str):
    """Constant of Dependent date value.

    These values set according to Java server side, if you want to add and change it,
    please change Java server side first.
    """

    # TODO Maybe we should add parent level to DependentDate for easy to use, such as
    # DependentDate.MONTH.THIS_MONTH

    # Hour
    CURRENT_HOUR = "currentHour"
    LAST_ONE_HOUR = "last1Hour"
    LAST_TWO_HOURS = "last2Hours"
    LAST_THREE_HOURS = "last3Hours"
    LAST_TWENTY_FOUR_HOURS = "last24Hours"

    # Day
    TODAY = "today"
    LAST_ONE_DAYS = "last1Days"
    LAST_TWO_DAYS = "last2Days"
    LAST_THREE_DAYS = "last3Days"
    LAST_SEVEN_DAYS = "last7Days"

    # Week
    THIS_WEEK = "thisWeek"
    LAST_WEEK = "lastWeek"
    LAST_MONDAY = "lastMonday"
    LAST_TUESDAY = "lastTuesday"
    LAST_WEDNESDAY = "lastWednesday"
    LAST_THURSDAY = "lastThursday"
    LAST_FRIDAY = "lastFriday"
    LAST_SATURDAY = "lastSaturday"
    LAST_SUNDAY = "lastSunday"

    # Month
    THIS_MONTH = "thisMonth"
    LAST_MONTH = "lastMonth"
    LAST_MONTH_BEGIN = "lastMonthBegin"
    LAST_MONTH_END = "lastMonthEnd"


class DependentItem(Base):
    """Dependent item object, minimal unit for task dependent.

    It declares which project, process definition and task this task depends on.
    The numeric codes are resolved lazily through the Java gateway and cached on
    the instance after the first successful lookup.
    """

    _DEFINE_ATTR = {
        "project_code",
        "definition_code",
        "dep_task_code",
        "cycle",
        "date_value",
    }

    # TODO maybe we should consider overwriting operator `and` and `or` for DependentItem
    # to support a more easy way to set relation
    def __init__(
        self,
        project_name: str,
        process_definition_name: str,
        dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
        dependent_date: Optional[DependentDate] = DependentDate.TODAY,
    ):
        """Create a dependent item.

        :param project_name: name of the project the upstream workflow belongs to.
        :param process_definition_name: name of the upstream workflow.
        :param dependent_task_name: name of the upstream task; the default
            ``DEPENDENT_ALL_TASK_IN_WORKFLOW`` means all tasks of the workflow.
        :param dependent_date: one of the :class:`DependentDate` values.
        :raises PyDSParamException: when ``dependent_date`` is None.
        """
        obj_name = f"{project_name}.{process_definition_name}.{dependent_task_name}.{dependent_date}"
        super().__init__(obj_name)
        self.project_name = project_name
        self.process_definition_name = process_definition_name
        self.dependent_task_name = dependent_task_name
        if dependent_date is None:
            raise PyDSParamException(
                "Parameter dependent_date must provider by got None."
            )
        self.dependent_date = dependent_date
        # Cache of the codes returned by the gateway, filled on first lookup.
        self._code = {}

    def __repr__(self) -> str:
        # DependentOperator.set_define_attr uses this value both to check that all its
        # arguments are of one kind and as the attribute name holding the item defines.
        return "depend_item_list"

    @property
    def project_code(self) -> str:
        """Get dependent project code."""
        return self.get_code_from_gateway().get("projectCode")

    @property
    def definition_code(self) -> str:
        """Get dependent definition code."""
        return self.get_code_from_gateway().get("processDefinitionCode")

    @property
    def dep_task_code(self) -> str:
        """Get dependent task code, or the all-tasks sentinel when no task was named."""
        if self.is_all_task:
            return DEPENDENT_ALL_TASK_IN_WORKFLOW
        return self.get_code_from_gateway().get("taskDefinitionCode")

    # TODO Maybe we should get cycle from dependent date class.
    @property
    def cycle(self) -> str:
        """Get dependent cycle, derived from the text of ``dependent_date``.

        The checks are substring based and evaluated in order: hour, day, month;
        every other value is reported as ``"week"``.
        """
        if "Hour" in self.dependent_date:
            return "hour"
        if self.dependent_date == "today" or "Days" in self.dependent_date:
            return "day"
        if "Month" in self.dependent_date:
            return "month"
        return "week"

    @property
    def date_value(self) -> str:
        """Get dependent date."""
        return self.dependent_date

    @property
    def is_all_task(self) -> bool:
        """Check whether dependent all tasks or not."""
        return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW

    @property
    def code_parameter(self) -> Tuple:
        """Get name info parameter to query code.

        The task name slot is None when the item depends on all tasks.
        """
        task_name = None if self.is_all_task else self.dependent_task_name
        return self.project_name, self.process_definition_name, task_name

    def get_code_from_gateway(self) -> Dict:
        """Get project, definition, task code from given parameter.

        The gateway is only called while the cache is empty; later calls return the
        cached mapping.

        :raises PyDSJavaGatewayException: when the gateway call fails; the original
            error is attached as the cause.
        """
        if self._code:
            return self._code
        gateway = launch_gateway()
        try:
            self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
        except Exception as err:
            # Chain explicitly so the underlying gateway failure stays visible.
            raise PyDSJavaGatewayException(
                "Function get_code_from_gateway error."
            ) from err
        return self._code


class DependentOperator(Base):
    """Set DependentItem or dependItemList with specific operator."""

    _DEFINE_ATTR = {
        "relation",
    }

    DEPENDENT_ITEM = "DependentItem"
    DEPENDENT_OPERATOR = "DependentOperator"

    def __init__(self, *args):
        """Keep the operands; they are validated lazily in :func:`set_define_attr`."""
        super().__init__(self.__class__.__name__)
        self.args = args

    def __repr__(self) -> str:
        # Counterpart of DependentItem.__repr__: names the attribute that holds nested
        # operator defines and marks this operand kind in set_define_attr.
        return "depend_task_list"

    @classmethod
    def operator_name(cls) -> str:
        """Get operator name in different class."""
        return cls.__name__.upper()

    @property
    def relation(self) -> str:
        """Get operator name in different class, for function :func:`get_define`."""
        return self.operator_name()

    def set_define_attr(self) -> str:
        """Set attribute to function :func:`get_define`.

        It is a wrapper for both `And` and `Or` operator. The defines of all operands
        are collected into a list stored on the attribute named by the operands'
        ``repr`` (``depend_item_list`` or ``depend_task_list``).

        :return: name of the attribute that was set.
        :raises PyDSParamException: when no operand was given, when an operand is
            neither DependentItem nor DependentOperator, or when the two kinds are mixed.
        """
        if not self.args:
            # Without operands there is no attribute name to set; fail with a clear
            # parameter error rather than a TypeError from setattr(self, None, ...).
            raise PyDSParamException(
                f"Dependent {self.relation} operator parameter require at least one "
                "DependentItem or DependentOperator."
            )

        result = []
        attr = None
        for dependent in self.args:
            if not isinstance(dependent, (DependentItem, DependentOperator)):
                raise PyDSParamException(
                    f"Dependent {self.relation} operator parameter support DependentItem and "
                    f"DependentOperator but got {type(dependent)}."
                )
            if attr is None:
                attr = repr(dependent)
            elif repr(dependent) != attr:
                raise PyDSParamException(
                    f"Dependent {self.relation} operator parameter only support same type."
                )
            result.append(dependent.get_define())
        setattr(self, attr, result)
        return attr

    def get_define(self, camel_attr=True) -> Dict:
        """Overwrite Base.get_define to get task dependent specific get define."""
        attr = self.set_define_attr()
        dependent_define_attr = self._DEFINE_ATTR.union({attr})
        # NOTE(review): `camel_attr` is accepted but not forwarded; the call below
        # always asks for camel case, as do the nested get_define() calls.
        return super().get_define_custom(
            camel_attr=True, custom_attr=dependent_define_attr
        )


class And(DependentOperator):
    """Operator And for task dependent.

    It could accept both :class:`DependentItem` and children of :class:`DependentOperator`,
    and set AND condition to those args.
    """


class Or(DependentOperator):
    """Operator Or for task dependent.

    It could accept both :class:`DependentItem` and children of :class:`DependentOperator`,
    and set OR condition to those args.
    """


class Dependent(Task):
    """Task dependent object, declare behavior for dependent task to dolphinscheduler."""

    def __init__(self, name: str, dependence: DependentOperator, *args, **kwargs):
        super().__init__(name, TaskType.DEPENDENT, *args, **kwargs)
        self.dependence = dependence

    @property
    def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
        """Override Task.task_params for dependent task.

        Dependent task have some specials attribute `dependence`, and in most of the task
        this attribute is None and use empty dict `{}` as default value. We do not use class
        attribute `_task_custom_attr` due to avoid attribute cover.

        The two extra parameters are not used by this body; as a property it is
        always read without arguments.
        """
        params = super().task_params
        params["dependence"] = self.dependence.get_define()
        return params


# diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
# new file mode 100644
# index 0000000000..f16e291c82
# --- /dev/null
# +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
# @@ -0,0 +1,793 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task dependent.""" +import itertools +from typing import Dict, List, Optional, Tuple, Union +from unittest.mock import patch + +import pytest + +from pydolphinscheduler.exceptions import PyDSParamException +from pydolphinscheduler.tasks.dependent import ( + And, + Dependent, + DependentDate, + DependentItem, + DependentOperator, + Or, +) + +TEST_PROJECT = "test-project" +TEST_PROCESS_DEFINITION = "test-process-definition" +TEST_TASK = "test-task" +TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567 + +TEST_OPERATOR_LIST = ("AND", "OR") + + +@pytest.mark.parametrize( + "dep_date, dep_cycle", + [ + # hour + (DependentDate.CURRENT_HOUR, "hour"), + (DependentDate.LAST_ONE_HOUR, "hour"), + (DependentDate.LAST_TWO_HOURS, "hour"), + (DependentDate.LAST_THREE_HOURS, "hour"), + (DependentDate.LAST_TWENTY_FOUR_HOURS, "hour"), + # day + (DependentDate.TODAY, "day"), + (DependentDate.LAST_ONE_DAYS, "day"), + (DependentDate.LAST_TWO_DAYS, "day"), + (DependentDate.LAST_THREE_DAYS, "day"), + (DependentDate.LAST_SEVEN_DAYS, "day"), + # week + (DependentDate.THIS_WEEK, "week"), + (DependentDate.LAST_WEEK, "week"), + (DependentDate.LAST_MONDAY, "week"), + (DependentDate.LAST_TUESDAY, "week"), + (DependentDate.LAST_WEDNESDAY, "week"), + (DependentDate.LAST_THURSDAY, "week"), + (DependentDate.LAST_FRIDAY, "week"), + (DependentDate.LAST_SATURDAY, "week"), + (DependentDate.LAST_SUNDAY, "week"), + # month + (DependentDate.THIS_MONTH, "month"), + (DependentDate.LAST_MONTH, "month"), + (DependentDate.LAST_MONTH_BEGIN, "month"), + (DependentDate.LAST_MONTH_END, "month"), + ], +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +def 
test_dependent_item_get_define(mock_task_info, dep_date, dep_cycle): + """Test dependent.DependentItem get define. + + Here we have test some cases as below. + ```py + { + "projectCode": "project code", + "definitionCode": "definition code", + "depTaskCode": "dep task code", + "cycle": "day", + "dateValue": "today" + } + ``` + """ + attr = { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": dep_date, + } + expect = { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": dep_cycle, + "dateValue": dep_date, + } + task = DependentItem(**attr) + assert expect == task.get_define() + + +def test_dependent_item_date_error(): + """Test error when pass None to dependent_date.""" + with pytest.raises( + PyDSParamException, match="Parameter dependent_date must provider.*?" + ): + DependentItem( + project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + dependent_date=None, + ) + + +@pytest.mark.parametrize( + "task_name, result", + [ + ({"dependent_task_name": TEST_TASK}, TEST_TASK), + ({}, None), + ], +) +def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]): + """Test dependent item property code_parameter.""" + dependent_item = DependentItem( + project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + **task_name, + ) + expect = (TEST_PROJECT, TEST_PROCESS_DEFINITION, result) + assert dependent_item.code_parameter == expect + + +@pytest.mark.parametrize( + "arg_list", + [ + [1, 2], + [ + DependentItem( + project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + ), + 1, + ], + [ + And( + DependentItem( + project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + ) + ), + 1, + ], + [ + DependentItem( + project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + ), + And( + DependentItem( 
+ project_name=TEST_PROJECT, + process_definition_name=TEST_PROCESS_DEFINITION, + ) + ), + ], + ], +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +def test_dependent_operator_set_define_error(mock_code, arg_list): + """Test dependent operator function :func:`set_define` with not support type.""" + dep_op = DependentOperator(*arg_list) + with pytest.raises(PyDSParamException, match="Dependent .*? operator.*?"): + dep_op.set_define_attr() + + +@pytest.mark.parametrize( + # Test dependent operator, Test dependent item parameters, expect operator define + "operators, kwargs, expect", + [ + # Test dependent operator (And | Or) with single dependent item + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + ), + [ + { + "relation": op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + ], + } + for op in TEST_OPERATOR_LIST + ], + ), + # Test dependent operator (And | Or) with two dependent item + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_WEEK, + }, + ), + [ + { + "relation": op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": 
DependentDate.LAST_MONTH_END, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "week", + "dateValue": DependentDate.LAST_WEEK, + }, + ], + } + for op in TEST_OPERATOR_LIST + ], + ), + # Test dependent operator (And | Or) with multiply dependent item + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_WEEK, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_ONE_DAYS, + }, + ), + [ + { + "relation": op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "week", + "dateValue": DependentDate.LAST_WEEK, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "day", + "dateValue": DependentDate.LAST_ONE_DAYS, + }, + ], + } + for op in TEST_OPERATOR_LIST + ], + ), + ], +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +def test_operator_dependent_item( + mock_code_info, + operators: Tuple[DependentOperator], + kwargs: Tuple[dict], + expect: List[Dict], +): + """Test DependentOperator(DependentItem) function get_define. 
+ + Here we have test some cases as below, including single dependentItem and multiply dependentItem. + ```py + { + "relation": "AND", + "dependItemList": [ + { + "projectCode": "project code", + "definitionCode": "definition code", + "depTaskCode": "dep task code", + "cycle": "day", + "dateValue": "today" + }, + ... + ] + } + ``` + """ + for idx, operator in enumerate(operators): + # Use variable to keep one or more dependent item to test dependent operator behavior + dependent_item_list = [] + for kwarg in kwargs: + dependent_item = DependentItem(**kwarg) + dependent_item_list.append(dependent_item) + op = operator(*dependent_item_list) + assert expect[idx] == op.get_define() + + +@pytest.mark.parametrize( + # Test dependent operator, Test dependent item parameters, expect operator define + "operators, args, expect", + [ + # Test dependent operator (And | Or) with single dependent task list + ( + (And, Or), + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + ), + ), + [ + { + "relation": par_op, + "dependTaskList": [ + { + "relation": chr_op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + ], + } + ], + } + for (par_op, chr_op) in itertools.product( + TEST_OPERATOR_LIST, TEST_OPERATOR_LIST + ) + ], + ), + # Test dependent operator (And | Or) with two dependent task list + ( + (And, Or), + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_WEEK, + 
}, + ), + ), + [ + { + "relation": par_op, + "dependTaskList": [ + { + "relation": chr_op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "week", + "dateValue": DependentDate.LAST_WEEK, + }, + ], + } + ], + } + for (par_op, chr_op) in itertools.product( + TEST_OPERATOR_LIST, TEST_OPERATOR_LIST + ) + ], + ), + # Test dependent operator (And | Or) with multiply dependent task list + ( + (And, Or), + ( + (And, Or), + ( + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_WEEK, + }, + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_ONE_DAYS, + }, + ), + ), + [ + { + "relation": par_op, + "dependTaskList": [ + { + "relation": chr_op, + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "week", + "dateValue": DependentDate.LAST_WEEK, + }, + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "day", + "dateValue": DependentDate.LAST_ONE_DAYS, + }, + ], + } + ], + } + for (par_op, chr_op) in itertools.product( + TEST_OPERATOR_LIST, 
TEST_OPERATOR_LIST + ) + ], + ), + ], +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +def test_operator_dependent_task_list_multi_dependent_item( + mock_code_info, + operators: Tuple[DependentOperator], + args: Tuple[Union[Tuple, dict]], + expect: List[Dict], +): + """Test DependentOperator(DependentOperator(DependentItem)) single operator function get_define. + + Here we have test some cases as below. This test case only test single DependTaskList with one or + multiply dependItemList. + ```py + { + "relation": "OR", + "dependTaskList": [ + { + "relation": "AND", + "dependItemList": [ + { + "projectCode": "project code", + "definitionCode": "definition code", + "depTaskCode": "dep task code", + "cycle": "day", + "dateValue": "today" + }, + ... + ] + }, + ] + } + ``` + """ + # variable expect_idx record idx should be use to get specific expect + expect_idx = 0 + + for op_idx, operator in enumerate(operators): + dependent_operator = args[0] + dependent_item_kwargs = args[1] + + for dop_idx, dpt_op in enumerate(dependent_operator): + dependent_item_list = [] + for dpt_kwargs in dependent_item_kwargs: + dpti = DependentItem(**dpt_kwargs) + dependent_item_list.append(dpti) + child_dep_op = dpt_op(*dependent_item_list) + op = operator(child_dep_op) + assert expect[expect_idx] == op.get_define() + expect_idx += 1 + + +def get_dep_task_list(*operator): + """Return dependent task list from given operators list.""" + result = [] + for op in operator: + result.append( + { + "relation": op.operator_name(), + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "month", + "dateValue": DependentDate.LAST_MONTH_END, + }, + ], + } + ) + return result + + +@pytest.mark.parametrize( + # Test dependent 
operator, Test dependent item parameters, expect operator define + "operators, args, expect", + [ + # Test dependent operator (And | Or) with two dependent task list + ( + (And, Or), + ( + ((And, And), (And, Or), (Or, And), (Or, Or)), + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + ), + [ + { + "relation": parent_op.operator_name(), + "dependTaskList": get_dep_task_list(*child_ops), + } + for parent_op in (And, Or) + for child_ops in ((And, And), (And, Or), (Or, And), (Or, Or)) + ], + ), + # Test dependent operator (And | Or) with multiple dependent task list + ( + (And, Or), + ( + ((And, And, And), (And, And, And, And), (And, And, And, And, And)), + { + "project_name": TEST_PROJECT, + "process_definition_name": TEST_PROCESS_DEFINITION, + "dependent_task_name": TEST_TASK, + "dependent_date": DependentDate.LAST_MONTH_END, + }, + ), + [ + { + "relation": parent_op.operator_name(), + "dependTaskList": get_dep_task_list(*child_ops), + } + for parent_op in (And, Or) + for child_ops in ( + (And, And, And), + (And, And, And, And), + (And, And, And, And, And), + ) + ], + ), + ], +) +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +def test_operator_dependent_task_list_multi_dependent_list( + mock_code_info, + operators: Tuple[DependentOperator], + args: Tuple[Union[Tuple, dict]], + expect: List[Dict], +): + """Test DependentOperator(DependentOperator(DependentItem)) multiply operator function get_define. + + Here we have test some cases as below. This test case only test single DependTaskList with one or + multiply dependTaskList. 
+ ```py + { + "relation": "OR", + "dependTaskList": [ + { + "relation": "AND", + "dependItemList": [ + { + "projectCode": "project code", + "definitionCode": "definition code", + "depTaskCode": "dep task code", + "cycle": "day", + "dateValue": "today" + } + ] + }, + ... + ] + } + ``` + """ + # variable expect_idx record idx should be use to get specific expect + expect_idx = 0 + for op_idx, operator in enumerate(operators): + dependent_operator = args[0] + dependent_item_kwargs = args[1] + + for dop_idx, dpt_ops in enumerate(dependent_operator): + dependent_task_list = [ + dpt_op(DependentItem(**dependent_item_kwargs)) for dpt_op in dpt_ops + ] + op = operator(*dependent_task_list) + assert ( + expect[expect_idx] == op.get_define() + ), f"Failed with operator syntax {operator}.{dpt_ops}" + expect_idx += 1 + + +@patch( + "pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway", + return_value={ + "projectCode": TEST_PROJECT_CODE, + "processDefinitionCode": TEST_DEFINITION_CODE, + "taskDefinitionCode": TEST_TASK_CODE, + }, +) +@patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(123, 1), +) +def test_dependent_get_define(mock_code_version, mock_dep_code): + """Test task dependent function get_define.""" + project_name = "test-dep-project" + process_definition_name = "test-dep-definition" + dependent_task_name = "test-dep-task" + dep_operator = And( + Or( + # test dependence with add tasks + DependentItem( + project_name=project_name, + process_definition_name=process_definition_name, + ) + ), + And( + # test dependence with specific task + DependentItem( + project_name=project_name, + process_definition_name=process_definition_name, + dependent_task_name=dependent_task_name, + ) + ), + ) + + name = "test_dependent_get_define" + expect = { + "code": 123, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "DEPENDENT", + "taskParams": { + "resourceList": [], + "localParams": [], + 
"dependence": { + "relation": "AND", + "dependTaskList": [ + { + "relation": "OR", + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": "0", + "cycle": "day", + "dateValue": "today", + } + ], + }, + { + "relation": "AND", + "dependItemList": [ + { + "projectCode": TEST_PROJECT_CODE, + "definitionCode": TEST_DEFINITION_CODE, + "depTaskCode": TEST_TASK_CODE, + "cycle": "day", + "dateValue": "today", + } + ], + }, + ], + }, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + + task = Dependent(name, dependence=dep_operator) + assert task.get_define() == expect diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java index e7bbab8e40..bc508520b4 100644 --- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java +++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java @@ -62,11 +62,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.context.annotation.ComponentScan; import py4j.GatewayServer; +@SpringBootApplication @ComponentScan(value = "org.apache.dolphinscheduler") public class PythonGatewayServer extends SpringBootServletInitializer { private static final Logger LOGGER = 
LoggerFactory.getLogger(PythonGatewayServer.class); @@ -428,6 +430,41 @@ public class PythonGatewayServer extends SpringBootServletInitializer { return result; } + /** + * Get project, process definition, task code. + * Useful in Python API create dependent task which need processDefinition information. + * + * @param projectName project name which process definition belongs to + * @param processDefinitionName process definition name + * @param taskName task name + */ + public Map getDependentInfo(String projectName, String processDefinitionName, String taskName) { + Map result = new HashMap<>(); + + Project project = projectMapper.queryByName(projectName); + if (project == null) { + String msg = String.format("Can not find valid project by name %s", projectName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + long projectCode = project.getCode(); + result.put("projectCode", projectCode); + + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + if (processDefinition == null) { + String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + logger.error(msg); + throw new IllegalArgumentException(msg); + } + result.put("processDefinitionCode", processDefinition.getCode()); + + if (taskName != null) { + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName); + result.put("taskDefinitionCode", taskDefinition.getCode()); + } + return result; + } + @PostConstruct public void run() { GatewayServer server = new GatewayServer(this);