From fcbb5f4d8f337236de1aab17a2d1f00b51c2d535 Mon Sep 17 00:00:00 2001
From: Jiajie Zhong
Date: Wed, 5 Jan 2022 19:57:28 +0800
Subject: [PATCH] [python] Enhance task datax example (#7801)

* [python] Enhance task datax example
* Add full example for `CustomDataX.json`
* Add comment about datasources needing to exist.

close: #7800

* Add missing parameter setting
---
 .../examples/task_datax_example.py            | 50 +++++++++++++++++--
 1 file changed, 47 insertions(+), 3 deletions(-)

diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
index 183292178f..cdc7f0f860 100644
--- a/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
+++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_datax_example.py
@@ -29,7 +29,48 @@ from pydolphinscheduler.core.process_definition import ProcessDefinition
 from pydolphinscheduler.tasks.datax import CustomDataX, DataX
 
 # datax json template
-JSON_TEMPLATE = ""
+JSON_TEMPLATE = {
+    "job": {
+        "content": [
+            {
+                "reader": {
+                    "name": "mysqlreader",
+                    "parameter": {
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name", "code", "description"],
+                        "splitPk": "id",
+                        "connection": [
+                            {
+                                "table": ["source_table"],
+                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/source_db"],
+                            }
+                        ],
+                    },
+                },
+                "writer": {
+                    "name": "mysqlwriter",
+                    "parameter": {
+                        "writeMode": "insert",
+                        "username": "usr",
+                        "password": "pwd",
+                        "column": ["id", "name"],
+                        "connection": [
+                            {
+                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/target_db",
+                                "table": ["target_table"],
+                            }
+                        ],
+                    },
+                },
+            }
+        ],
+        "setting": {
+            "errorLimit": {"percentage": 0, "record": 0},
+            "speed": {"channel": 1, "record": 1000},
+        },
+    }
+}
 
 with ProcessDefinition(
     name="task_datax_example",
@@ -37,6 +78,8 @@ with ProcessDefinition(
 ) as pd:
     # This task synchronizes the data in `t_ds_project`
     # of `first_mysql` database to `target_project` of `second_mysql` database.
+    # You have to make sure data sources named `first_mysql` and `second_mysql`
+    # exist in your environment.
     task1 = DataX(
         name="task_datax",
         datasource_name="first_mysql",
@@ -45,6 +88,7 @@ with ProcessDefinition(
         target_table="target_table",
     )
 
-    # you can custom json_template of datax to sync data.
-    task2 = CustomDataX(name="task_custom_datax", json=JSON_TEMPLATE)
+    # You can customize the json_template of DataX to sync data. This task creates
+    # a DataX job like task1, transferring records from `first_mysql` to `second_mysql`.
+    task2 = CustomDataX(name="task_custom_datax", json=str(JSON_TEMPLATE))
     pd.run()
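
A side note on the `str(JSON_TEMPLATE)` call this patch introduces: calling
`str()` on a dict yields Python's repr, with single quotes, which is not strict
JSON. DataX's parser may tolerate that, but `json.dumps` produces
standards-compliant output. A minimal sketch using only the standard library
(the reduced template below is a stand-in for the full JSON_TEMPLATE above,
not part of the patch):

    import json

    # Reduced stand-in for the JSON_TEMPLATE dict defined in the patch above.
    JSON_TEMPLATE = {"job": {"setting": {"speed": {"channel": 1, "record": 1000}}}}

    # str() renders the dict with single quotes -- the Python repr, not strict JSON.
    print(str(JSON_TEMPLATE))
    # {'job': {'setting': {'speed': {'channel': 1, 'record': 1000}}}}

    # json.dumps() renders standards-compliant JSON with double quotes.
    print(json.dumps(JSON_TEMPLATE))
    # {"job": {"setting": {"speed": {"channel": 1, "record": 1000}}}}

If strict JSON is wanted, passing `json=json.dumps(JSON_TEMPLATE)` to
CustomDataX would be a drop-in replacement for the `str(...)` call, under the
assumption that CustomDataX only needs the template as a plain string.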