diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py new file mode 100644 index 0000000000..75e09a9aea --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_map_reduce_example.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +"""An example workflow for task mr.""" + +from pydolphinscheduler.core.engine import ProgramType +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.map_reduce import MR + +with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd: + task = MR( + name="task_mr", + main_class="wordcount", + main_package="hadoop-mapreduce-examples-3.3.1.jar", + program_type=ProgramType.JAVA, + main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds", + ) + pd.run() diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py index 7f63a82194..7bd71b985b 100644 --- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -79,6 +79,7 @@ class TaskType(str): SWITCH = "SWITCH" FLINK = "FLINK" SPARK = "SPARK" + MR = "MR" class DefaultTaskCodeNum(str): diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py new file mode 100644 index 0000000000..5050bd3cf1 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/map_reduce.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Task MR.""" + +from typing import Optional + +from pydolphinscheduler.constants import TaskType +from pydolphinscheduler.core.engine import Engine, ProgramType + + +class MR(Engine): + """Task MR object, declare behavior for MapReduce task to dolphinscheduler.""" + + _task_custom_attr = { + "app_name", + "main_args", + "others", + } + + def __init__( + self, + name: str, + main_class: str, + main_package: str, + program_type: Optional[ProgramType] = ProgramType.SCALA, + app_name: Optional[str] = None, + main_args: Optional[str] = None, + others: Optional[str] = None, + *args, + **kwargs + ): + super().__init__( + name, TaskType.MR, main_class, main_package, program_type, *args, **kwargs + ) + self.app_name = app_name + self.main_args = main_args + self.others = others diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py new file mode 100644 index 0000000000..dbe9e513f5 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_map_reduce.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Test Task MR.""" + +from unittest.mock import patch + +from pydolphinscheduler.tasks.map_reduce import MR, ProgramType + + +@patch( + "pydolphinscheduler.core.engine.Engine.get_resource_info", + return_value=({"id": 1, "name": "test"}), +) +def test_mr_get_define(mock_resource): + """Test that MR.get_define returns the expected task definition.""" + code = 123 + version = 1 + name = "test_mr_get_define" + main_class = "org.apache.mr.test_main_class" + main_package = "test_main_package" + program_type = ProgramType.JAVA + main_args = "/dolphinscheduler/resources/file.txt /output/ds" + + expect = { + "code": code, + "name": name, + "version": 1, + "description": None, + "delayTime": 0, + "taskType": "MR", + "taskParams": { + "mainClass": main_class, + "mainJar": { + "id": 1, + }, + "programType": program_type, + "appName": None, + "mainArgs": main_args, + "others": None, + "localParams": [], + "resourceList": [], + "dependence": {}, + "conditionResult": {"successNode": [""], "failedNode": [""]}, + "waitStartTimeout": {}, + }, + "flag": "YES", + "taskPriority": "MEDIUM", + "workerGroup": "default", + "failRetryTimes": 0, + "failRetryInterval": 1, + "timeoutFlag": "CLOSE", + "timeoutNotifyStrategy": None, + "timeout": 0, + } + with patch( + "pydolphinscheduler.core.task.Task.gen_code_and_version", + return_value=(code, version), + ): + task = MR(name, main_class, main_package, program_type, main_args=main_args) + assert task.get_define() == expect