Browse Source

[python] Add task dependent (#7405)

fix: #6926
3.0.0/version-upgrade
Jiajie Zhong 3 years ago committed by GitHub
parent
commit
6f93ebf3ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 73
      dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py
  2. 1
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
  3. 277
      dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py
  4. 793
      dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py
  5. 37
      dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

73
dolphinscheduler-python/pydolphinscheduler/examples/task_dependent_example.py

@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
r"""
A example workflow for task dependent.
This example will create two workflows named `task_dependent` and `task_dependent_external`.
`task_dependent` is true workflow define and run task dependent, while `task_dependent_external`
define outside workflow and task from dependent.
After this script submit, we would get workflow as below:
task_dependent_external:
task_1
task_2
task_3
task_dependent:
task_dependent(this task dependent on task_dependent_external.task_1 and task_dependent_external.task_2).
"""
from constants import ProcessDefinitionDefault
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.dependent import And, Dependent, DependentItem, Or
from pydolphinscheduler.tasks.shell import Shell
with ProcessDefinition(
name="task_dependent_external",
tenant="tenant_exists",
) as pd:
task_1 = Shell(name="task_1", command="echo task 1")
task_2 = Shell(name="task_2", command="echo task 2")
task_3 = Shell(name="task_3", command="echo task 3")
pd.submit()
with ProcessDefinition(
name="task_dependent",
tenant="tenant_exists",
) as pd:
task = Dependent(
name="task_dependent",
dependence=And(
Or(
DependentItem(
project_name=ProcessDefinitionDefault.PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_1",
),
DependentItem(
project_name=ProcessDefinitionDefault.PROJECT,
process_definition_name="task_dependent_external",
dependent_task_name="task_2",
),
)
),
)
pd.submit()

1
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

@ -73,6 +73,7 @@ class TaskType(str):
SQL = "SQL"
SUB_PROCESS = "SUB_PROCESS"
PROCEDURE = "PROCEDURE"
DEPENDENT = "DEPENDENT"
class DefaultTaskCodeNum(str):

277
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/dependent.py

@ -0,0 +1,277 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Task dependent."""
from typing import Dict, Optional, Tuple
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.base import Base
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.exceptions import PyDSJavaGatewayException, PyDSParamException
from pydolphinscheduler.java_gateway import launch_gateway
DEPENDENT_ALL_TASK_IN_WORKFLOW = "0"
class DependentDate(str):
"""Constant of Dependent date value.
These values set according to Java server side, if you want to add and change it,
please change Java server side first.
"""
# TODO Maybe we should add parent level to DependentDate for easy to use, such as
# DependentDate.MONTH.THIS_MONTH
# Hour
CURRENT_HOUR = "currentHour"
LAST_ONE_HOUR = "last1Hour"
LAST_TWO_HOURS = "last2Hours"
LAST_THREE_HOURS = "last3Hours"
LAST_TWENTY_FOUR_HOURS = "last24Hours"
# Day
TODAY = "today"
LAST_ONE_DAYS = "last1Days"
LAST_TWO_DAYS = "last2Days"
LAST_THREE_DAYS = "last3Days"
LAST_SEVEN_DAYS = "last7Days"
# Week
THIS_WEEK = "thisWeek"
LAST_WEEK = "lastWeek"
LAST_MONDAY = "lastMonday"
LAST_TUESDAY = "lastTuesday"
LAST_WEDNESDAY = "lastWednesday"
LAST_THURSDAY = "lastThursday"
LAST_FRIDAY = "lastFriday"
LAST_SATURDAY = "lastSaturday"
LAST_SUNDAY = "lastSunday"
# Month
THIS_MONTH = "thisMonth"
LAST_MONTH = "lastMonth"
LAST_MONTH_BEGIN = "lastMonthBegin"
LAST_MONTH_END = "lastMonthEnd"
class DependentItem(Base):
"""Dependent item object, minimal unit for task dependent.
It declare which project, process_definition, task are dependent to this task.
"""
_DEFINE_ATTR = {
"project_code",
"definition_code",
"dep_task_code",
"cycle",
"date_value",
}
# TODO maybe we should conside overwrite operator `and` and `or` for DependentItem to
# support more easy way to set relation
def __init__(
self,
project_name: str,
process_definition_name: str,
dependent_task_name: Optional[str] = DEPENDENT_ALL_TASK_IN_WORKFLOW,
dependent_date: Optional[DependentDate] = DependentDate.TODAY,
):
obj_name = f"{project_name}.{process_definition_name}.{dependent_task_name}.{dependent_date}"
super().__init__(obj_name)
self.project_name = project_name
self.process_definition_name = process_definition_name
self.dependent_task_name = dependent_task_name
if dependent_date is None:
raise PyDSParamException(
"Parameter dependent_date must provider by got None."
)
else:
self.dependent_date = dependent_date
self._code = {}
def __repr__(self) -> str:
return "depend_item_list"
@property
def project_code(self) -> str:
"""Get dependent project code."""
return self.get_code_from_gateway().get("projectCode")
@property
def definition_code(self) -> str:
"""Get dependent definition code."""
return self.get_code_from_gateway().get("processDefinitionCode")
@property
def dep_task_code(self) -> str:
"""Get dependent tasks code list."""
if self.is_all_task:
return DEPENDENT_ALL_TASK_IN_WORKFLOW
else:
return self.get_code_from_gateway().get("taskDefinitionCode")
# TODO Maybe we should get cycle from dependent date class.
@property
def cycle(self) -> str:
"""Get dependent cycle."""
if "Hour" in self.dependent_date:
return "hour"
elif self.dependent_date == "today" or "Days" in self.dependent_date:
return "day"
elif "Month" in self.dependent_date:
return "month"
else:
return "week"
@property
def date_value(self) -> str:
"""Get dependent date."""
return self.dependent_date
@property
def is_all_task(self) -> bool:
"""Check whether dependent all tasks or not."""
return self.dependent_task_name == DEPENDENT_ALL_TASK_IN_WORKFLOW
@property
def code_parameter(self) -> Tuple:
"""Get name info parameter to query code."""
param = (
self.project_name,
self.process_definition_name,
self.dependent_task_name if not self.is_all_task else None,
)
return param
def get_code_from_gateway(self) -> Dict:
"""Get project, definition, task code from given parameter."""
if self._code:
return self._code
else:
gateway = launch_gateway()
try:
self._code = gateway.entry_point.getDependentInfo(*self.code_parameter)
return self._code
except Exception:
raise PyDSJavaGatewayException("Function get_code_from_gateway error.")
class DependentOperator(Base):
"""Set DependentItem or dependItemList with specific operator."""
_DEFINE_ATTR = {
"relation",
}
DEPENDENT_ITEM = "DependentItem"
DEPENDENT_OPERATOR = "DependentOperator"
def __init__(self, *args):
super().__init__(self.__class__.__name__)
self.args = args
def __repr__(self) -> str:
return "depend_task_list"
@classmethod
def operator_name(cls) -> str:
"""Get operator name in different class."""
return cls.__name__.upper()
@property
def relation(self) -> str:
"""Get operator name in different class, for function :func:`get_define`."""
return self.operator_name()
def set_define_attr(self) -> str:
"""Set attribute to function :func:`get_define`.
It is a wrapper for both `And` and `Or` operator.
"""
result = []
attr = None
for dependent in self.args:
if isinstance(dependent, (DependentItem, DependentOperator)):
if attr is None:
attr = repr(dependent)
elif repr(dependent) != attr:
raise PyDSParamException(
"Dependent %s operator parameter only support same type.",
self.relation,
)
else:
raise PyDSParamException(
"Dependent %s operator parameter support DependentItem and "
"DependentOperator but got %s.",
(self.relation, type(dependent)),
)
result.append(dependent.get_define())
setattr(self, attr, result)
return attr
def get_define(self, camel_attr=True) -> Dict:
"""Overwrite Base.get_define to get task dependent specific get define."""
attr = self.set_define_attr()
dependent_define_attr = self._DEFINE_ATTR.union({attr})
return super().get_define_custom(
camel_attr=True, custom_attr=dependent_define_attr
)
class And(DependentOperator):
"""Operator And for task dependent.
It could accept both :class:`DependentItem` and children of :class:`DependentOperator`,
and set AND condition to those args.
"""
def __init__(self, *args):
super().__init__(*args)
class Or(DependentOperator):
"""Operator Or for task dependent.
It could accept both :class:`DependentItem` and children of :class:`DependentOperator`,
and set OR condition to those args.
"""
def __init__(self, *args):
super().__init__(*args)
class Dependent(Task):
"""Task dependent object, declare behavior for dependent task to dolphinscheduler."""
def __init__(self, name: str, dependence: DependentOperator, *args, **kwargs):
super().__init__(name, TaskType.DEPENDENT, *args, **kwargs)
self.dependence = dependence
@property
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
"""Override Task.task_params for dependent task.
Dependent task have some specials attribute `dependence`, and in most of the task
this attribute is None and use empty dict `{}` as default value. We do not use class
attribute `_task_custom_attr` due to avoid attribute cover.
"""
params = super().task_params
params["dependence"] = self.dependence.get_define()
return params

793
dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_dependent.py

@ -0,0 +1,793 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test Task dependent."""
import itertools
from typing import Dict, List, Optional, Tuple, Union
from unittest.mock import patch
import pytest
from pydolphinscheduler.exceptions import PyDSParamException
from pydolphinscheduler.tasks.dependent import (
And,
Dependent,
DependentDate,
DependentItem,
DependentOperator,
Or,
)
TEST_PROJECT = "test-project"
TEST_PROCESS_DEFINITION = "test-process-definition"
TEST_TASK = "test-task"
TEST_PROJECT_CODE, TEST_DEFINITION_CODE, TEST_TASK_CODE = 12345, 123456, 1234567
TEST_OPERATOR_LIST = ("AND", "OR")
@pytest.mark.parametrize(
"dep_date, dep_cycle",
[
# hour
(DependentDate.CURRENT_HOUR, "hour"),
(DependentDate.LAST_ONE_HOUR, "hour"),
(DependentDate.LAST_TWO_HOURS, "hour"),
(DependentDate.LAST_THREE_HOURS, "hour"),
(DependentDate.LAST_TWENTY_FOUR_HOURS, "hour"),
# day
(DependentDate.TODAY, "day"),
(DependentDate.LAST_ONE_DAYS, "day"),
(DependentDate.LAST_TWO_DAYS, "day"),
(DependentDate.LAST_THREE_DAYS, "day"),
(DependentDate.LAST_SEVEN_DAYS, "day"),
# week
(DependentDate.THIS_WEEK, "week"),
(DependentDate.LAST_WEEK, "week"),
(DependentDate.LAST_MONDAY, "week"),
(DependentDate.LAST_TUESDAY, "week"),
(DependentDate.LAST_WEDNESDAY, "week"),
(DependentDate.LAST_THURSDAY, "week"),
(DependentDate.LAST_FRIDAY, "week"),
(DependentDate.LAST_SATURDAY, "week"),
(DependentDate.LAST_SUNDAY, "week"),
# month
(DependentDate.THIS_MONTH, "month"),
(DependentDate.LAST_MONTH, "month"),
(DependentDate.LAST_MONTH_BEGIN, "month"),
(DependentDate.LAST_MONTH_END, "month"),
],
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
def test_dependent_item_get_define(mock_task_info, dep_date, dep_cycle):
"""Test dependent.DependentItem get define.
Here we have test some cases as below.
```py
{
"projectCode": "project code",
"definitionCode": "definition code",
"depTaskCode": "dep task code",
"cycle": "day",
"dateValue": "today"
}
```
"""
attr = {
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": dep_date,
}
expect = {
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": dep_cycle,
"dateValue": dep_date,
}
task = DependentItem(**attr)
assert expect == task.get_define()
def test_dependent_item_date_error():
"""Test error when pass None to dependent_date."""
with pytest.raises(
PyDSParamException, match="Parameter dependent_date must provider.*?"
):
DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
dependent_date=None,
)
@pytest.mark.parametrize(
"task_name, result",
[
({"dependent_task_name": TEST_TASK}, TEST_TASK),
({}, None),
],
)
def test_dependent_item_code_parameter(task_name: dict, result: Optional[str]):
"""Test dependent item property code_parameter."""
dependent_item = DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
**task_name,
)
expect = (TEST_PROJECT, TEST_PROCESS_DEFINITION, result)
assert dependent_item.code_parameter == expect
@pytest.mark.parametrize(
"arg_list",
[
[1, 2],
[
DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
),
1,
],
[
And(
DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
)
),
1,
],
[
DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
),
And(
DependentItem(
project_name=TEST_PROJECT,
process_definition_name=TEST_PROCESS_DEFINITION,
)
),
],
],
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
def test_dependent_operator_set_define_error(mock_code, arg_list):
"""Test dependent operator function :func:`set_define` with not support type."""
dep_op = DependentOperator(*arg_list)
with pytest.raises(PyDSParamException, match="Dependent .*? operator.*?"):
dep_op.set_define_attr()
@pytest.mark.parametrize(
# Test dependent operator, Test dependent item parameters, expect operator define
"operators, kwargs, expect",
[
# Test dependent operator (And | Or) with single dependent item
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
),
[
{
"relation": op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
],
}
for op in TEST_OPERATOR_LIST
],
),
# Test dependent operator (And | Or) with two dependent item
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_WEEK,
},
),
[
{
"relation": op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "week",
"dateValue": DependentDate.LAST_WEEK,
},
],
}
for op in TEST_OPERATOR_LIST
],
),
# Test dependent operator (And | Or) with multiply dependent item
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_WEEK,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_ONE_DAYS,
},
),
[
{
"relation": op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "week",
"dateValue": DependentDate.LAST_WEEK,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "day",
"dateValue": DependentDate.LAST_ONE_DAYS,
},
],
}
for op in TEST_OPERATOR_LIST
],
),
],
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
def test_operator_dependent_item(
mock_code_info,
operators: Tuple[DependentOperator],
kwargs: Tuple[dict],
expect: List[Dict],
):
"""Test DependentOperator(DependentItem) function get_define.
Here we have test some cases as below, including single dependentItem and multiply dependentItem.
```py
{
"relation": "AND",
"dependItemList": [
{
"projectCode": "project code",
"definitionCode": "definition code",
"depTaskCode": "dep task code",
"cycle": "day",
"dateValue": "today"
},
...
]
}
```
"""
for idx, operator in enumerate(operators):
# Use variable to keep one or more dependent item to test dependent operator behavior
dependent_item_list = []
for kwarg in kwargs:
dependent_item = DependentItem(**kwarg)
dependent_item_list.append(dependent_item)
op = operator(*dependent_item_list)
assert expect[idx] == op.get_define()
@pytest.mark.parametrize(
# Test dependent operator, Test dependent item parameters, expect operator define
"operators, args, expect",
[
# Test dependent operator (And | Or) with single dependent task list
(
(And, Or),
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
),
),
[
{
"relation": par_op,
"dependTaskList": [
{
"relation": chr_op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
],
}
],
}
for (par_op, chr_op) in itertools.product(
TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
)
],
),
# Test dependent operator (And | Or) with two dependent task list
(
(And, Or),
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_WEEK,
},
),
),
[
{
"relation": par_op,
"dependTaskList": [
{
"relation": chr_op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "week",
"dateValue": DependentDate.LAST_WEEK,
},
],
}
],
}
for (par_op, chr_op) in itertools.product(
TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
)
],
),
# Test dependent operator (And | Or) with multiply dependent task list
(
(And, Or),
(
(And, Or),
(
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_WEEK,
},
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_ONE_DAYS,
},
),
),
[
{
"relation": par_op,
"dependTaskList": [
{
"relation": chr_op,
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "week",
"dateValue": DependentDate.LAST_WEEK,
},
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "day",
"dateValue": DependentDate.LAST_ONE_DAYS,
},
],
}
],
}
for (par_op, chr_op) in itertools.product(
TEST_OPERATOR_LIST, TEST_OPERATOR_LIST
)
],
),
],
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
def test_operator_dependent_task_list_multi_dependent_item(
mock_code_info,
operators: Tuple[DependentOperator],
args: Tuple[Union[Tuple, dict]],
expect: List[Dict],
):
"""Test DependentOperator(DependentOperator(DependentItem)) single operator function get_define.
Here we have test some cases as below. This test case only test single DependTaskList with one or
multiply dependItemList.
```py
{
"relation": "OR",
"dependTaskList": [
{
"relation": "AND",
"dependItemList": [
{
"projectCode": "project code",
"definitionCode": "definition code",
"depTaskCode": "dep task code",
"cycle": "day",
"dateValue": "today"
},
...
]
},
]
}
```
"""
# variable expect_idx record idx should be use to get specific expect
expect_idx = 0
for op_idx, operator in enumerate(operators):
dependent_operator = args[0]
dependent_item_kwargs = args[1]
for dop_idx, dpt_op in enumerate(dependent_operator):
dependent_item_list = []
for dpt_kwargs in dependent_item_kwargs:
dpti = DependentItem(**dpt_kwargs)
dependent_item_list.append(dpti)
child_dep_op = dpt_op(*dependent_item_list)
op = operator(child_dep_op)
assert expect[expect_idx] == op.get_define()
expect_idx += 1
def get_dep_task_list(*operator):
"""Return dependent task list from given operators list."""
result = []
for op in operator:
result.append(
{
"relation": op.operator_name(),
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "month",
"dateValue": DependentDate.LAST_MONTH_END,
},
],
}
)
return result
@pytest.mark.parametrize(
# Test dependent operator, Test dependent item parameters, expect operator define
"operators, args, expect",
[
# Test dependent operator (And | Or) with two dependent task list
(
(And, Or),
(
((And, And), (And, Or), (Or, And), (Or, Or)),
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
),
[
{
"relation": parent_op.operator_name(),
"dependTaskList": get_dep_task_list(*child_ops),
}
for parent_op in (And, Or)
for child_ops in ((And, And), (And, Or), (Or, And), (Or, Or))
],
),
# Test dependent operator (And | Or) with multiple dependent task list
(
(And, Or),
(
((And, And, And), (And, And, And, And), (And, And, And, And, And)),
{
"project_name": TEST_PROJECT,
"process_definition_name": TEST_PROCESS_DEFINITION,
"dependent_task_name": TEST_TASK,
"dependent_date": DependentDate.LAST_MONTH_END,
},
),
[
{
"relation": parent_op.operator_name(),
"dependTaskList": get_dep_task_list(*child_ops),
}
for parent_op in (And, Or)
for child_ops in (
(And, And, And),
(And, And, And, And),
(And, And, And, And, And),
)
],
),
],
)
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
def test_operator_dependent_task_list_multi_dependent_list(
mock_code_info,
operators: Tuple[DependentOperator],
args: Tuple[Union[Tuple, dict]],
expect: List[Dict],
):
"""Test DependentOperator(DependentOperator(DependentItem)) multiply operator function get_define.
Here we have test some cases as below. This test case only test single DependTaskList with one or
multiply dependTaskList.
```py
{
"relation": "OR",
"dependTaskList": [
{
"relation": "AND",
"dependItemList": [
{
"projectCode": "project code",
"definitionCode": "definition code",
"depTaskCode": "dep task code",
"cycle": "day",
"dateValue": "today"
}
]
},
...
]
}
```
"""
# variable expect_idx record idx should be use to get specific expect
expect_idx = 0
for op_idx, operator in enumerate(operators):
dependent_operator = args[0]
dependent_item_kwargs = args[1]
for dop_idx, dpt_ops in enumerate(dependent_operator):
dependent_task_list = [
dpt_op(DependentItem(**dependent_item_kwargs)) for dpt_op in dpt_ops
]
op = operator(*dependent_task_list)
assert (
expect[expect_idx] == op.get_define()
), f"Failed with operator syntax {operator}.{dpt_ops}"
expect_idx += 1
@patch(
"pydolphinscheduler.tasks.dependent.DependentItem.get_code_from_gateway",
return_value={
"projectCode": TEST_PROJECT_CODE,
"processDefinitionCode": TEST_DEFINITION_CODE,
"taskDefinitionCode": TEST_TASK_CODE,
},
)
@patch(
"pydolphinscheduler.core.task.Task.gen_code_and_version",
return_value=(123, 1),
)
def test_dependent_get_define(mock_code_version, mock_dep_code):
"""Test task dependent function get_define."""
project_name = "test-dep-project"
process_definition_name = "test-dep-definition"
dependent_task_name = "test-dep-task"
dep_operator = And(
Or(
# test dependence with add tasks
DependentItem(
project_name=project_name,
process_definition_name=process_definition_name,
)
),
And(
# test dependence with specific task
DependentItem(
project_name=project_name,
process_definition_name=process_definition_name,
dependent_task_name=dependent_task_name,
)
),
)
name = "test_dependent_get_define"
expect = {
"code": 123,
"name": name,
"version": 1,
"description": None,
"delayTime": 0,
"taskType": "DEPENDENT",
"taskParams": {
"resourceList": [],
"localParams": [],
"dependence": {
"relation": "AND",
"dependTaskList": [
{
"relation": "OR",
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": "0",
"cycle": "day",
"dateValue": "today",
}
],
},
{
"relation": "AND",
"dependItemList": [
{
"projectCode": TEST_PROJECT_CODE,
"definitionCode": TEST_DEFINITION_CODE,
"depTaskCode": TEST_TASK_CODE,
"cycle": "day",
"dateValue": "today",
}
],
},
],
},
"conditionResult": {"successNode": [""], "failedNode": [""]},
"waitStartTimeout": {},
},
"flag": "YES",
"taskPriority": "MEDIUM",
"workerGroup": "default",
"failRetryTimes": 0,
"failRetryInterval": 1,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": None,
"timeout": 0,
}
task = Dependent(name, dependence=dep_operator)
assert task.get_define() == expect

37
dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

@ -62,11 +62,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import py4j.GatewayServer;
@SpringBootApplication
@ComponentScan(value = "org.apache.dolphinscheduler")
public class PythonGatewayServer extends SpringBootServletInitializer {
private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
@ -428,6 +430,41 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
return result;
}
/**
* Get project, process definition, task code.
* Useful in Python API create dependent task which need processDefinition information.
*
* @param projectName project name which process definition belongs to
* @param processDefinitionName process definition name
* @param taskName task name
*/
public Map<String, Object> getDependentInfo(String projectName, String processDefinitionName, String taskName) {
Map<String, Object> result = new HashMap<>();
Project project = projectMapper.queryByName(projectName);
if (project == null) {
String msg = String.format("Can not find valid project by name %s", projectName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
long projectCode = project.getCode();
result.put("projectCode", projectCode);
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
if (processDefinition == null) {
String msg = String.format("Can not find valid process definition by name %s", processDefinitionName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
result.put("processDefinitionCode", processDefinition.getCode());
if (taskName != null) {
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(projectCode, taskName);
result.put("taskDefinitionCode", taskDefinition.getCode());
}
return result;
}
@PostConstruct
public void run() {
GatewayServer server = new GatewayServer(this);

Loading…
Cancel
Save