diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js index 0f73e3a0bd..bf72d5f9ed 100644 --- a/docs/configs/docsdev.js +++ b/docs/configs/docsdev.js @@ -161,6 +161,10 @@ export default { title: 'MLflow', link: '/en-us/docs/dev/user_doc/guide/task/mlflow.html', }, + { + title: 'Openmldb', + link: '/en-us/docs/dev/user_doc/guide/task/openmldb.html', + }, ], }, { @@ -525,6 +529,10 @@ export default { title: 'MLflow', link: '/zh-cn/docs/dev/user_doc/guide/task/mlflow.html', }, + { + title: 'Openmldb', + link: '/zh-cn/docs/dev/user_doc/guide/task/openmldb.html', + }, ], }, { diff --git a/docs/docs/en/guide/task/openmldb.md b/docs/docs/en/guide/task/openmldb.md new file mode 100644 index 0000000000..e2e48ea84a --- /dev/null +++ b/docs/docs/en/guide/task/openmldb.md @@ -0,0 +1,76 @@ +# OpenMLDB Node + +## Overview + +[OpenMLDB](https://openmldb.ai/) is an excellent open source machine learning database, providing a full-stack +FeatureOps solution for production. + +OpenMLDB task plugin used to execute tasks on OpenMLDB cluster. + +## Create Task + +- Click `Project -> Management-Project -> Name-Workflow Definition`, and click the "Create Workflow" button to enter the + DAG editing page. +- Drag from the toolbar task node to canvas. + +## Task Example + +First, introduce some general parameters of DolphinScheduler + +- **Node name**: The node name in a workflow definition is unique. +- **Run flag**: Identifies whether this node schedules normally, if it does not need to execute, select + the `prohibition execution`. +- **Descriptive information**: Describe the function of the node. +- **Task priority**: When the number of worker threads is insufficient, execute in the order of priority from high + to low, and tasks with the same priority will execute in a first-in first-out order. +- **Worker grouping**: Assign tasks to the machines of the worker group to execute. If `Default` is selected, + randomly select a worker machine for execution. +- **Environment Name**: Configure the environment name in which run the script. +- **Times of failed retry attempts**: The number of times the task failed to resubmit. +- **Failed retry interval**: The time interval (unit minute) for resubmitting the task after a failed task. +- **Delayed execution time**: The time (unit minute) that a task delays in execution. +- **Timeout alarm**: Check the timeout alarm and timeout failure. When the task runs exceed the "timeout", an alarm + email will send and the task execution will fail. +- **Predecessor task**: Selecting a predecessor task for the current task, will set the selected predecessor task as + upstream of the current task. + +### OpenMLDB Parameters + +**Task Parameter** + +- **zookeeper** :OpenMLDB cluster zookeeper address, e.g. 127.0.0.1:2181. +- **zookeeper path** : OpenMLDB cluster zookeeper path, e.g. /openmldb. +- **Execute Mode** :determine the init mode, offline or online. You can switch it in sql statement. +- **SQL statement** :SQL statement. +- Custom parameters: It is the user-defined parameters of Python, which will replace the content with \${variable} in the script. + +Here are some examples: + +#### Load data + +![load data](/img/tasks/demo/openmldb-load-data.png) + +We use `LOAD DATA` to load data into OpenMLDB cluster. We select `offline` here, so it will load to offline storage. + +#### Feature extraction + +![fe](/img/tasks/demo/openmldb-feature-extraction.png) + +We use `SELECT INTO` to do feature extraction. We select `offline` here, so it will run sql on offline engine. + +## Environment to prepare + +### Start the OpenMLDB cluster + +You should create an OpenMLDB cluster first. If in production env, please check [deploy OpenMLDB](https://openmldb.ai/docs/en/v0.5/deploy/install_deploy.html). + +You can follow [run OpenMLDB in docker](https://openmldb.ai/docs/zh/v0.5/quickstart/openmldb_quickstart.html#id11) +to a quick start. + +### Python env + +The OpenMLDB task will use OpenMLDB Python SDK to connect OpenMLDB cluster. So you should have the Python env. + +We will use `python3` by default. You can set `PYTHON_HOME` to use your custom python env. + +Make sure you have installed OpenMLDB Python SDK in the host where the worker server running, using `pip install openmldb`. diff --git a/docs/docs/zh/guide/task/openmldb.md b/docs/docs/zh/guide/task/openmldb.md new file mode 100644 index 0000000000..f889b978f2 --- /dev/null +++ b/docs/docs/zh/guide/task/openmldb.md @@ -0,0 +1,68 @@ +# OpenMLDB 节点 + +## 综述 + +[OpenMLDB](https://openmldb.ai/) 是一个优秀的开源机器学习数据库,提供生产级数据及特征开发全栈解决方案。 + +OpenMLDB任务组件可以连接OpenMLDB集群执行任务。 + +## 创建任务 + +- 点击项目管理-项目名称-工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面; +- 拖动工具栏的 任务节点到画板中。 + +## 任务样例 + +首先介绍一些DS通用参数: + +- **节点名称** :设置任务的名称。一个工作流定义中的节点名称是唯一的。 +- **运行标志** :标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。 +- **描述** :描述该节点的功能。 +- **任务优先级** :worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。 +- **Worker 分组** :任务分配给 worker 组的机器执行,选择 Default,会随机选择一台 worker 机执行。 +- **环境名称** :配置运行脚本的环境。 +- **失败重试次数** :任务失败重新提交的次数。 +- **失败重试间隔** :任务失败重新提交任务的时间间隔,以分钟为单位。 +- **延迟执行时间** :任务延迟执行的时间,以分钟为单位。 +- **超时告警** :勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败。 +- **前置任务** :选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。 + +### OpenMLDB 参数 + +**任务参数** + +- **zookeeper地址** :OpenMLDB集群连接地址中的zookeeper地址, e.g. 127.0.0.1:2181。 +- **zookeeper路径** : OpenMLDB集群连接地址中的zookeeper路径, e.g. /openmldb。 +- **执行模式** :初始执行模式(离线/在线),你可以在sql语句中随时切换。 +- **SQL语句** :SQL语句。 +- 自定义参数:是PYTHON局部的用户自定义参数,会替换脚本中以${变量}的内容。 + +下面有几个例子: + +#### 导入数据 + +![load data](/img/tasks/demo/openmldb-load-data.png) + +我们使用`LOAD DATA`语句导入数据到OpenMLDB集群。因为选择的是离线执行模式,所以将会导入数据到离线存储中。 + +#### 特征抽取 + +![fe](/img/tasks/demo/openmldb-feature-extraction.png) + +我们使用`SELECT INTO`进行特征抽取。因为选择的是离线执行模式,所以会使用离线引擎做特征计算。 + +## 环境准备 + +### OpenMLDB 启动 + +执行任务之前,你需要启动OpenMLDB集群。如果是在生产环境,请参考[deploy OpenMLDB](https://openmldb.ai/docs/zh/v0.5/deploy/install_deploy.html). + +你可以参考[在docker中运行OpenMLDB集群](https://openmldb.ai/docs/zh/v0.5/quickstart/openmldb_quickstart.html#id11) 快速启动。 + +### Python 环境 + +OpenMLDB任务组件将使用OpenMLDB Python SDK来连接OpenMLDB。所以你需要Python环境。 + +我们默认使用`python3`,你可以通过配置`PYTHON_HOME`来设置自己的Python环境。 + +请确保已通过`pip install openmldb`,在worker server的主机中安装了OpenMLDB Python SDK。 diff --git a/docs/img/tasks/demo/openmldb-feature-extraction.png b/docs/img/tasks/demo/openmldb-feature-extraction.png new file mode 100644 index 0000000000..9f5fb195ac Binary files /dev/null and b/docs/img/tasks/demo/openmldb-feature-extraction.png differ diff --git a/docs/img/tasks/demo/openmldb-load-data.png b/docs/img/tasks/demo/openmldb-load-data.png new file mode 100644 index 0000000000..559c68a86e Binary files /dev/null and b/docs/img/tasks/demo/openmldb-load-data.png differ diff --git a/docs/img/tasks/icons/openmldb.png b/docs/img/tasks/icons/openmldb.png new file mode 100644 index 0000000000..a06cbfe855 Binary files /dev/null and b/docs/img/tasks/icons/openmldb.png differ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index 5ecec9263e..2422a4628e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -165,6 +165,12 @@ dolphinscheduler-task-mlflow ${project.version} + + + org.apache.dolphinscheduler + dolphinscheduler-task-openmldb + ${project.version} + \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml new file mode 100644 index 0000000000..065d99386f --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/pom.xml @@ -0,0 +1,45 @@ + + + + + dolphinscheduler-task-plugin + org.apache.dolphinscheduler + dev-SNAPSHOT + + 4.0.0 + + dolphinscheduler-task-openmldb + jar + + + + org.apache.dolphinscheduler + dolphinscheduler-spi + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + + + org.apache.dolphinscheduler + dolphinscheduler-task-python + + + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java new file mode 100644 index 0000000000..5f8a9b9047 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbParameters.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.openmldb; + +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.util.List; + +public class OpenmldbParameters extends AbstractParameters { + + private String zk; + private String zkPath; + private String executeMode; + /** + * origin sql script + */ + private String sql; + + /** + * resource list + */ + private List resourceList; + + public String getZk() { + return zk; + } + + public void setZk(String zk) { + this.zk = zk; + } + + public String getZkPath() { + return zkPath; + } + + public void setZkPath(String zkPath) { + this.zkPath = zkPath; + } + + public String getExecuteMode() { + return executeMode; + } + + public void setExecuteMode(String executeMode) { + this.executeMode = executeMode; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public List getResourceList() { + return resourceList; + } + + public void setResourceList(List resourceList) { + this.resourceList = resourceList; + } + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(zk) && StringUtils.isNotEmpty(zkPath) && StringUtils.isNotEmpty(sql); + } + + @Override + public List getResourceFilesList() { + return this.resourceList; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java new file mode 100644 index 0000000000..a93c191f0c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.openmldb; + +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils; +import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.python.PythonTask; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; +import org.apache.dolphinscheduler.spi.utils.StringUtils; + +import java.nio.file.Paths; +import java.util.Locale; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.base.Preconditions; + +/** + * openmldb task + */ +public class OpenmldbTask extends PythonTask { + + /** + * openmldb parameters + */ + private OpenmldbParameters openmldbParameters; + + /** + * python process(openmldb only supports version 3 by default) + */ + private static final String OPENMLDB_PYTHON = "python3"; + private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$"); + + /** + * constructor + * + * @param taskRequest taskRequest + */ + public OpenmldbTask(TaskExecutionContext taskRequest) { + super(taskRequest); + } + + @Override + public void init() { + logger.info("openmldb task params {}", taskRequest.getTaskParams()); + + openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class); + + if (openmldbParameters == null || !openmldbParameters.checkParameters()) { + throw new TaskException("openmldb task params is not valid"); + } + } + + @Override + @Deprecated + public String getPreScript() { + return ""; + } + + @Override + public AbstractParameters getParameters() { + return openmldbParameters; + } + + /** + * build python command file path + * + * @return python command file path + */ + @Override + protected String buildPythonCommandFilePath() { + return String.format("%s/openmldb_%s.py", taskRequest.getExecutePath(), taskRequest.getTaskAppId()); + } + + /** + * build python script content from sql + * + * @return raw python script + */ + @Override + protected String buildPythonScriptContent() { + logger.info("raw sql script : {}", openmldbParameters.getSql()); + + String rawSQLScript = openmldbParameters.getSql().replaceAll("[\\r]?\\n", "\n"); + Map paramsMap = mergeParamsWithContext(openmldbParameters); + rawSQLScript = ParameterUtils.convertParameterPlaceholders(rawSQLScript, ParamUtils.convert(paramsMap)); + + // convert sql to python script + String pythonScript = buildPythonScriptsFromSql(rawSQLScript); + logger.info("rendered python script : {}", pythonScript); + return pythonScript; + } + + private String buildPythonScriptsFromSql(String rawSqlScript) { + // imports + StringBuilder builder = new StringBuilder("import openmldb\nimport sqlalchemy as db\n"); + + // connect to openmldb + builder.append(String.format("engine = db.create_engine('openmldb:///?zk=%s&zkPath=%s')\n", + openmldbParameters.getZk(), openmldbParameters.getZkPath())); + builder.append("con = engine.connect()\n"); + + // execute mode + String executeMode = openmldbParameters.getExecuteMode().toLowerCase(Locale.ROOT); + builder.append("con.execute(\"set @@execute_mode='").append(executeMode).append("';\")\n"); + // offline job should be sync, and set job_timeout to 30min(==server.channel_keep_alive_time). + // You can set it longer in sqls. + if (executeMode.equals("offline")) { + builder.append("con.execute(\"set @@sync_job=true\")\n"); + builder.append("con.execute(\"set @@job_timeout=1800000\")\n"); + } + + // split sql to list + // skip the sql only has space characters + Pattern pattern = Pattern.compile("\\S"); + for (String sql : rawSqlScript.split(";")) { + if (pattern.matcher(sql).find()) { + sql = sql.replaceAll("\\n", "\\\\n"); + builder.append("con.execute(\"").append(sql).append("\")\n"); + } + } + return builder.toString(); + } + + /** + * Build the python task command. + * If user have set the 'PYTHON_HOME' environment, we will use the 'PYTHON_HOME', + * if not, we will default use python. + * + * @param pythonFile Python file, cannot be empty. + * @return Python execute command, e.g. 'python test.py'. + */ + @Override + protected String buildPythonExecuteCommand(String pythonFile) { + Preconditions.checkNotNull(pythonFile, "Python file cannot be null"); + return getPythonCommand() + " " + pythonFile; + } + + private String getPythonCommand() { + String pythonHome = System.getenv(PYTHON_HOME); + return getPythonCommand(pythonHome); + } + + private String getPythonCommand(String pythonHome) { + if (StringUtils.isEmpty(pythonHome)) { + return OPENMLDB_PYTHON; + } + // If your python home is "xx/bin/python[xx]", you are forced to use python3 + String pythonBinPath = "/bin/" + OPENMLDB_PYTHON; + Matcher matcher = PYTHON_PATH_PATTERN.matcher(pythonHome); + if (matcher.find()) { + return matcher.replaceAll(pythonBinPath); + } + return Paths.get(pythonHome, pythonBinPath).toString(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java new file mode 100644 index 0000000000..27a1e349d3 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannel.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.openmldb; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; +import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; + +public class OpenmldbTaskChannel implements TaskChannel { + @Override + public void cancelApplication(boolean status) { + + } + + @Override + public OpenmldbTask createTask(TaskExecutionContext taskRequest) { + return new OpenmldbTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(ParametersNode parametersNode) { + return JSONUtils.parseObject(parametersNode.getTaskParams(), OpenmldbParameters.class); + } + + @Override + public ResourceParametersHelper getResources(String parameters) { + return null; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java new file mode 100644 index 0000000000..b4698c9461 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskChannelFactory.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.openmldb; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; +import org.apache.dolphinscheduler.spi.params.base.PluginParams; + +import java.util.List; + +import com.google.auto.service.AutoService; + +@AutoService(TaskChannelFactory.class) +public class OpenmldbTaskChannelFactory implements TaskChannelFactory { + @Override + public TaskChannel create() { + return new OpenmldbTaskChannel(); + } + + @Override + public String getName() { + return "OPENMLDB"; + } + + @Override + public List getParams() { + return null; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java new file mode 100644 index 0000000000..c5ed143a5c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/test/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTaskTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.openmldb; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; +import org.powermock.reflect.Whitebox; + +public class OpenmldbTaskTest { + static class MockOpenmldbTask extends OpenmldbTask { + /** + * constructor + * + * @param taskRequest taskRequest + */ + public MockOpenmldbTask(TaskExecutionContext taskRequest) { + super(taskRequest); + } + + @Override + protected Map mergeParamsWithContext(AbstractParameters parameters) { + return new HashMap<>(); + } + } + + private OpenmldbTask createOpenmldbTask() { + return new MockOpenmldbTask(null); + } + + @Test + public void buildPythonExecuteCommand() throws Exception { + OpenmldbTask openmldbTask = createOpenmldbTask(); + String pythonFile = "test.py"; + String result1 = openmldbTask.buildPythonExecuteCommand(pythonFile); + Assert.assertEquals("python3 test.py", result1); + } + + @Test + public void buildSQLWithComment() throws Exception { + OpenmldbTask openmldbTask = createOpenmldbTask(); + OpenmldbParameters openmldbParameters = new OpenmldbParameters(); + openmldbParameters.setExecuteMode("offline"); + String rawSQLScript = "select * from users\r\n" + + "-- some comment\n" + + "inner join order on users.order_id = order.id; \n\n;" + + "select * from users;"; + openmldbParameters.setSql(rawSQLScript); + Whitebox.setInternalState(openmldbTask, "openmldbParameters", openmldbParameters); + OpenmldbParameters internal = (OpenmldbParameters) openmldbTask.getParameters(); + Assert.assertNotNull(internal); + Assert.assertEquals(internal.getExecuteMode(), "offline"); + + String result1 = openmldbTask.buildPythonScriptContent(); + Assert.assertEquals("import openmldb\n" + + "import sqlalchemy as db\n" + + "engine = db.create_engine('openmldb:///?zk=null&zkPath=null')\n" + + "con = engine.connect()\n" + + "con.execute(\"set @@execute_mode='offline';\")\n" + + "con.execute(\"set @@sync_job=true\")\n" + + "con.execute(\"set @@job_timeout=1800000\")\n" + + "con.execute(\"select * from users\\n-- some comment\\ninner join order on users.order_id = " + + "order.id\")\n" + + "con.execute(\"select * from users\")\n" + , result1); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index c153847b18..87fba6fe81 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -51,16 +51,16 @@ public class PythonTask extends AbstractTaskExecutor { /** * python parameters */ - private PythonParameters pythonParameters; + protected PythonParameters pythonParameters; /** * shell command executor */ private ShellCommandExecutor shellCommandExecutor; - private TaskExecutionContext taskRequest; + protected TaskExecutionContext taskRequest; - private static final String PYTHON_HOME = "PYTHON_HOME"; + protected static final String PYTHON_HOME = "PYTHON_HOME"; private static final String DEFAULT_PYTHON_VERSION = "python"; @@ -109,7 +109,7 @@ public class PythonTask extends AbstractTaskExecutor { String pythonScriptFile = buildPythonCommandFilePath(); // create this file - createPythonCommandFileIfNotExists(pythonScriptContent,pythonScriptFile); + createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile); String command = buildPythonExecuteCommand(pythonScriptFile); TaskResponse taskResponse = shellCommandExecutor.run(command); @@ -140,7 +140,7 @@ public class PythonTask extends AbstractTaskExecutor { * * @param rawScript rawScript * @return String - * @throws StringIndexOutOfBoundsException StringIndexOutOfBoundsException + * @throws StringIndexOutOfBoundsException if substring index is out of bounds */ private static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException { int len = "${setShareVar(${".length(); @@ -170,7 +170,7 @@ public class PythonTask extends AbstractTaskExecutor { /** * create python command file if not exists * - * @param pythonScript exec python script + * @param pythonScript exec python script * @param pythonScriptFile python script file * @throws IOException io exception */ @@ -209,22 +209,23 @@ public class PythonTask extends AbstractTaskExecutor { * @return raw python script * @throws Exception exception */ - private String buildPythonScriptContent() throws Exception { + protected String buildPythonScriptContent() throws Exception { + logger.info("raw python script : {}", pythonParameters.getRawScript()); String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); + Map paramsMap = mergeParamsWithContext(pythonParameters); + return ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); + } + protected Map mergeParamsWithContext(AbstractParameters parameters) { // replace placeholder - Map paramsMap = ParamUtils.convert(taskRequest, pythonParameters); + Map paramsMap = ParamUtils.convert(taskRequest, parameters); if (MapUtils.isEmpty(paramsMap)) { paramsMap = new HashMap<>(); } if (MapUtils.isNotEmpty(taskRequest.getParamsMap())) { paramsMap.putAll(taskRequest.getParamsMap()); } - rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); - - logger.info("raw python script : {}", pythonParameters.getRawScript()); - - return rawPythonScript; + return paramsMap; } /** @@ -235,7 +236,7 @@ public class PythonTask extends AbstractTaskExecutor { * @param pythonFile Python file, cannot be empty. * @return Python execute command, e.g. 'python test.py'. */ - private String buildPythonExecuteCommand(String pythonFile) { + protected String buildPythonExecuteCommand(String pythonFile) { Preconditions.checkNotNull(pythonFile, "Python file cannot be null"); String pythonHome = String.format("${%s}", PYTHON_HOME); diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index b7f3c097cb..43f0209e0f 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -54,5 +54,6 @@ dolphinscheduler-task-zeppelin dolphinscheduler-task-jupyter dolphinscheduler-task-mlflow + dolphinscheduler-task-openmldb diff --git a/dolphinscheduler-ui/public/images/task-icons/openmldb.png b/dolphinscheduler-ui/public/images/task-icons/openmldb.png new file mode 100644 index 0000000000..a06cbfe855 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/openmldb.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png b/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png new file mode 100644 index 0000000000..c42c332611 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/openmldb_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 8f37959248..381de4232c 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -659,6 +659,14 @@ export default { mlflowProjectRepository_tips: 'github respository or path on worker', mlflowProjectVersion: 'Project Version', mlflowProjectVersion_tips: 'git version', + openmldb_zk_address: 'zookeeper address', + openmldb_zk_address_tips: 'Please enter the zookeeper address', + openmldb_zk_path: 'zookeeper path', + openmldb_zk_path_tips: 'Please enter the zookeeper path', + openmldb_execute_mode: 'Execute Mode', + openmldb_execute_mode_tips: 'Please select the execute mode', + openmldb_execute_mode_offline: 'offline', + openmldb_execute_mode_online: 'online', send_email: 'Send Email', log_display: 'Log display', rows_of_result: 'rows of result', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index 5e10480c4e..ccc93958a9 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -646,6 +646,14 @@ export default { mlflowProjectRepository_tips: '可以为github仓库或worker上的路径', mlflowProjectVersion: '项目版本', mlflowProjectVersion_tips: '项目git版本', + openmldb_zk_address: 'zookeeper地址', + openmldb_zk_address_tips: '请输入zookeeper地址', + openmldb_zk_path: 'zookeeper路径', + openmldb_zk_path_tips: '请输入zookeeper路径', + openmldb_execute_mode: '执行模式', + openmldb_execute_mode_tips: '请选择执行模式', + openmldb_execute_mode_offline: '离线', + openmldb_execute_mode_online: '在线', send_email: '发送邮件', log_display: '日志显示', rows_of_result: '行查询结果', diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index 4bddd81834..bc3a2c61dd 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -68,3 +68,4 @@ export { useJupyter } from './use-jupyter' export { useMlflow } from './use-mlflow' export { useMlflowProjects } from './use-mlflow-projects' export { useMlflowModels } from './use-mlflow-models' +export { useOpenmldb } from './use-openmldb' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts new file mode 100644 index 0000000000..9f5a38db6f --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-openmldb.ts @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import { useCustomParams, useResources } from '.' +import type { IJsonItem } from '../types' + +export function useOpenmldb(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + const options = [ + { + label: t('project.node.openmldb_execute_mode_offline'), + value: 'offline' + }, + { + label: t('project.node.openmldb_execute_mode_online'), + value: 'online' + } + ] + return [ + { + type: 'input', + field: 'zk', + name: t('project.node.openmldb_zk_address'), + props: { + placeholder: t('project.node.openmldb_zk_address_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.openmldb_zk_address_tips')) + } + } + } + }, + { + type: 'input', + field: 'zkPath', + name: t('project.node.openmldb_zk_path'), + props: { + placeholder: t('project.node.openmldb_zk_path_tips') + }, + validate: { + trigger: ['input', 'blur'], + required: true, + validator(validate: any, value: string) { + if (!value) { + return new Error(t('project.node.openmldb_zk_path_tips')) + } + } + } + }, + { + type: 'radio', + field: 'executeMode', + name: t('project.node.openmldb_execute_mode'), + options: options + }, + { + type: 'editor', + field: 'sql', + name: t('project.node.sql_statement'), + validate: { + trigger: ['input', 'trigger'], + required: true, + message: t('project.node.sql_empty_tips') + } + }, + useResources(), + ...useCustomParams({ model, field: 'localParams', isSimple: false }) + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index d993aaa234..2ea47c72cb 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -356,6 +356,13 @@ export function formatParams(data: INodeData): { taskParams.mlflowProjectVersion = data.mlflowProjectVersion } + if (data.taskType === 'OPENMLDB') { + taskParams.zk = data.zk + taskParams.zkPath = data.zkPath + taskParams.executeMode = data.executeMode + taskParams.sql = data.sql + } + if (data.taskType === 'PIGEON') { taskParams.targetJobName = data.targetJobName } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index 726a796cd5..a151b70d70 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -37,6 +37,7 @@ import { useZeppelin } from './use-zeppelin' import { useK8s } from './use-k8s' import { useJupyter } from './use-jupyter' import { useMlflow } from './use-mlflow' +import { useOpenmldb } from './use-openmldb' export default { SHELL: useShell, @@ -60,5 +61,6 @@ export default { ZEPPELIN: useZeppelin, K8S: useK8s, JUPYTER: useJupyter, - MLFLOW: useMlflow + MLFLOW: useMlflow, + OPENMLDB: useOpenmldb } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts new file mode 100644 index 0000000000..9f8d560de9 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-openmldb.ts @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData } from '../types' +import { ITaskData } from '../types' + +export function useOpenmldb({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + taskType: 'OPENMLDB', + flag: 'YES', + description: '', + timeoutFlag: false, + timeoutNotifyStrategy: ['WARN'], + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + zk: '', + zkPath: '', + executeMode: 'offline' + } as INodeData) + + let extra: IJsonItem[] = [] + if (from === 1) { + extra = [ + Fields.useTaskType(model, readonly), + Fields.useProcessName({ + model, + projectCode, + isCreate: !data?.id, + from, + processName: data?.processName + }) + ] + } + + return { + json: [ + Fields.useName(from), + ...extra, + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(), + Fields.useEnvironmentName(model, !model.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useOpenmldb(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 78f48c6b24..f44e65c95f 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -336,6 +336,9 @@ interface ITaskParams { deployType?: string deployPort?: string deployModelKey?: string + zk?: string + zkPath?: string + executeMode?: string } interface INodeData diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index e291d184e6..26cd32f544 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -37,6 +37,7 @@ export type TaskType = | 'K8S' | 'JUPYTER' | 'MLFLOW' + | 'OPENMLDB' export const TASK_TYPES_MAP = { SHELL: { @@ -113,5 +114,9 @@ export const TASK_TYPES_MAP = { MLFLOW: { alias: 'MLFLOW', helperLinkDisable: true + }, + OPENMLDB: { + alias: 'OPENMLDB', + helperLinkDisable: true } } as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } } diff --git a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss index 725ae513a0..bbe539d813 100644 --- a/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss +++ b/dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss @@ -167,6 +167,9 @@ $bgLight: #ffffff; &.icon-mlflow { background-image: url('/images/task-icons/mlflow.png'); } + &.icon-openmldb { + background-image: url('/images/task-icons/openmldb.png'); + } } &:hover { @@ -237,6 +240,9 @@ $bgLight: #ffffff; &.icon-mlflow { background-image: url('/images/task-icons/mlflow.png'); } + &.icon-openmldb { + background-image: url('/images/task-icons/openmldb_hover.png'); + } } } } diff --git a/pom.xml b/pom.xml index 9522d5eb86..7b97d1f028 100644 --- a/pom.xml +++ b/pom.xml @@ -408,6 +408,11 @@ dolphinscheduler-task-api ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-task-python + ${project.version} + org.apache.dolphinscheduler dolphinscheduler-task-all