From dd6ed36f65d2add3bf8e31cad24ab25f4606c9d9 Mon Sep 17 00:00:00 2001 From: Jiajie Zhong Date: Sun, 31 Oct 2021 20:35:46 +0800 Subject: [PATCH] Add Python API implementation of workflows-as-code (#6269) * Init DS python SDK pydolphinscheduler: python code definition * Doc first * Add quick start and developer doc * Java documentation change * Add LICENSE-py4j.txt * Add py4j to release-docs/LICENSE * Move dependency version to parent pom * Remove outdated code * Add tenant parameter to tutorial --- .gitignore | 5 + .../api/service/ProjectService.java | 9 + .../api/service/QueueService.java | 8 + .../api/service/TenantService.java | 16 + .../api/service/impl/ProjectServiceImpl.java | 15 + .../api/service/impl/QueueServiceImpl.java | 26 ++ .../api/service/impl/TenantServiceImpl.java | 19 +- .../dolphinscheduler/common/Constants.java | 1 + .../dao/mapper/QueueMapper.java | 7 + .../dao/mapper/QueueMapper.xml | 9 + dolphinscheduler-dist/release-docs/LICENSE | 2 +- .../release-docs/licenses/LICENSE-py4j.txt | 26 ++ dolphinscheduler-python/pom.xml | 60 ++++ .../pydolphinscheduler/README.md | 103 ++++++ .../pydolphinscheduler/ROADMAP.md | 34 ++ .../pydolphinscheduler/examples/tutorial.py | 43 +++ .../pydolphinscheduler/requirements.txt | 18 + .../pydolphinscheduler/requirements_dev.txt | 24 ++ .../pydolphinscheduler/setup.cfg | 16 + .../pydolphinscheduler/setup.py | 90 +++++ .../pydolphinscheduler/src/__init__.py | 16 + .../src/pydolphinscheduler/__init__.py | 16 + .../src/pydolphinscheduler/constants.py | 74 +++++ .../src/pydolphinscheduler/core/__init__.py | 16 + .../src/pydolphinscheduler/core/base.py | 72 ++++ .../src/pydolphinscheduler/core/base_side.py | 43 +++ .../core/process_definition.py | 249 ++++++++++++++ .../src/pydolphinscheduler/core/task.py | 237 +++++++++++++ .../src/pydolphinscheduler/java_gateway.py | 43 +++ .../src/pydolphinscheduler/side/__init__.py | 20 ++ .../src/pydolphinscheduler/side/project.py | 45 +++ .../src/pydolphinscheduler/side/queue.py | 
44 +++ .../src/pydolphinscheduler/side/tenant.py | 45 +++ .../src/pydolphinscheduler/side/user.py | 68 ++++ .../pydolphinscheduler/side/worker_group.py | 35 ++ .../src/pydolphinscheduler/tasks/__init__.py | 16 + .../src/pydolphinscheduler/tasks/shell.py | 34 ++ .../src/pydolphinscheduler/utils/__init__.py | 16 + .../src/pydolphinscheduler/utils/string.py | 30 ++ .../pydolphinscheduler/test/__init__.py | 16 + .../pydolphinscheduler/test/core/__init__.py | 16 + .../test/core/test_process_definition.py | 118 +++++++ .../pydolphinscheduler/test/core/test_task.py | 96 ++++++ .../test/example/__init__.py | 16 + .../pydolphinscheduler/test/tasks/__init__.py | 16 + .../test/tasks/test_shell.py | 61 ++++ .../test/test_java_gateway.py | 46 +++ .../server/PythonGatewayServer.java | 310 ++++++++++++++++++ dolphinscheduler-standalone-server/pom.xml | 4 + .../server/StandaloneServer.java | 3 +- pom.xml | 13 + tools/dependencies/known-dependencies.txt | 1 + 52 files changed, 2363 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt create mode 100644 dolphinscheduler-python/pom.xml create mode 100644 dolphinscheduler-python/pydolphinscheduler/README.md create mode 100644 dolphinscheduler-python/pydolphinscheduler/ROADMAP.md create mode 100644 dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/requirements.txt create mode 100644 dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt create mode 100644 dolphinscheduler-python/pydolphinscheduler/setup.cfg create mode 100644 dolphinscheduler-python/pydolphinscheduler/setup.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py create mode 100644 
dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base_side.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/java_gateway.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/queue.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/tenant.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/user.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/worker_group.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/utils/string.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/core/test_task.py create mode 100644 
dolphinscheduler-python/pydolphinscheduler/test/example/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/tasks/__init__.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py create mode 100644 dolphinscheduler-python/pydolphinscheduler/test/test_java_gateway.py create mode 100644 dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java diff --git a/.gitignore b/.gitignore index 9011db1479..3dccfdf899 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,8 @@ dolphinscheduler-server/src/main/resources/logback.xml dolphinscheduler-ui/dist dolphinscheduler-ui/node docker/build/apache-dolphinscheduler* + +# pydolphinscheduler +__pycache__/ +build/ +*egg-info/ diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java index dffa866ac8..df05dee106 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProjectService.java @@ -46,6 +46,15 @@ public interface ProjectService { */ Map queryByCode(User loginUser, long projectCode); + /** + * query project details by name + * + * @param loginUser login user + * @param projectName project name + * @return project detail information + */ + Map queryByName(User loginUser, String projectName); + /** * check project and authorization * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java index 7013520422..f978b96b6b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java +++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/QueueService.java @@ -76,4 +76,12 @@ public interface QueueService { */ Result verifyQueue(String queue, String queueName); + /** + * query queue by queueName + * + * @param queueName queue name + * @return queue object for provide queue name + */ + Map queryQueueName(String queueName); + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java index 30d98a130a..47a4082a30 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TenantService.java @@ -92,4 +92,20 @@ public interface TenantService { * @return true if tenant code can user, otherwise return false */ Result verifyTenantCode(String tenantCode); + + /** + * check if provide tenant code object exists + * + * @param tenantCode tenant code + * @return true if tenant code exists, false if not + */ + boolean checkTenantExists(String tenantCode); + + /** + * query tenant by tenant code + * + * @param tenantCode tenant code + * @return tenant list + */ + Map queryByTenantCode(String tenantCode); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java index 7d49cfaae0..9fc5aab84d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProjectServiceImpl.java @@ -139,6 +139,21 @@ public class ProjectServiceImpl extends BaseServiceImpl implements ProjectServic return result; } + @Override + public Map queryByName(User loginUser, String projectName) { + 
Map result = new HashMap<>(); + Project project = projectMapper.queryByName(projectName); + boolean hasProjectAndPerm = hasProjectAndPerm(loginUser, project, result); + if (!hasProjectAndPerm) { + return result; + } + if (project != null) { + result.put(Constants.DATA_LIST, project); + putMsg(result, Status.SUCCESS); + } + return result; + } + /** * check project and authorization * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java index edd0e1d819..e29883307e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java @@ -262,6 +262,32 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { return result; } + /** + * query queue by queueName + * + * @param queueName queue name + * @return queue object for provide queue name + */ + @Override + public Map queryQueueName(String queueName) { + Map result = new HashMap<>(); + + if (StringUtils.isEmpty(queueName)) { + putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.QUEUE_NAME); + return result; + } + + if (!checkQueueNameExist(queueName)) { + putMsg(result, Status.QUEUE_NOT_EXIST, queueName); + return result; + } + + List queueList = queueMapper.queryQueueName(queueName); + result.put(Constants.DATA_LIST, queueList); + putMsg(result, Status.SUCCESS); + return result; + } + /** * check queue exist * if exists return true,not exists return false diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index fc01bd7d20..58ab8656f9 100644 --- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -320,8 +320,25 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService * @param tenantCode tenant code * @return ture if the tenant code exists, otherwise return false */ - private boolean checkTenantExists(String tenantCode) { + public boolean checkTenantExists(String tenantCode) { Boolean existTenant = tenantMapper.existTenant(tenantCode); return existTenant == Boolean.TRUE; } + + /** + * query tenant by tenant code + * + * @param tenantCode tenant code + * @return tenant detail information + */ + @Override + public Map queryByTenantCode(String tenantCode) { + Map result = new HashMap<>(); + Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); + if (tenant != null) { + result.put(Constants.DATA_LIST, tenant); + putMsg(result, Status.SUCCESS); + } + return result; + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index e662347a90..fc9e36fd9f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -415,6 +415,7 @@ public final class Constants { public static final String NULL = "NULL"; public static final String THREAD_NAME_MASTER_SERVER = "Master-Server"; public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server"; + public static final String THREAD_NAME_GATEWAY_SERVER = "Gateway-Server"; /** * command parameter keys diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java index 
027bfd2b52..e48607007d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/QueueMapper.java @@ -53,4 +53,11 @@ public interface QueueMapper extends BaseMapper { * @return true if exist else return null */ Boolean existQueue(@Param("queue") String queue, @Param("queueName") String queueName); + + /** + * query queue by queue name + * @param queueName queueName + * @return queue list + */ + List queryQueueName(@Param("queueName") String queueName); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml index 0d75c6e364..cf381e494a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/QueueMapper.xml @@ -54,4 +54,13 @@ and queue_name =#{queueName} + diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index 63ab3f63cc..937216f9b6 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -436,7 +436,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt. 
threetenbp 1.3.6: https://mvnrepository.com/artifact/org.threeten/threetenbp/1.3.6, BSD 3-clause xmlenc 0.52: https://mvnrepository.com/artifact/xmlenc/xmlenc/0.52, BSD hamcrest-core 1.3: https://mvnrepository.com/artifact/org.hamcrest/hamcrest-core/1.3, BSD 2-Clause - + py4j 0.10.9: https://mvnrepository.com/artifact/net.sf.py4j/py4j/0.10.9, BSD 2-clause ======================================================================== CDDL licenses diff --git a/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt new file mode 100644 index 0000000000..2c7adb6928 --- /dev/null +++ b/dolphinscheduler-dist/release-docs/licenses/LICENSE-py4j.txt @@ -0,0 +1,26 @@ +Copyright (c) 2009-2018, Barthelemy Dagenais and individual contributors. All +rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +- Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +- Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +- The name of the author may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/dolphinscheduler-python/pom.xml b/dolphinscheduler-python/pom.xml new file mode 100644 index 0000000000..ce7968cd1f --- /dev/null +++ b/dolphinscheduler-python/pom.xml @@ -0,0 +1,60 @@ + + + + +4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler + 2.0.0-SNAPSHOT + +dolphinscheduler-python +${project.artifactId} +jar + + + + + org.apache.dolphinscheduler + dolphinscheduler-api + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-tomcat + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + + net.sf.py4j + py4j + + + + diff --git a/dolphinscheduler-python/pydolphinscheduler/README.md b/dolphinscheduler-python/pydolphinscheduler/README.md new file mode 100644 index 0000000000..a987483360 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/README.md @@ -0,0 +1,103 @@ + + +# pydolphinscheduler + +pydolphinscheduler is python API for Apache DolphinScheduler, which allow you definition +your workflow by python code, aka workflow-as-codes. 
+ +## Quick Start + +> **_Notice:_** For now, because pydolphinscheduler has not been released as a binary tarball or to [PyPI][pypi], you +> have to clone the Apache DolphinScheduler code from GitHub to follow this quick start. + +Here we show you how to install and run a simple example of pydolphinscheduler. + +### Prepare + +```shell +# Clone code from github +git clone git@github.com:apache/dolphinscheduler.git + +# Install pydolphinscheduler from source +cd dolphinscheduler-python/pydolphinscheduler +python setup.py install +``` + +### Start Server And Run Example + +Before you run an example, you have to start the backend server. You can follow the [development setup][dev-setup] +section "DolphinScheduler Standalone Quick Start" to set up a developer environment. You have to start both the backend +and frontend servers in this step, which means that you can view the DolphinScheduler UI in your browser at the URL +http://localhost:12345/dolphinscheduler + +After the backend server has started, all requests from `pydolphinscheduler` are sent to the backend server. +Now we can run a simple example with: + +```shell +cd dolphinscheduler-python/pydolphinscheduler +python examples/tutorial.py +``` + +> **_NOTICE:_** Since a tenant is required by Apache DolphinScheduler when running the command, you might need to change +> the tenant value in `examples/tutorial.py`. For now the value is `tenant_exists`; please change it to a username that exists +> in your environment. + +After the command executes, you can see a new project with a single process definition named *tutorial* in the [UI][ui-project]. + +At this point we have finished the quick start by running an example of pydolphinscheduler. If you want to inspect or join +pydolphinscheduler development, take a look at [develop](#develop). + +## Develop + +pydolphinscheduler is the Python API for Apache DolphinScheduler; it only defines what a workflow looks like instead of +storing or executing it. We use [py4j][py4j] to dynamically access the Java Virtual Machine.
+ +### Setup Develop Environment + +We already cloned the code in [quick start](#quick-start), so the next step is to open the pydolphinscheduler project +in your editor. We recommend you use [pycharm][pycharm] instead of [IntelliJ IDEA][idea] to open it. You can +just open the directory `dolphinscheduler-python/pydolphinscheduler` instead of `dolphinscheduler-python`. + +### Brief Concept + +Apache DolphinScheduler is designed to define workflows through the UI, while pydolphinscheduler tries to define them by code. When +defining by code, users usually do not care whether the user, tenant, or queue exists or not. All users care about is creating +a new workflow from the code they wrote. So we have some **side objects** in the `pydolphinscheduler/side` +directory; they only check whether an object exists, and create it if it does not. + +#### Process Definition + +The pydolphinscheduler workflow object. Process definition is the same name as the Java object (it may be changed to +a simpler word later). + +#### Tasks + +The pydolphinscheduler task objects. We use tasks to define the exact job we want DolphinScheduler to do for us. For now, +we only support the `shell` task to execute shell commands. [This link][all-task] lists all tasks supported in DolphinScheduler, +which will be implemented in the future.
+ + +[pypi]: https://pypi.org/ +[dev-setup]: https://dolphinscheduler.apache.org/en-us/development/development-environment-setup.html +[ui-project]: http://localhost:12345/dolphinscheduler/ui/#/projects/list +[py4j]: https://www.py4j.org/index.html +[pycharm]: https://www.jetbrains.com/pycharm +[idea]: https://www.jetbrains.com/idea/ +[all-task]: https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/guide/task/shell.html diff --git a/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md new file mode 100644 index 0000000000..32ad5e2b39 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/ROADMAP.md @@ -0,0 +1,34 @@ + + +## Roadmap + +### v0.0.3 + +Add other features, tasks, parameters in DS, keep code coverage up to 90% + +### v0.0.2 + +Add docs about how to use and develop package, code coverage up to 90%, add CI/CD +for package + +### v0.0.1(current) + +Set up a POC, for defining DAG with python code, running DAG manually, +releasing to pypi \ No newline at end of file diff --git a/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py new file mode 100644 index 0000000000..775653448b --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +r""" +After tutorial.py file submit to Apache DolphinScheduler server a DAG would be create, +and workflow DAG graph as below: + + --> task_child_one + / \ +task_parent --> --> task_union + \ / + --> task_child_two +""" + +from pydolphinscheduler.core.process_definition import ProcessDefinition +from pydolphinscheduler.tasks.shell import Shell + +with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd: + task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler") + task_child_one = Shell(name="task_child_one", command="echo 'child one'") + task_child_two = Shell(name="task_child_two", command="echo 'child two'") + task_union = Shell(name="task_union", command="echo union") + + task_group = [task_child_one, task_child_two] + task_parent.set_downstream(task_group) + + task_union << task_group + + pd.run() diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements.txt b/dolphinscheduler-python/pydolphinscheduler/requirements.txt new file mode 100644 index 0000000000..cdec3cabb2 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/requirements.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +py4j~=0.10.9.2 \ No newline at end of file diff --git a/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt new file mode 100644 index 0000000000..be98ce9658 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# testing +pytest~=6.2.5 +# code linting and formatting +flake8-black~=0.2.3 +# flake8 +# flake8-docstrings +# flake8-black diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.cfg b/dolphinscheduler-python/pydolphinscheduler/setup.cfg new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/setup.cfg @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/dolphinscheduler-python/pydolphinscheduler/setup.py b/dolphinscheduler-python/pydolphinscheduler/setup.py new file mode 100644 index 0000000000..8e9cea44e9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/setup.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +import sys +from os.path import dirname, join + +from setuptools import find_packages, setup + +version = '0.0.1.dev0' + +if sys.version_info[0] < 3: + raise Exception("pydolphinscheduler does not support Python 2. Please upgrade to Python 3.") + + +def read(*names, **kwargs): + return open( + join(dirname(__file__), *names), encoding=kwargs.get("encoding", "utf8") + ).read() + + +setup( + name="pydolphinscheduler", + version=version, + license="Apache License 2.0", + description="Apache DolphinScheduler python SDK", + long_description=read("README.md"), + # Make sure pypi is expecting markdown + long_description_content_type="text/markdown", + author="Apache Software Foundation", + author_email="dev@dolphinscheduler.apache.org", + url="https://dolphinscheduler.apache.org/", + python_requires=">=3.6", + keywords=[ + "dolphinscheduler", + "workflow", + "scheduler", + "taskflow", + ], + project_urls={ + "Homepage": "https://dolphinscheduler.apache.org", + "Documentation": "https://dolphinscheduler.apache.org/en-us/docs/latest/user_doc/quick-start.html", + "Source": "https://github.com/apache/dolphinscheduler", + "Issue Tracker": "https://github.com/apache/dolphinscheduler/issues", + "Discussion": "https://github.com/apache/dolphinscheduler/discussions", + "Twitter": "https://twitter.com/dolphinschedule", + }, + packages=find_packages(where="src"), + package_dir={"": "src"}, + include_package_data=True, + classifiers=[ + # complete classifier list: http://pypi.python.org/pypi?%3Aaction=list_classifiers + "Development Status :: 1 - Planning", + 
"Environment :: Console", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Operating System :: Unix", + "Operating System :: POSIX", + "Operating System :: Microsoft :: Windows", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", + "Topic :: Software Development :: User Interfaces", + ], + install_requires=[ + # Core + "py4j~=0.10", + # Dev + "pytest~=6.2", + ] +) diff --git a/dolphinscheduler-python/pydolphinscheduler/src/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py new file mode 100644 index 0000000000..eda07aa3e6 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
class ProcessDefinitionReleaseState:
    """String constants for the ``release_state`` of a process definition."""

    ONLINE: str = "ONLINE"
    OFFLINE: str = "OFFLINE"


class ProcessDefinitionDefault:
    """Fallback values used as parameter defaults across the SDK."""

    PROJECT: str = "project-pydolphin"
    TENANT: str = "tenant_pydolphin"
    USER: str = "userPythonGateway"
    # TODO: the password is simply set to the same value as the user name for now
    USER_PWD: str = "userPythonGateway"
    USER_EMAIL: str = "userPythonGateway@dolphinscheduler.com"
    USER_PHONE: str = "11111111111"
    USER_STATE: int = 1
    QUEUE: str = "queuePythonGateway"
    WORKER_GROUP: str = "default"


class TaskPriority(str):
    """String constants for a task's ``task_priority``."""

    HIGHEST: str = "HIGHEST"
    HIGH: str = "HIGH"
    MEDIUM: str = "MEDIUM"
    LOW: str = "LOW"
    LOWEST: str = "LOWEST"


class TaskFlag(str):
    """String constants for a task's ``flag``."""

    YES: str = "YES"
    NO: str = "NO"


class TaskTimeoutFlag(str):
    """String constants for a task's ``timeout_flag``."""

    CLOSE: str = "CLOSE"


class TaskType(str):
    """String constants naming task types."""

    SHELL: str = "SHELL"


class DefaultTaskCodeNum(str):
    """Default number of task codes; note the value is an ``int`` although the class derives from ``str``."""

    DEFAULT: int = 1


class JavaGatewayDefault(str):
    """Keys and expected values used when inspecting a result returned by the java gateway."""

    # Key of the message entry and the value that means success
    RESULT_MESSAGE_KEYWORD: str = "msg"
    RESULT_MESSAGE_SUCCESS: str = "success"

    # Key of the status entry and the value that means success
    RESULT_STATUS_KEYWORD: str = "status"
    RESULT_STATUS_SUCCESS: str = "SUCCESS"

    # Key of the payload entry
    RESULT_DATA: str = "data"
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py new file mode 100644 index 0000000000..175754fc65 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
class Base:
    """Common parent of the SDK objects: holds ``name`` and ``description``.

    Subclasses tune comparison and serialisation through class attributes:

    * ``_KEY_ATTR``: attribute names compared by ``__eq__``.
    * ``_TO_DICT_ATTR``: attribute names exported by ``to_dict``.

    Because ``__eq__`` is defined without ``__hash__``, instances are not
    hashable unless a subclass defines ``__hash__`` itself.
    """

    _KEY_ATTR: set = {
        "name",
        "description",
    }

    # Empty here, so ``to_dict`` of a plain ``Base`` returns ``{}``
    _TO_DICT_ATTR: set = set()

    # Not read by any method of this class
    DEFAULT_ATTR: Dict = {}

    def __init__(
            self,
            name: str,
            description: Optional[str] = None
    ):
        self.name = name
        self.description = description

    def __repr__(self) -> str:
        return f'<{type(self).__name__}: name="{self.name}">'

    def __eq__(self, other):
        """Return ``True`` when ``other`` has exactly the same type and equal key attributes.

        Attributes missing on either side are treated as ``None``.
        """
        # Exact type match is intended, so compare the type objects by identity
        if type(self) is not type(other):
            return False
        return all(
            getattr(self, attr, None) == getattr(other, attr, None)
            for attr in self._KEY_ATTR
        )

    # TODO check how Redash do
    # TODO DRY
    def to_dict(self, camel_attr=True) -> Dict:
        """Export the attributes listed in ``_TO_DICT_ATTR`` as a dict.

        :param camel_attr: when true, keys are converted with ``attr2camel``;
            otherwise the attribute names are used unchanged.
        :return: mapping of key to attribute value; a missing attribute yields ``None``.
        """
        return {
            (attr2camel(attr) if camel_attr else attr): getattr(self, attr, None)
            for attr in self._TO_DICT_ATTR
        }
class BaseSide(Base):
    """Parent of the side objects, which share the ``create_if_not_exists`` hook."""

    def __init__(self, name: str, description: Optional[str] = None):
        super(BaseSide, self).__init__(name, description)

    # NOTE(review): declared as a classmethod here, while the overrides in
    # Project, Queue, Tenant and User are instance methods -- confirm which is intended.
    @classmethod
    def create_if_not_exists(
            cls,
            # TODO annotate as ``Optional[User]`` once the import cycle with ``User`` is solved
            user=ProcessDefinitionDefault.USER
    ):
        """Create the object if it does not exist yet; must be overridden.

        :param user: name of the user on whose behalf the object is created.
        :raises NotImplementedError: always, the base class has no implementation.
        """
        raise NotImplementedError
class ProcessDefinitionContext:
    """Class-level slot holding the process definition currently opened by a ``with`` block."""

    _context_managed_process_definition: Optional["ProcessDefinition"] = None

    @classmethod
    def set(cls, pd: "ProcessDefinition") -> None:
        """Register ``pd`` as the current process definition."""
        cls._context_managed_process_definition = pd

    @classmethod
    def get(cls) -> Optional["ProcessDefinition"]:
        """Return the current process definition, ``None`` when nothing is registered."""
        return cls._context_managed_process_definition

    @classmethod
    def delete(cls) -> None:
        """Forget the current process definition."""
        cls._context_managed_process_definition = None


class ProcessDefinition(Base):
    """A named collection of tasks and task relations submitted through the java gateway.

    Usable as a context manager: inside the ``with`` block the instance is
    registered in :class:`ProcessDefinitionContext`.

    TODO :ref: comment may not correct ref
    TODO: maybe we should rename this class, currently use DS object name
    """

    # key attribute for identify ProcessDefinition object
    _KEY_ATTR = {
        "name",
        "project",
        "tenant",
        "release_state",
        "param",
    }

    _TO_DICT_ATTR = {
        "name",
        "description",
        "_project",
        "_tenant",
        "timeout",
        "release_state",
        "param",
        "tasks",
        "task_definition_json",
        "task_relation_json",
    }

    def __init__(
            self,
            name: str,
            description: Optional[str] = None,
            user: Optional[str] = ProcessDefinitionDefault.USER,
            project: Optional[str] = ProcessDefinitionDefault.PROJECT,
            tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
            queue: Optional[str] = ProcessDefinitionDefault.QUEUE,
            timeout: Optional[int] = 0,
            release_state: Optional[str] = ProcessDefinitionReleaseState.ONLINE,
            param: Optional[List] = None
    ):
        super().__init__(name, description)
        self._user = user
        self._project = project
        self._tenant = tenant
        self._queue = queue
        self.timeout = timeout
        self.release_state = release_state
        self.param = param
        # Tasks of this definition, keyed by task code
        self.tasks: dict = {}
        # TODO how to fix circle import
        self._task_relations: Set["TaskRelation"] = set()
        # Filled with the value returned by the gateway in ``submit``
        self._process_definition_code = None

    def __enter__(self) -> "ProcessDefinition":
        ProcessDefinitionContext.set(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        ProcessDefinitionContext.delete()

    @property
    def tenant(self) -> Tenant:
        """A new ``Tenant`` object built from the stored tenant name."""
        return Tenant(self._tenant)

    @tenant.setter
    def tenant(self, tenant: Tenant) -> None:
        self._tenant = tenant.name

    @property
    def project(self) -> Project:
        """A new ``Project`` object built from the stored project name."""
        return Project(self._project)

    @project.setter
    def project(self, project: Project) -> None:
        self._project = project.name

    @property
    def user(self) -> User:
        """A new ``User`` object named after the stored user.

        NOTE(review): every other field, including tenant and queue, comes from
        ``ProcessDefinitionDefault`` rather than from this instance -- confirm intended.
        """
        return User(self._user,
                    ProcessDefinitionDefault.USER_PWD,
                    ProcessDefinitionDefault.USER_EMAIL,
                    ProcessDefinitionDefault.USER_PHONE,
                    ProcessDefinitionDefault.TENANT,
                    ProcessDefinitionDefault.QUEUE,
                    ProcessDefinitionDefault.USER_STATE)

    @property
    def task_definition_json(self) -> List[Dict]:
        """``to_dict()`` of every task; ``[{}]`` when there is no task."""
        if not self.tasks:
            return [self.tasks]
        return [task.to_dict() for task in self.tasks.values()]

    @property
    def task_relation_json(self) -> List[Dict]:
        """``to_dict()`` of every task relation; ``[{}]`` when there is no task.

        Root relations are added first, see ``_handle_root_relation``.
        """
        if not self.tasks:
            return [self.tasks]
        self._handle_root_relation()
        return [tr.to_dict() for tr in self._task_relations]

    # TODO inti DAG's tasks are in the same place
    @property
    def task_location(self) -> List[Dict]:
        """One location dict per task, all at ``x=0, y=0``; ``[{}]`` when there is no task."""
        if not self.tasks:
            return [self.tasks]
        return [{"taskCode": task_code, "x": 0, "y": 0} for task_code in self.tasks]

    @property
    def task_list(self) -> List["Task"]:
        """The tasks of this definition as a list."""
        return list(self.tasks.values())

    def _handle_root_relation(self):
        """Add a relation with ``pre_task_code=0`` for each task that is nobody's downstream."""
        # Imported here because ``core.task`` imports this module at load time
        from pydolphinscheduler.core.task import TaskRelation
        post_relation_code = {relation.post_task_code for relation in self._task_relations}
        for task in self.task_list:
            if task.code not in post_relation_code:
                root_relation = TaskRelation(pre_task_code=0, post_task_code=task.code)
                self._task_relations.add(root_relation)

    def add_task(self, task: "Task") -> None:
        """Store ``task`` under its code and point its ``_process_definition`` at this object."""
        self.tasks[task.code] = task
        task._process_definition = self

    def add_tasks(self, tasks: List["Task"]) -> None:
        """Call ``add_task`` for every item of ``tasks``."""
        for task in tasks:
            self.add_task(task)

    def get_task(self, code: str) -> "Task":
        """Return the task stored under ``code``.

        :raises ValueError: when no task with that code was added.
        """
        if code not in self.tasks:
            # The message used to be an unformatted "%s" template followed by a tuple
            raise ValueError(
                f"Task with code {code} can not be found in process definition {self.name}"
            )
        return self.tasks[code]

    # TODO which tying should return in this case
    def get_tasks_by_name(self, name: str) -> Set["Task"]:
        """Return the set of tasks whose ``name`` equals ``name``; empty when none match."""
        return {task for task in self.tasks.values() if task.name == name}

    def get_one_task_by_name(self, name: str) -> "Task":
        """Return an arbitrary task named ``name``.

        :raises ValueError: when no task has that name.
        """
        tasks = self.get_tasks_by_name(name)
        if not tasks:
            raise ValueError(f"Can not find task with name {name}.")
        return tasks.pop()

    def run(self):
        """
        Run ProcessDefinition instance, a shortcut for :ref: submit and :ref: start
        Only support manual for now, schedule run will coming soon
        :return:
        """
        self.submit()
        self.start()

    def _ensure_side_model_exists(self):
        """
        Ensure side model exists which including :ref: Project, Tenant, User.
        If those model not exists, would create default value in :ref: ProcessDefinitionDefault
        """
        # TODO used metaclass for more pythonic
        self.tenant.create_if_not_exists(self._queue)
        # model User have to create after Tenant created
        self.user.create_if_not_exists()
        # Project model need User object exists
        self.project.create_if_not_exists(self._user)

    def submit(self) -> int:
        """
        Submit ProcessDefinition instance to java gateway

        The side objects are ensured first; the value returned by the gateway
        is stored on the instance and returned.
        :return: the value returned by ``createOrUpdateProcessDefinition``
        """
        self._ensure_side_model_exists()
        gateway = launch_gateway()
        self._process_definition_code = gateway.entry_point.createOrUpdateProcessDefinition(
            self._user,
            self._project,
            self.name,
            str(self.description) if self.description else "",
            str(self.param) if self.param else None,
            json.dumps(self.task_location),
            self.timeout,
            self._tenant,
            # TODO add serialization function
            json.dumps(self.task_relation_json),
            json.dumps(self.task_definition_json),
        )
        return self._process_definition_code

    def start(self) -> None:
        """
        Start ProcessDefinition instance which post to `start-process-instance` to java gateway
        :return:
        """
        gateway = launch_gateway()
        # NOTE(review): the last three arguments are hard-coded; their meaning is
        # defined by the gateway side and is not visible here -- confirm.
        gateway.entry_point.execProcessInstance(
            self._user,
            self._project,
            self.name,
            "",
            "default",
            24 * 3600,
        )
class ObjectJsonBase:
    """Parent of small value objects that serialise their instance attributes.

    ``DEFAULT_ATTR`` holds constant entries merged into every ``to_dict`` result.
    """

    DEFAULT_ATTR = {}

    # Was misspelled ``__int__``, which defined an integer-conversion hook
    # instead of the constructor.
    def __init__(self, *args, **kwargs):
        pass

    def __str__(self) -> str:
        content = ",".join(
            f"\"{snake2camel(attribute)}\": {value}"
            for attribute, value in self.__dict__.items()
        )
        return f"\"{class_name2camel(type(self).__name__)}\":{{{content}}}"

    # TODO check how Redash do
    # TODO DRY
    def to_dict(self) -> Dict:
        """Return the instance attributes with ``snake2camel`` keys, updated with ``DEFAULT_ATTR``."""
        content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
        content.update(self.DEFAULT_ATTR)
        return content


class TaskParams(ObjectJsonBase):
    """Parameter bundle of a task: the raw script plus optional settings."""

    DEFAULT_CONDITION_RESULT = {
        "successNode": [
            ""
        ],
        "failedNode": [
            ""
        ]
    }

    def __init__(
            self,
            raw_script: str,
            local_params: Optional[List] = None,
            resource_list: Optional[List] = None,
            dependence: Optional[Dict] = None,
            wait_start_timeout: Optional[Dict] = None,
            condition_result: Optional[Dict] = None,
    ):
        """Store the given values; every falsy optional argument is replaced by an empty
        container, and ``condition_result`` by a copy of ``DEFAULT_CONDITION_RESULT``.
        """
        super().__init__()
        self.raw_script = raw_script
        self.local_params = local_params or []
        self.resource_list = resource_list or []
        self.dependence = dependence or {}
        self.wait_start_timeout = wait_start_timeout or {}
        # TODO need better way to handle it, this code just for POC
        # Copy the class-level default so that mutating one instance's value
        # cannot leak into other instances or into the default itself.
        self.condition_result = condition_result or {
            key: list(nodes) for key, nodes in self.DEFAULT_CONDITION_RESULT.items()
        }


class TaskRelation(ObjectJsonBase):
    """Edge between two tasks, identified by the pair of task codes."""

    DEFAULT_ATTR = {
        "name": "",
        "preTaskVersion": 1,
        "postTaskVersion": 1,
        "conditionType": 0,
        "conditionParams": {}
    }

    def __init__(
            self,
            pre_task_code: int,
            post_task_code: int,
    ):
        super().__init__()
        self.pre_task_code = pre_task_code
        self.post_task_code = post_task_code

    def __eq__(self, other):
        """Two relations are equal when both task codes match."""
        if not isinstance(other, TaskRelation):
            return NotImplemented
        return (self.pre_task_code, self.post_task_code) == \
            (other.pre_task_code, other.post_task_code)

    def __hash__(self):
        # Hash both ends of the edge; the post code used to be hashed twice,
        # which ignored the pre code entirely.
        return hash((self.pre_task_code, self.post_task_code))
class Task(Base):
    """A unit of work belonging to a process definition.

    On construction the task resolves its process definition (explicit argument
    or the one registered in ``ProcessDefinitionContext``), asks the java
    gateway for its code and version, and adds itself to the process definition.
    Dependencies can be declared with ``set_upstream``/``set_downstream`` or the
    ``>>`` and ``<<`` operators.
    """

    DEFAULT_DEPS_ATTR = {
        "name": "",
        "preTaskVersion": 1,
        "postTaskVersion": 1,
        "conditionType": 0,
        "conditionParams": {}
    }

    def __init__(
            self,
            name: str,
            task_type: str,
            task_params: TaskParams,
            description: Optional[str] = None,
            flag: Optional[str] = TaskFlag.YES,
            task_priority: Optional[str] = TaskPriority.MEDIUM,
            worker_group: Optional[str] = ProcessDefinitionDefault.WORKER_GROUP,
            delay_time: Optional[int] = 0,
            fail_retry_times: Optional[int] = 0,
            fail_retry_interval: Optional[int] = 1,
            timeout_flag: Optional[str] = TaskTimeoutFlag.CLOSE,
            timeout_notify_strategy: Optional = None,
            timeout: Optional[int] = 0,
            process_definition: Optional[ProcessDefinition] = None,
    ):

        super().__init__(name, description)
        self.task_type = task_type
        self.task_params = task_params
        self.flag = flag
        self.task_priority = task_priority
        self.worker_group = worker_group
        self.fail_retry_times = fail_retry_times
        self.fail_retry_interval = fail_retry_interval
        self.delay_time = delay_time
        self.timeout_flag = timeout_flag
        self.timeout_notify_strategy = timeout_notify_strategy
        self.timeout = timeout
        self._process_definition = None
        self.process_definition: ProcessDefinition = process_definition or ProcessDefinitionContext.get()
        self._upstream_task_codes: Set[int] = set()
        self._downstream_task_codes: Set[int] = set()
        self._task_relation: Set[TaskRelation] = set()
        # move attribute code and version after _process_definition and process_definition declare
        self.code, self.version = self.gen_code_and_version()
        # Add task to process definition, maybe we could put into property process_definition latter.
        # The property raises when nothing is assigned, so no ``None`` check is needed here.
        if self.code not in self.process_definition.tasks:
            self.process_definition.add_task(self)

    @property
    def process_definition(self) -> Optional[ProcessDefinition]:
        """The process definition this task belongs to.

        :raises ValueError: when no process definition has been assigned.
        """
        if self._process_definition:
            return self._process_definition
        raise ValueError(f'Task {self} has not been assigned to a ProcessDefinition yet')

    @process_definition.setter
    def process_definition(self, process_definition: Optional[ProcessDefinition]):
        self._process_definition = process_definition

    def __hash__(self):
        return hash(self.code)

    def __lshift__(self, other: Union["Task", Sequence["Task"]]):
        """Implements Task << Task"""
        self.set_upstream(other)
        return other

    def __rshift__(self, other: Union["Task", Sequence["Task"]]):
        """Implements Task >> Task"""
        self.set_downstream(other)
        return other

    def __rrshift__(self, other: Union["Task", Sequence["Task"]]):
        """Called for Task >> [Task] because list don't have __rshift__ operators."""
        self.__lshift__(other)
        return self

    def __rlshift__(self, other: Union["Task", Sequence["Task"]]):
        """Called for Task << [Task] because list don't have __lshift__ operators."""
        self.__rshift__(other)
        return self

    def _set_deps(self, tasks: Union["Task", Sequence["Task"]], upstream: bool = True) -> None:
        """Record ``tasks`` as upstream (default) or downstream of this task.

        Both sides' code sets are updated; when this task has a process
        definition, a ``TaskRelation`` is added to it as well.
        """
        if not isinstance(tasks, Sequence):
            tasks = [tasks]

        for task in tasks:
            if upstream:
                self._upstream_task_codes.add(task.code)
                task._downstream_task_codes.add(self.code)
                pre_code, post_code = task.code, self.code
            else:
                self._downstream_task_codes.add(task.code)
                task._upstream_task_codes.add(self.code)
                pre_code, post_code = self.code, task.code

            if self._process_definition:
                task_relation = TaskRelation(
                    pre_task_code=pre_code,
                    post_task_code=post_code,
                )
                self.process_definition._task_relations.add(task_relation)

    def set_upstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
        """Declare ``tasks`` as upstream of this task."""
        self._set_deps(tasks, upstream=True)

    def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
        """Declare ``tasks`` as downstream of this task."""
        self._set_deps(tasks, upstream=False)

    # TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
    def gen_code_and_version(self) -> Tuple:
        """Ask the java gateway for this task's ``(code, version)`` pair.

        :raises ValueError: when no process definition has been assigned.
        """
        # TODO get code from specific project process definition and task name
        gateway = launch_gateway()
        result = gateway.entry_point.getCodeAndVersion(self.process_definition._project, self.name)
        return result.get("code"), result.get("version")

    def to_dict(self, camel_attr=True) -> Dict:
        """Export the public instance attributes as a dict.

        :param camel_attr: when true, keys are converted with ``snake2camel``;
            otherwise the attribute names are used unchanged. A ``TaskParams``
            value is always exported through its own ``to_dict``.
        :return: mapping of key to value, without attributes starting with ``_``.
        """
        content = {}
        for attr, value in self.__dict__.items():
            # Don't publish private variables
            if attr.startswith("_"):
                continue
            key = snake2camel(attr) if camel_attr else attr
            content[key] = value.to_dict() if isinstance(value, TaskParams) else value
        return content
def launch_gateway() -> JavaGateway:
    """Return a new ``JavaGateway`` created with ``auto_convert`` enabled."""
    # TODO Note that automatic conversion makes calling Java methods slightly less efficient because
    #  in the worst case, Py4J needs to go through all registered converters for all parameters.
    #  This is why automatic conversion is disabled by default.
    return JavaGateway(gateway_parameters=GatewayParameters(auto_convert=True))


def gateway_result_checker(
        result: JavaMap,
        msg_check: Optional[str] = JavaGatewayDefault.RESULT_MESSAGE_SUCCESS
) -> Any:
    """Validate a result map returned by the java gateway and hand it back.

    :param result: map holding at least the status entry, and the message entry
        when ``msg_check`` is not ``None``.
    :param msg_check: expected message; pass ``None`` to skip the message check.
    :return: ``result`` unchanged when every check passes.
    :raises RuntimeError: when the status is not the success status.
    :raises ValueError: when the message differs from ``msg_check``.
    """
    status = result[JavaGatewayDefault.RESULT_STATUS_KEYWORD].toString()
    if status != JavaGatewayDefault.RESULT_STATUS_SUCCESS:
        raise RuntimeError(f"Failed to get result from java gateway, status is {status!r}.")
    if msg_check is not None:
        # Only read the message entry when it is actually going to be checked
        message = result[JavaGatewayDefault.RESULT_MESSAGE_KEYWORD]
        if message != msg_check:
            raise ValueError(
                f"Java gateway result message is {message!r}, expected {msg_check!r}."
            )
    return result
+ +from pydolphinscheduler.side.project import Project +from pydolphinscheduler.side.tenant import Tenant +from pydolphinscheduler.side.user import User diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py new file mode 100644 index 0000000000..b118be9994 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/side/project.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
class Project(BaseSide):
    """Project side object; the name falls back to ``ProcessDefinitionDefault.PROJECT``."""

    def __init__(self, name: str = ProcessDefinitionDefault.PROJECT, description: Optional[str] = None):
        super(Project, self).__init__(name, description)

    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
        """Call the gateway's ``createProject`` with this project's name and description.

        :param user: user name passed to the gateway as first argument.
        """
        entry_point = launch_gateway().entry_point
        # TODO recover result checker: pass the returned value to gateway_result_checker
        entry_point.createProject(user, self.name, self.description)
class Queue(BaseSide):
    """Queue side object; the name falls back to ``ProcessDefinitionDefault.QUEUE``."""

    def __init__(self, name: str = ProcessDefinitionDefault.QUEUE, description: Optional[str] = ""):
        super(Queue, self).__init__(name, description)

    def create_if_not_exists(self, user=ProcessDefinitionDefault.USER) -> None:
        """Send this queue's name to the gateway and check the returned result.

        :param user: user name passed to the gateway as first argument.
        """
        entry_point = launch_gateway().entry_point
        # Here we set Queue.name and Queue.queueName same as self.name
        # NOTE(review): the entry point invoked is ``createProject``, not a
        # queue-specific one -- confirm this is intended.
        outcome = entry_point.createProject(user, self.name, self.name)
        gateway_result_checker(outcome, None)
from typing import Optional

from pydolphinscheduler.constants import ProcessDefinitionDefault
from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker


class Tenant(BaseSide):
    """Tenant side object; also remembers the queue name it was built with."""

    def __init__(
        self,
        name: str = ProcessDefinitionDefault.TENANT,
        queue: str = ProcessDefinitionDefault.QUEUE,
        description: Optional[str] = None,
    ):
        super().__init__(name, description)
        self.queue = queue

    def create_if_not_exists(self, queue_name: str, user=ProcessDefinitionDefault.USER) -> None:
        """
        Create Tenant if not exists.

        Sends the tenant name, its description and ``queue_name`` to the
        gateway entry point ``createTenant``. ``user`` is accepted but not
        forwarded to the gateway.
        """
        entry_point = launch_gateway().entry_point
        entry_point.createTenant(self.name, self.description, queue_name)
        # NOTE: the gateway response is not validated here yet
        #  (gateway_result_checker is still disabled for tenants).
from typing import Optional

from pydolphinscheduler.core.base_side import BaseSide
from pydolphinscheduler.java_gateway import launch_gateway, gateway_result_checker


class User(BaseSide):
    """User side object holding the account fields sent to the gateway."""

    # Attribute names that make up this object's key data.
    _KEY_ATTR = {"name", "password", "email", "phone", "tenant", "queue", "status"}

    def __init__(
        self,
        name: str,
        password: str,
        email: str,
        phone: str,
        tenant: str,
        queue: Optional[str] = None,
        status: Optional[int] = 1,
    ):
        super().__init__(name)
        self.password, self.email, self.phone = password, email, phone
        self.tenant, self.queue, self.status = tenant, queue, status

    def create_if_not_exists(self, **kwargs):
        """
        Create User if not exists.

        Every stored field is passed positionally to the gateway entry point
        ``createUser``; ``kwargs`` is accepted but unused.
        """
        entry_point = launch_gateway().entry_point
        entry_point.createUser(
            self.name,
            self.password,
            self.email,
            self.phone,
            self.tenant,
            self.queue,
            self.status,
        )
        # TODO recover result checker: the gateway response is currently
        #  ignored instead of being passed to gateway_result_checker.
from typing import Optional

from pydolphinscheduler.core.base_side import BaseSide


class WorkerGroup(BaseSide):
    """Worker group side object: a named group plus its address string."""

    def __init__(
        self,
        name: str,
        address: str,
        description: Optional[str] = None,
    ):
        # Name and description are handled by the shared side-object base.
        super().__init__(name, description)
        self.address = address
diff --git a/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py new file mode 100644 index 0000000000..e60c78bfd5 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/shell.py @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from pydolphinscheduler.constants import TaskType
from pydolphinscheduler.core.task import Task, TaskParams


class Shell(Task):
    """Shell task whose ``command`` is carried as the task's raw script."""

    # TODO maybe we could use instance name to replace attribute `name`
    #  which is simplify as `task_shell = Shell(command = "echo 1")` and
    #  task.name assign to `task_shell`
    def __init__(self, name: str, command: str, task_type: str = TaskType.SHELL, *args, **kwargs):
        # Any extra positional/keyword arguments go straight to Task.
        super().__init__(name, task_type, TaskParams(raw_script=command), *args, **kwargs)
def attr2camel(attr: str, include_private=True) -> str:
    """
    Convert a snake_case attribute name to camelCase.

    :param attr: attribute name, e.g. ``"_task_type"``.
    :param include_private: when true, leading underscores are stripped
        before conversion so private attributes convert like public ones.
    :return: the camelCase form of ``attr``.
    """
    if include_private:
        attr = attr.lstrip("_")
    return snake2camel(attr)


def snake2camel(snake: str) -> str:
    """
    Convert ``snake_case`` to ``camelCase``.

    The first ``_``-separated component is kept as is; every following
    component is title-cased and appended.
    """
    head, *tail = snake.split("_")
    return head + "".join(part.title() for part in tail)


def class_name2camel(class_name: str) -> str:
    """
    Lower-case the first character of a class name (``FooBar`` -> ``fooBar``).

    Slicing is used instead of indexing so an empty string is returned
    unchanged rather than raising ``IndexError``.
    """
    return class_name[:1].lower() + class_name[1:]
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/test/core/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py new file mode 100644 index 0000000000..7155447d66 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/test/core/test_process_definition.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
import pytest

from pydolphinscheduler.constants import ProcessDefinitionDefault, ProcessDefinitionReleaseState
from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.core.task import Task, TaskParams
from pydolphinscheduler.side import Tenant, Project, User

TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"


@pytest.mark.parametrize("func", ["run", "submit", "start"])
def test_process_definition_key_attr(func):
    """A ProcessDefinition instance exposes its key entry points."""
    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
        assert hasattr(pd, func), f"ProcessDefinition instance doesn't have attribute `{func}`"


@pytest.mark.parametrize(
    "name,value",
    [
        ("project", Project(ProcessDefinitionDefault.PROJECT)),
        ("tenant", Tenant(ProcessDefinitionDefault.TENANT)),
        (
            "user",
            User(
                ProcessDefinitionDefault.USER,
                ProcessDefinitionDefault.USER_PWD,
                ProcessDefinitionDefault.USER_EMAIL,
                ProcessDefinitionDefault.USER_PHONE,
                ProcessDefinitionDefault.TENANT,
                ProcessDefinitionDefault.QUEUE,
                ProcessDefinitionDefault.USER_STATE,
            ),
        ),
        ("release_state", ProcessDefinitionReleaseState.ONLINE),
    ],
)
def test_process_definition_default_value(name, value):
    """Attributes left unset fall back to the ProcessDefinitionDefault values."""
    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
        actual = getattr(pd, name)
        # Report both sides so a failure shows what was expected and what was found.
        assert actual == value, (
            f"ProcessDefinition instance attribute `{name}` expected default value "
            f"`{value}` but got `{actual}`"
        )


@pytest.mark.parametrize(
    "name,cls,expect",
    [
        ("project", Project, "project"),
        ("tenant", Tenant, "tenant"),
    ],
)
def test_process_definition_set_attr(name, cls, expect):
    """Setting a side-object attribute is reflected when reading it back."""
    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
        setattr(pd, name, cls(expect))
        assert getattr(pd, name) == cls(expect), \
            f"ProcessDefinition set attribute `{name}` does not work as expected"


def test_process_definition_to_dict_without_task():
    """to_dict() of a task-less definition only carries defaults and empty task data."""
    expect = {
        "name": TEST_PROCESS_DEFINITION_NAME,
        "description": None,
        "project": ProcessDefinitionDefault.PROJECT,
        "tenant": ProcessDefinitionDefault.TENANT,
        "timeout": 0,
        "releaseState": ProcessDefinitionReleaseState.ONLINE,
        "param": None,
        "tasks": {},
        "taskDefinitionJson": [{}],
        "taskRelationJson": [{}],
    }
    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
        assert pd.to_dict() == expect


def test_process_definition_simple():
    """Tasks created inside the context are registered and chained task-0 -> ... -> task-4."""
    expect_tasks_num = 5
    last_index = expect_tasks_num - 1
    with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME) as pd:
        for i in range(expect_tasks_num):
            task_params = TaskParams(raw_script=f"test-raw-script-{i}")
            curr_task = Task(name=f"task-{i}", task_type=f"type-{i}", task_params=task_params)
            # Set deps task i as i-1 parent
            if i > 0:
                pre_task = pd.get_one_task_by_name(f"task-{i - 1}")
                curr_task.set_upstream(pre_task)
        assert len(pd.tasks) == expect_tasks_num

        # Test if task process_definition same as origin one
        task: Task = pd.get_one_task_by_name("task-0")
        assert pd is task.process_definition

        # Test if all tasks with expect deps: the first task has no upstream,
        # the last task has no downstream, every other link is i-1 -> i -> i+1.
        for i in range(expect_tasks_num):
            task: Task = pd.get_one_task_by_name(f"task-{i}")
            expect_upstream = (
                set() if i == 0 else {pd.get_one_task_by_name(f"task-{i - 1}").code}
            )
            expect_downstream = (
                set() if i == last_index else {pd.get_one_task_by_name(f"task-{i + 1}").code}
            )
            assert task._upstream_task_codes == expect_upstream
            assert task._downstream_task_codes == expect_downstream
from unittest.mock import patch

from pydolphinscheduler.core.task import TaskParams, TaskRelation, Task


def test_task_params_to_dict():
    """TaskParams.to_dict() surrounds the raw script with the default fields."""
    script = "test_task_params_to_dict"
    params = TaskParams(raw_script=script)
    assert params.to_dict() == {
        "resourceList": [],
        "localParams": [],
        "rawScript": script,
        "dependence": {},
        "conditionResult": TaskParams.DEFAULT_CONDITION_RESULT,
        "waitStartTimeout": {},
    }


def test_task_relation_to_dict():
    """TaskRelation.to_dict() reports both task codes plus the default fields."""
    upstream_code, downstream_code = 123, 456
    relation = TaskRelation(pre_task_code=upstream_code, post_task_code=downstream_code)
    assert relation.to_dict() == {
        "name": "",
        "preTaskCode": upstream_code,
        "postTaskCode": downstream_code,
        "preTaskVersion": 1,
        "postTaskVersion": 1,
        "conditionType": 0,
        "conditionParams": {},
    }


def test_task_to_dict():
    """Task.to_dict() output for a minimal task, with code generation patched out."""
    task_code = "123"
    task_name = "test_task_to_dict"
    kind = "test_task_to_dict_type"
    script = "test_task_params_to_dict"
    expected_params = {
        "resourceList": [],
        "localParams": [],
        "rawScript": script,
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    }
    expected = {
        "code": task_code,
        "name": task_name,
        "version": 1,
        "description": None,
        "delayTime": 0,
        "taskType": kind,
        "taskParams": expected_params,
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "worker-group-pydolphin",
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    # gen_code is patched to return a fixed code for the comparison.
    with patch("pydolphinscheduler.core.task.Task.gen_code", return_value=task_code):
        task = Task(name=task_name, task_type=kind, task_params=TaskParams(script))
        assert task.to_dict() == expected
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py new file mode 100644 index 0000000000..ff1b7b5465 --- /dev/null +++ b/dolphinscheduler-python/pydolphinscheduler/test/tasks/test_shell.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
from unittest.mock import patch

from pydolphinscheduler.tasks.shell import Shell


def test_shell_to_dict():
    """Shell.to_dict() reports task type SHELL and carries the command as rawScript."""
    task_code = "123"
    task_name = "test_shell_to_dict"
    shell_command = "echo test shell"
    expected_params = {
        "resourceList": [],
        "localParams": [],
        "rawScript": shell_command,
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    }
    expected = {
        "code": task_code,
        "name": task_name,
        "version": 1,
        "description": None,
        "delayTime": 0,
        "taskType": "SHELL",
        "taskParams": expected_params,
        "flag": "YES",
        "taskPriority": "MEDIUM",
        "workerGroup": "worker-group-pydolphin",
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    # gen_code is patched to return a fixed code for the comparison.
    with patch("pydolphinscheduler.core.task.Task.gen_code", return_value=task_code):
        shell_task = Shell(task_name, shell_command)
        assert shell_task.to_dict() == expected
from py4j.java_gateway import java_import, JavaGateway


def test_gateway_connect():
    """The gateway entry point answers ``ping`` with ``PONG``."""
    entry_point = JavaGateway().entry_point
    assert entry_point.ping() == "PONG"


def test_jvm_simple():
    """Values read from the JVM view compare like normal numbers."""
    integer_cls = JavaGateway().jvm.java.lang.Integer
    smaller = integer_cls.MIN_VALUE
    bigger = integer_cls.MAX_VALUE
    assert bigger > smaller


def test_python_client_java_import_single():
    """Importing one class makes it reachable by short name on the JVM view."""
    jvm = JavaGateway().jvm
    java_import(jvm, "org.apache.dolphinscheduler.common.utils.FileUtils")
    assert hasattr(jvm, "FileUtils")


def test_python_client_java_import_package():
    """A wildcard import makes the package's classes reachable by short name."""
    jvm = JavaGateway().jvm
    java_import(jvm, "org.apache.dolphinscheduler.common.utils.*")
    # test if jvm view have some common utils
    expected_utils = ("FileUtils", "OSUtils", "DateUtils")
    for util_name in expected_utils:
        assert hasattr(jvm, util_name)
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ExecutorService; +import org.apache.dolphinscheduler.api.service.ProcessDefinitionService; +import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.QueueService; +import org.apache.dolphinscheduler.api.service.TaskDefinitionService; +import org.apache.dolphinscheduler.api.service.TenantService; +import org.apache.dolphinscheduler.api.service.UsersService; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.RunMode; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; +import 
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; +import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; + +import py4j.GatewayServer; + +@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "org.apache.dolphinscheduler.server.master.*", + "org.apache.dolphinscheduler.server.worker.*", + "org.apache.dolphinscheduler.server.monitor.*", + "org.apache.dolphinscheduler.server.log.*" + }) +}) +public class PythonGatewayServer extends SpringBootServletInitializer { + @Autowired + private ProcessDefinitionMapper processDefinitionMapper; + + @Autowired + private ProjectService projectService; + + @Autowired + private TenantService tenantService; + + @Autowired + private ExecutorService executorService; + + @Autowired + private ProcessDefinitionService processDefinitionService; + + @Autowired + private TaskDefinitionService taskDefinitionService; + + @Autowired + private UsersService usersService; + + @Autowired + private QueueService queueService; + + @Autowired + private ProjectMapper projectMapper; + + @Autowired + private TaskDefinitionMapper taskDefinitionMapper; + + // TODO replace this user to build in admin user if we make sure build in one could not be change + private final User dummyAdminUser = new User() { + { + setId(Integer.MAX_VALUE); + setUserName("dummyUser"); + setUserType(UserType.ADMIN_USER); + } + }; + + private final Queue 
queuePythonGateway = new Queue() { + { + setId(Integer.MAX_VALUE); + setQueueName("queuePythonGateway"); + } + }; + + public String ping() { + return "PONG"; + } + + // TODO Should we import package in python client side? utils package can but service can not, why + // Core api + public Map genTaskCodeList(Integer genNum) { + return taskDefinitionService.genTaskCodeList(genNum); + } + + public Map getCodeAndVersion(String projectName, String taskName) throws SnowFlakeUtils.SnowFlakeException { + Project project = projectMapper.queryByName(projectName); + Map result = new HashMap<>(); + // project do not exists, mean task not exists too, so we should directly return init value + if (project == null) { + result.put("code", SnowFlakeUtils.getInstance().nextId()); + result.put("version", 0L); + return result; + } + TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName); + if (taskDefinition == null) { + result.put("code", SnowFlakeUtils.getInstance().nextId()); + result.put("version", 0L); + } else { + result.put("code", taskDefinition.getCode()); + result.put("version", (long) taskDefinition.getVersion()); + } + return result; + } + + /** + * create or update process definition. + * If process definition do not exists in Project=`projectCode` would create a new one + * If process definition already exists in Project=`projectCode` would update it + * All requests + *

+ * + * @param name process definition name + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes + * @param taskDefinitionJson taskDefinitionJson + * @return create result code + */ + public Long createOrUpdateProcessDefinition(String userName, + String projectName, + String name, + String description, + String globalParams, + String locations, + int timeout, + String tenantCode, + String taskRelationJson, + String taskDefinitionJson) { + User user = usersService.queryUser(userName); + Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); + long projectCode = project.getCode(); + Map verifyProcessDefinitionExists = processDefinitionService.verifyProcessDefinitionName(user, projectCode, name); + + if (verifyProcessDefinitionExists.get(Constants.STATUS) != Status.SUCCESS) { + // update process definition + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(projectCode, name); + long processDefinitionCode = processDefinition.getCode(); + // make sure process definition offline which could edit + processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); + Map result = processDefinitionService.updateProcessDefinition(user, projectCode, name, processDefinitionCode, description, globalParams, + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + return processDefinitionCode; + } else { + // create process definition + Map result = processDefinitionService.createProcessDefinition(user, projectCode, name, description, globalParams, + locations, timeout, tenantCode, taskRelationJson, taskDefinitionJson); + ProcessDefinition processDefinition = (ProcessDefinition) result.get(Constants.DATA_LIST); + return processDefinition.getCode(); + } + } + + public 
void execProcessInstance(String userName, + String projectName, + String processDefinitionName, + String cronTime, + String workerGroup, + Integer timeout + ) { + User user = usersService.queryUser(userName); + Project project = projectMapper.queryByName(projectName); + ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + + // temp default value + FailureStrategy failureStrategy = FailureStrategy.CONTINUE; + TaskDependType taskDependType = TaskDependType.TASK_POST; + WarningType warningType = WarningType.NONE; + RunMode runMode = RunMode.RUN_MODE_SERIAL; + Priority priority = Priority.MEDIUM; + int warningGroupId = 0; + Long environmentCode = -1L; + Map startParams = null; + Integer expectedParallelismNumber = null; + String startNodeList = null; + + // make sure process definition online + processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE); + + executorService.execProcessInstance(user, + project.getCode(), + processDefinition.getCode(), + cronTime, + null, + failureStrategy, + startNodeList, + taskDependType, + warningType, + warningGroupId, + runMode, + priority, + workerGroup, + environmentCode, + timeout, + startParams, + expectedParallelismNumber, + 0 + ); + } + + // side object + public Map createProject(String userName, String name, String desc) { + User user = usersService.queryUser(userName); + return projectService.createProject(user, name, desc); + } + + public Map createQueue(String name, String queueName) { + Result verifyQueueExists = queueService.verifyQueue(name, queueName); + if (verifyQueueExists.getCode() == 0) { + return queueService.createQueue(dummyAdminUser, name, queueName); + } else { + Map result = new HashMap<>(); + // TODO function putMsg do not work here + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + return result; + } + } + + public Map 
createTenant(String tenantCode, String desc, String queueName) throws Exception { + if (tenantService.checkTenantExists(tenantCode)) { + Map result = new HashMap<>(); + // TODO function putMsg do not work here + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + return result; + } else { + Result verifyQueueExists = queueService.verifyQueue(queueName, queueName); + if (verifyQueueExists.getCode() == 0) { + // TODO why create do not return id? + queueService.createQueue(dummyAdminUser, queueName, queueName); + } + Map result = queueService.queryQueueName(queueName); + List queueList = (List) result.get(Constants.DATA_LIST); + Queue queue = queueList.get(0); + return tenantService.createTenant(dummyAdminUser, tenantCode, queue.getId(), desc); + } + } + + public void createUser(String userName, + String userPassword, + String email, + String phone, + String tenantCode, + String queue, + int state) { + User user = usersService.queryUser(userName); + if (Objects.isNull(user)) { + Map tenantResult = tenantService.queryByTenantCode(tenantCode); + Tenant tenant = (Tenant) tenantResult.get(Constants.DATA_LIST); + usersService.createUser(userName, userPassword, email, tenant.getId(), phone, queue, state); + } + } + + @PostConstruct + public void run() { + GatewayServer server = new GatewayServer(this); + GatewayServer.turnLoggingOn(); + // Start server to accept python client RPC + server.start(); + } + + public static void main(String[] args) { + SpringApplication.run(PythonGatewayServer.class, args); + } +} diff --git a/dolphinscheduler-standalone-server/pom.xml b/dolphinscheduler-standalone-server/pom.xml index 8b9efc1241..83ab68d26e 100644 --- a/dolphinscheduler-standalone-server/pom.xml +++ b/dolphinscheduler-standalone-server/pom.xml @@ -51,6 +51,10 @@ org.apache.dolphinscheduler dolphinscheduler-alert + + org.apache.dolphinscheduler + dolphinscheduler-python + diff --git 
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java index e5a9b00294..4a417704c8 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/server/StandaloneServer.java @@ -66,7 +66,8 @@ public class StandaloneServer { new SpringApplicationBuilder( ApiApplicationServer.class, MasterServer.class, - WorkerServer.class + WorkerServer.class, + PythonGatewayServer.class ).run(args); } diff --git a/pom.xml b/pom.xml index b453ce9b99..5f9f8973bf 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,7 @@ 0.9.12 1.9.16 1.5.1 + 0.10.9 @@ -263,6 +264,11 @@ dolphinscheduler-spi ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-python + ${project.version} + org.apache.curator @@ -678,6 +684,12 @@ javax.mail 1.6.2 + + + net.sf.py4j + py4j + ${py4j.version} + @@ -1234,5 +1246,6 @@ dolphinscheduler-service dolphinscheduler-microbench dolphinscheduler-standalone-server + dolphinscheduler-python diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index b19e8b9d29..7474f000b4 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -201,6 +201,7 @@ protostuff-core-1.7.2.jar protostuff-runtime-1.7.2.jar protostuff-api-1.7.2.jar protostuff-collectionschema-1.7.2.jar +py4j-0.10.9.jar quartz-2.3.0.jar quartz-jobs-2.3.0.jar reflections-0.9.12.jar