diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py new file mode 100644 index 0000000000..1ef259e57a --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/task_flink_example.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+"""An example workflow for task flink."""
+
+from pydolphinscheduler.core.process_definition import ProcessDefinition
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, ProgramType
+
+with ProcessDefinition(name="task_flink_example", tenant="tenant_exists") as pd:
+    task = Flink(
+        name="task_flink",
+        main_class="org.apache.flink.streaming.examples.wordcount.WordCount",
+        main_package="WordCount.jar",
+        program_type=ProgramType.JAVA,
+        deploy_mode=DeployMode.LOCAL,
+    )
+    pd.run()
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
index 84becbcb4c..4619c91068 100644
--- a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py
@@ -77,6 +77,7 @@ class TaskType(str):
     DEPENDENT = "DEPENDENT"
     CONDITIONS = "CONDITIONS"
     SWITCH = "SWITCH"
+    FLINK = "FLINK"
 
 
 class DefaultTaskCodeNum(str):
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
new file mode 100644
index 0000000000..f732a1540c
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/flink.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Task Flink."""
+
+from typing import Dict, Optional
+
+from pydolphinscheduler.constants import TaskType
+from pydolphinscheduler.core.task import Task
+from pydolphinscheduler.java_gateway import launch_gateway
+
+
+class ProgramType(str):
+    """Type of program Flink runs; currently only `JAVA`, `SCALA` and `PYTHON`."""
+
+    JAVA = "JAVA"
+    SCALA = "SCALA"
+    PYTHON = "PYTHON"
+
+
+class FlinkVersion(str):
+    """Flink version; currently only `LOW_VERSION` and `HIGHT_VERSION`."""
+
+    LOW_VERSION = "<1.10"
+    HIGHT_VERSION = ">=1.10"
+
+
+class DeployMode(str):
+    """Flink deploy mode; currently only `LOCAL` and `CLUSTER`."""
+
+    LOCAL = "local"
+    CLUSTER = "cluster"
+
+
+class Flink(Task):
+    """Task Flink object, declaring the behavior of a Flink task for dolphinscheduler."""
+
+    _task_custom_attr = {
+        "main_class",
+        "main_jar",  # provided by the `main_jar` property, not assigned in __init__
+        "deploy_mode",
+        "flink_version",
+        "slot",
+        "task_manager",
+        "job_manager_memory",
+        "task_manager_memory",
+        "app_name",
+        "program_type",
+        "parallelism",
+        "main_args",
+        "others",
+    }
+
+    def __init__(
+        self,
+        name: str,
+        main_class: str,
+        main_package: str,
+        program_type: Optional[ProgramType] = ProgramType.SCALA,
+        deploy_mode: Optional[DeployMode] = DeployMode.CLUSTER,
+        flink_version: Optional[FlinkVersion] = FlinkVersion.LOW_VERSION,
+        app_name: Optional[str] = None,
+        job_manager_memory: Optional[str] = "1G",
+        task_manager_memory: Optional[str] = "2G",
+        slot: Optional[int] = 1,
+        task_manager: Optional[int] = 2,
+        parallelism: Optional[int] = 1,
+        main_args: Optional[str] = None,
+        others: Optional[str] = None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(name, TaskType.FLINK, *args, **kwargs)
+        self.main_class = main_class
+        self.main_package = main_package  # looked up via the gateway by `main_jar`
+        self.program_type = program_type
+        self.deploy_mode = deploy_mode
+        self.flink_version = flink_version
+        self.app_name = app_name
+        self.job_manager_memory = job_manager_memory
+        self.task_manager_memory = task_manager_memory
+        self.slot = slot
+        self.task_manager = task_manager
+        self.parallelism = parallelism
+        self.main_args = main_args
+        self.others = others
+        self._resource = {}  # cache, filled on first call to get_resource_info
+
+    @property
+    def main_jar(self) -> Dict:
+        """Return a dict holding the resource id of the main package."""
+        resource_info = self.get_resource_info(self.program_type, self.main_package)
+        return {"id": resource_info.get("id")}
+
+    def get_resource_info(self, program_type, main_package) -> Dict:
+        """Get resource info from the java gateway; the first non-empty result is cached and reused."""
+        if not self._resource:
+            self._resource = launch_gateway().entry_point.getResourcesFileInfo(
+                program_type,
+                main_package,
+            )
+
+        return self._resource
diff --git a/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
new file mode 100644
index 0000000000..743bdae3fa
--- /dev/null
+++ b/dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_flink.py
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Test Task Flink."""
+
+from unittest.mock import patch
+
+from pydolphinscheduler.tasks.flink import DeployMode, Flink, FlinkVersion, ProgramType
+
+
+@patch(
+    "pydolphinscheduler.tasks.flink.Flink.get_resource_info",
+    return_value=({"id": 1, "name": "test"}),
+)
+def test_flink_get_define(mock_resource):
+    """Test that Flink.get_define returns the expected definition with the gateway mocked."""
+    code = 123  # returned by the patched gen_code_and_version below
+    version = 1  # returned by the patched gen_code_and_version below
+    name = "test_flink_get_define"
+    main_class = "org.apache.flink.test_main_class"
+    main_package = "test_main_package"
+    program_type = ProgramType.JAVA
+    deploy_mode = DeployMode.LOCAL
+
+    expect = {
+        "code": code,
+        "name": name,
+        "version": 1,
+        "description": None,
+        "delayTime": 0,
+        "taskType": "FLINK",
+        "taskParams": {
+            "mainClass": main_class,
+            "mainJar": {
+                "id": 1,  # id from the patched get_resource_info return value
+            },
+            "programType": program_type,
+            "deployMode": deploy_mode,
+            "flinkVersion": FlinkVersion.LOW_VERSION,
+            "slot": 1,
+            "parallelism": 1,
+            "taskManager": 2,
+            "jobManagerMemory": "1G",
+            "taskManagerMemory": "2G",
+            "appName": None,
+            "mainArgs": None,
+            "others": None,
+            "localParams": [],
+            "resourceList": [],
+            "dependence": {},
+            "conditionResult": {"successNode": [""], "failedNode": [""]},
+            "waitStartTimeout": {},
+        },
+        "flag": "YES",
+        "taskPriority": "MEDIUM",
+        "workerGroup": "default",
+        "failRetryTimes": 0,
+        "failRetryInterval": 1,
+        "timeoutFlag": "CLOSE",
+        "timeoutNotifyStrategy": None,
+        "timeout": 0,
+    }
+    with patch(
+        "pydolphinscheduler.core.task.Task.gen_code_and_version",
+        return_value=(code, version),
+    ):
+        task = Flink(name, main_class, main_package, program_type, deploy_mode)
+        assert task.get_define() == expect
diff --git a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
index bc508520b4..ef6deb45da 100644
--- a/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
+++ b/dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java
@@ -17,11 +17,13 @@
 
 package org.apache.dolphinscheduler.server;
 
