diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py index 8b66b7996a..1415206368 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py @@ -22,11 +22,11 @@ This example will create five task in single workflow, with four shell task and condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow like: -pre_task_success_1 -> - \ -pre_task_success_2 -> --> conditions -> end - / -pre_task_fail -> +pre_task_1 -> -> success_branch + \ / +pre_task_2 -> -> conditions -> + / \ +pre_task_3 -> -> fail_branch . """ @@ -34,23 +34,24 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.tasks.condition import FAILURE, SUCCESS, And, Conditions from pydolphinscheduler.tasks.shell import Shell -with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd: - condition_pre_task_1 = Shell( - name="pre_task_success_1", command="echo pre_task_success_1" - ) - condition_pre_task_2 = Shell( - name="pre_task_success_2", command="echo pre_task_success_2" - ) - condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail") +with ProcessDefinition(name="task_conditions_example_1", tenant="tenant_exists") as pd: + pre_task_1 = Shell(name="pre_task_1", command="echo pre_task_1") + pre_task_2 = Shell(name="pre_task_2", command="echo pre_task_2") + pre_task_3 = Shell(name="pre_task_3", command="echo pre_task_3") cond_operator = And( And( - SUCCESS(condition_pre_task_1, condition_pre_task_2), - FAILURE(condition_pre_task_3), + SUCCESS(pre_task_1, pre_task_2), + FAILURE(pre_task_3), ), ) - end = Shell(name="end", command="echo parent") + success_branch = Shell(name="success_branch", command="success_branch parent") + fail_branch = Shell(name="fail_branch", command="echo fail_branch") - condition = Conditions(name="conditions", condition=cond_operator) - condition >> end + condition = Conditions( + name="conditions", + condition=cond_operator, + success_task=success_branch, + failed_task=fail_branch, + ) pd.submit() diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py index c9ca80cde8..9b4254ac48 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py @@ -29,10 +29,61 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.tasks.datax import CustomDataX, DataX # datax json template -JSON_TEMPLATE = "" +JSON_TEMPLATE = { + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "usr", + "password": "pwd", + "column": [ + "id", + "name", + "code", + "description" + ], + "splitPk": "id", + "connection": [ + { + "table": [ + "source_table" + ], + "jdbcUrl": [ + "jdbc:mysql://127.0.0.1:3306/source_db" + ] + } + ] + } + }, + "writer": { + "name": "mysqlwriter", + "parameter": { + "writeMode": "insert", + "username": "usr", + "password": "pwd", + "column": [ + "id", + "name" + ], + "connection": [ + { + "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db", + "table": [ + "target_table" + ] + } + ] + } + } + } + ] + } +} with ProcessDefinition( - name="task_datax", + name="task_datax_1", tenant="tenant_exists", ) as pd: # This task synchronizes the data in `t_ds_project` @@ -45,6 +96,7 @@ with ProcessDefinition( target_table="target_table", ) - # you can custom json_template of datax to sync data. - task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE) + # you can custom json_template of datax to sync data. This task create job + # same as task1 do + task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE)) pd.run() diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py index 5ab2aa50ad..b47b8e3bfd 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_switch_example.py @@ -34,7 +34,7 @@ from pydolphinscheduler.tasks.shell import Shell from pydolphinscheduler.tasks.switch import Branch, Default, Switch, SwitchCondition with ProcessDefinition( - name="task_dependent_external", + name="task_switch_example", tenant="tenant_exists", ) as pd: parent = Shell(name="parent", command="echo parent") diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py index d58bd753b0..52a8e97067 100644 --- a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py +++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py @@ -34,18 +34,14 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition from pydolphinscheduler.tasks.shell import Shell with ProcessDefinition( - name="tutorial", - schedule="0 0 0 * * ? *", - start_time="2021-01-01", + name="aklsfkkalsfjkol", tenant="tenant_exists", ) as pd: - task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") task_child_one = Shell(name="task_child_one", command="echo 'child one'") task_child_two = Shell(name="task_child_two", command="echo 'child two'") task_union = Shell(name="task_union", command="echo union") task_group = [task_child_one, task_child_two] - task_parent.set_downstream(task_group) task_union << task_group diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py index 8a90efcd70..89fda5757d 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py @@ -26,6 +26,7 @@ from pydolphinscheduler.constants import ( TaskFlag, TaskPriority, TaskTimeoutFlag, + TaskType, ) from pydolphinscheduler.core.base import Base from pydolphinscheduler.core.process_definition import ( @@ -156,7 +157,8 @@ class Task(Base): self.resource_list = resource_list or [] self.dependence = dependence or {} self.wait_start_timeout = wait_start_timeout or {} - self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT + if task_type != TaskType.CONDITIONS: + self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT @property def process_definition(self) -> Optional[ProcessDefinition]: diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py index 905a41baa7..a5773b26f9 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py @@ -157,9 +157,19 @@ class Or(ConditionOperator): class Conditions(Task): """Task condition object, declare behavior for condition task to dolphinscheduler.""" - def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs): + def __init__( + self, + name: str, + condition: ConditionOperator, + success_task: Task, + failed_task: Task, + *args, + **kwargs, + ): super().__init__(name, TaskType.CONDITIONS, *args, **kwargs) self.condition = condition + self.success_task = success_task + self.failed_task = failed_task # Set condition tasks as current task downstream self._set_dep() @@ -171,6 +181,15 @@ class Conditions(Task): for status in cond.args: upstream.extend(list(status.tasks)) self.set_upstream(upstream) + self.set_downstream([self.success_task, self.failed_task]) + + @property + def condition_result(self) -> Dict: + """Get condition result define for java gateway.""" + return { + "successNode": [self.success_task.code], + "failedNode": [self.failed_task.code], + } @property def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict: @@ -182,4 +201,5 @@ class Conditions(Task): """ params = super().task_params params["dependence"] = self.condition.get_define() + params["conditionResult"] = self.condition_result return params diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py index 9933c4f093..6085de23b0 100644 --- a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py @@ -324,7 +324,7 @@ def test_condition_operator_set_define_attr_mix_operator( "pydolphinscheduler.tasks.condition.Conditions.gen_code_and_version", return_value=(123, 1), ) -def test_dependent_get_define(mock_condition_code_version, mock_task_code_version): +def test_condition_get_define(mock_condition_code_version, mock_task_code_version): """Test task condition :func:`get_define`.""" common_task = Task(name="common_task", task_type="test_task_condition") cond_operator = And( @@ -372,7 +372,10 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio }, ], }, - "conditionResult": {"successNode": [""], "failedNode": [""]}, + "conditionResult": { + "successNode": [common_task.code], + "failedNode": [common_task.code], + }, "waitStartTimeout": {}, }, "flag": "YES", @@ -385,7 +388,9 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio "timeout": 0, } - task = Conditions(name, condition=cond_operator) + task = Conditions( + name, condition=cond_operator, success_task=common_task, failed_task=common_task + ) assert task.get_define() == expect @@ -396,49 +401,60 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio def test_condition_set_dep_workflow(mock_task_code_version): """Test task condition set dependence in workflow level.""" with ProcessDefinition(name="test-condition-set-dep-workflow") as pd: - condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE) - condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE) - condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE) + pre_task_1 = Task(name="pre_task_1", task_type=TEST_TYPE) + pre_task_2 = Task(name="pre_task_2", task_type=TEST_TYPE) + pre_task_3 = Task(name="pre_task_3", task_type=TEST_TYPE) cond_operator = And( And( - SUCCESS(condition_pre_task_1, condition_pre_task_2), - FAILURE(condition_pre_task_3), + SUCCESS(pre_task_1, pre_task_2), + FAILURE(pre_task_3), ), ) - end = Task(name="end", task_type=TEST_TYPE) - condition = Conditions(name="conditions", condition=cond_operator) - condition >> end + success_branch = Task(name="success_branch", task_type=TEST_TYPE) + fail_branch = Task(name="fail_branch", task_type=TEST_TYPE) + + condition = Conditions( + name="conditions", + condition=cond_operator, + success_task=success_branch, + failed_task=fail_branch, + ) # General tasks test - assert len(pd.tasks) == 5 + assert len(pd.tasks) == 6 assert sorted(pd.task_list, key=lambda t: t.name) == sorted( [ + pre_task_1, + pre_task_2, + pre_task_3, + success_branch, + fail_branch, condition, - condition_pre_task_1, - condition_pre_task_2, - condition_pre_task_3, - end, ], key=lambda t: t.name, ) # Task dep test - assert end._upstream_task_codes == {condition.code} - assert condition._downstream_task_codes == {end.code} + assert success_branch._upstream_task_codes == {condition.code} + assert fail_branch._upstream_task_codes == {condition.code} + assert condition._downstream_task_codes == { + success_branch.code, + fail_branch.code, + } # Condition task dep after ProcessDefinition function get_define called assert condition._upstream_task_codes == { - condition_pre_task_1.code, - condition_pre_task_2.code, - condition_pre_task_3.code, + pre_task_1.code, + pre_task_2.code, + pre_task_3.code, } assert all( [ child._downstream_task_codes == {condition.code} for child in [ - condition_pre_task_1, - condition_pre_task_2, - condition_pre_task_3, + pre_task_1, + pre_task_2, + pre_task_3, ] ] )