+import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent;
 import org.apache.dolphinscheduler.api.enums.Status;
 import org.apache.dolphinscheduler.api.service.ExecutorService;
 import org.apache.dolphinscheduler.api.service.ProcessDefinitionService;
 import org.apache.dolphinscheduler.api.service.ProjectService;
 import org.apache.dolphinscheduler.api.service.QueueService;
+import org.apache.dolphinscheduler.api.service.ResourcesService;
 import org.apache.dolphinscheduler.api.service.SchedulerService;
 import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
 import org.apache.dolphinscheduler.api.service.TenantService;
@@ -31,6 +33,7 @@ import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.FailureStrategy;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
+import org.apache.dolphinscheduler.common.enums.ProgramType;
 import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.enums.RunMode;
 import org.apache.dolphinscheduler.common.enums.TaskDependType;
@@ -50,11 +53,13 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
 import 
org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import javax.annotation.PostConstruct;
 
@@ -68,10 +73,12 @@ import org.springframework.context.annotation.ComponentScan;
 
 import py4j.GatewayServer;
 
+import org.apache.commons.collections.CollectionUtils;
+
 @SpringBootApplication
 @ComponentScan(value = "org.apache.dolphinscheduler")
 public class PythonGatewayServer extends SpringBootServletInitializer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(PythonGatewayServer.class);
+    private static final Logger logger = LoggerFactory.getLogger(PythonGatewayServer.class);
 
     private static final WarningType DEFAULT_WARNING_TYPE = WarningType.NONE;
     private static final int DEFAULT_WARNING_GROUP_ID = 0;
@@ -107,6 +114,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
     @Autowired
     private QueueService queueService;
 
+    @Autowired
+    private ResourcesService resourceService;
+
     @Autowired
     private ProjectMapper projectMapper;
 
@@ -244,7 +254,7 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
             processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName);
         } else if (verifyStatus != Status.SUCCESS) {
             String msg = "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST.";
-            LOGGER.error(msg);
+            logger.error(msg);
             throw new RuntimeException(msg);
         }
 
@@ -465,6 +475,33 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
         return result;
     }
 
+    /**
+     * Get a resource by the given program type and full name. It returns a map containing the resource id and name.
+     * Useful for the Python API when creating a flink task, which needs the resource information.
+     *
+     * @param programType program type, one of SCALA, JAVA and PYTHON
+     * @param fullName full name of the resource
+     * @return map with keys "id" and "name" taken from the first resource whose full name equals fullName
+     * @throws IllegalArgumentException if no resource matches, or programType is not a ProgramType constant name
+     */
+    public Map<String, Object> getResourcesFileInfo(String programType, String fullName) {
+        Map<String, Object> result = new HashMap<>();
+
+        Map<String, Object> resources = resourceService.queryResourceByProgramType(dummyAdminUser, ResourceType.FILE, ProgramType.valueOf(programType));
+        // NOTE(review): assumes the service result always carries Constants.DATA_LIST; a missing entry would NPE on stream() - confirm
+        List<ResourceComponent> resourcesComponent = (List<ResourceComponent>) resources.get(Constants.DATA_LIST);
+        List<ResourceComponent> namedResources = resourcesComponent.stream().filter(s -> fullName.equals(s.getFullName())).collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(namedResources)) {
+            String msg = String.format("Can not find valid resource by program type %s and name %s", programType, fullName);
+            logger.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        result.put("id", namedResources.get(0).getId());
+        result.put("name", namedResources.get(0).getName());
+        return result;
+    }
+
     @PostConstruct
     public void run() {
         GatewayServer server = new GatewayServer(this);