Browse Source

[feature][task plugin] Add new task plugin for apache linkis (#12693)

3.2.0-release
Assert 2 years ago committed by GitHub
parent
commit
2dbc79693e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/configs/docsdev.js
  2. 49
      docs/docs/en/guide/task/linkis.md
  3. 49
      docs/docs/zh/guide/task/linkis.md
  4. BIN
      docs/img/tasks/demo/linkis_task01.png
  5. BIN
      docs/img/tasks/demo/linkis_task02.png
  6. BIN
      docs/img/tasks/icons/linkis.png
  7. 1
      dolphinscheduler-api/src/main/resources/task-type-config.yaml
  8. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml
  9. 43
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/pom.xml
  10. 39
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/Constants.java
  11. 69
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisJobStatus.java
  12. 54
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisParameters.java
  13. 256
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
  14. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskChannel.java
  15. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskChannelFactory.java
  16. 59
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/test/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskTest.java
  17. 1
      dolphinscheduler-task-plugin/pom.xml
  18. BIN
      dolphinscheduler-ui/public/images/task-icons/datasync.png
  19. BIN
      dolphinscheduler-ui/public/images/task-icons/datasync_hover.png
  20. BIN
      dolphinscheduler-ui/public/images/task-icons/linkis.png
  21. BIN
      dolphinscheduler-ui/public/images/task-icons/linkis_hover.png
  22. 4
      dolphinscheduler-ui/src/store/project/task-type.ts
  23. 1
      dolphinscheduler-ui/src/store/project/types.ts
  24. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  25. 94
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-linkis.ts
  26. 6
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  27. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  28. 79
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts
  29. 1
      dolphinscheduler-ui/src/views/projects/task/components/node/types.ts
  30. 5
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  31. 10
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
  32. 1
      script/env/dolphinscheduler_env.sh

8
docs/configs/docsdev.js

@ -217,6 +217,10 @@ export default {
title: 'Kubeflow', title: 'Kubeflow',
link: '/en-us/docs/dev/user_doc/guide/task/kubeflow.html', link: '/en-us/docs/dev/user_doc/guide/task/kubeflow.html',
}, },
{
title: 'Apache Linkis',
link: '/en-us/docs/dev/user_doc/guide/task/linkis.html',
},
], ],
}, },
{ {
@ -877,6 +881,10 @@ export default {
title: 'Kubeflow', title: 'Kubeflow',
link: '/zh-cn/docs/dev/user_doc/guide/task/kubeflow.html', link: '/zh-cn/docs/dev/user_doc/guide/task/kubeflow.html',
}, },
{
title: 'Apache Linkis',
link: '/zh-cn/docs/dev/user_doc/guide/task/linkis.html',
},
], ],
}, },
{ {

49
docs/docs/en/guide/task/linkis.md

@ -0,0 +1,49 @@
# Apache Linkis
## Overview
`Linkis` task type for creating and executing `Linkis` tasks. When the worker executes this task, it will parse the shell parameters through the `linkis-cli` command.
Click [here](https://linkis.apache.org/) for more information about `Apache Linkis`.
## Create Task
- Click Project Management -> Project Name -> Workflow Definition, and click the "Create Workflow" button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/linkis.png" width="15"/> from the toolbar to the drawing board.
## Task Parameter
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix]&#40;appendix.md#default-task-parameters&#41; `Default Task Parameters` section for default parameters.)
- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters.
- Please refer to [Linkis-Cli Task Parameters](https://linkis.apache.org/zh-CN/docs/latest/user-guide/linkiscli-manual) `Linkis Support Parameters` section for Linkis parameters.
## Task Example
This sample demonstrates using the Spark engine to execute sql script.
### Configuring the Linkis environment in DolphinScheduler
If you want to use the Linkis task type in the production environment, you need to configure the required environment first. The configuration file is as follows: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
![linkis_task01](../../../../img/tasks/demo/linkis_task01.png)
### Configuring Linkis Task Node
According to the above parameter description, configure the required content.
![linkis_task02](../../../../img/tasks/demo/linkis_task02.png)
### Config example
```
sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;" -submitUser hadoop -proxyUser hadoop
```
### Attention
- No need to fill `sh ./bin/linkis-cli` in the configuration column, it has been configured in advance.
- The default configuration is asynchronous submission. You do not need to configure the `--async` parameter.

49
docs/docs/zh/guide/task/linkis.md

@ -0,0 +1,49 @@
# Apache Linkis
## 综述
`Linkis` 任务类型,用于创建并执行 `Linkis` 类型任务。worker 执行该任务的时候,会通过 `linkis-cli` 执行命令行。
点击 [这里](https://linkis.apache.org/) 获取更多关于 `Apache Linkis` 的信息。
## 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
- 拖动工具栏的<img src="../../../../img/tasks/icons/linkis.png" width="15"/> 任务节点到画板中。
## 任务参数
[//]: # (TODO: use the commented anchor below once our website template supports this syntax)
[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录]&#40;appendix.md#默认任务参数&#41;`默认任务参数`一栏。)
- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。
- Linkis支持的参数列表请参考[linkis-cli任务参数](https://linkis.apache.org/zh-CN/docs/latest/user-guide/linkiscli-manual)`支持的参数列表`一栏。
## 任务样例
该样例演示为使用 Spark 引擎提交sql执行。
### 在 DolphinScheduler 中配置 Linkis 环境
若生产环境中要是使用到 Linkis 任务类型,则需要先配置好所需的环境,配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
![linkis_task01](../../../../img/tasks/demo/linkis_task01.png)
### 配置 Linkis 任务节点
根据上述参数说明,配置所需的内容即可。
![linkis_task02](../../../../img/tasks/demo/linkis_task02.png)
### Config 样例
```
sh ./bin/linkis-cli -engineType spark-2.4.3 -codeType sql -code "select count(*) from testdb.test;" -submitUser hadoop -proxyUser hadoop
```
### 注意事项
- 无需在配置栏里再填写`sh ./bin/linkis-cli`,已提前配置。
- 配置默认为异步提交,您无需再配置`--async`参数。

BIN
docs/img/tasks/demo/linkis_task01.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 127 KiB

BIN
docs/img/tasks/demo/linkis_task02.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 160 KiB

BIN
docs/img/tasks/icons/linkis.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

1
dolphinscheduler-api/src/main/resources/task-type-config.yaml

@ -57,3 +57,4 @@ task:
- 'ZEPPELIN' - 'ZEPPELIN'
- 'CHUNJUN' - 'CHUNJUN'
- 'DATASYNC' - 'DATASYNC'
- 'LINKIS'

5
dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml

@ -236,6 +236,11 @@
<artifactId>dolphinscheduler-task-kubeflow</artifactId> <artifactId>dolphinscheduler-task-kubeflow</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-linkis</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

43
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/pom.xml

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<version>dev-SNAPSHOT</version>
</parent>
<artifactId>dolphinscheduler-task-linkis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

39
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/Constants.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
public class Constants {
private Constants() {
throw new IllegalStateException("Utility class");
}
public static final String SHELL_CLI_OPTIONS = "${LINKIS_HOME}/bin/linkis-cli";
public static final String KILL_OPTIONS = "--kill";
public static final String STATUS_OPTIONS = "--status";
public static final String ASYNC_OPTIONS = "--async true";
public static final String SPACE = " ";
public static final String LINKIS_TASK_ID_REGEX = "\"taskID\": \"\\d+";
public static final String LINKIS_STATUS_REGEX = "\"status\": \"\\w+";
}

69
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisJobStatus.java

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.commons.lang3.StringUtils;
public enum LinkisJobStatus {
UNSUBMITTED("Unsubmitted", 0),
SUBMITTING("Submitting", 1),
INITED("Inited", 2),
WAIT_FOR_RETRY("WaitForRetry", 3),
SCHEDULED("Scheduled", 4),
RUNNING("Running", 5),
SUCCEED("Succeed", 6),
FAILED("Failed", 7),
CANCELLED("Cancelled", 8),
TIMEOUT("Timeout", 9),
UNKNOWN("Unknown", 10),
SHUTTINGDOWN("Shuttingdown", 11);
private String name;
private int id;
LinkisJobStatus(String name, int id) {
this.name = name;
this.id = id;
}
public static LinkisJobStatus convertFromJobStatusString(String status) {
if (StringUtils.isNotBlank(status)) {
if (LinkisJobStatus.INITED.name().equalsIgnoreCase(status))
return LinkisJobStatus.INITED;
else if (LinkisJobStatus.WAIT_FOR_RETRY.name().equalsIgnoreCase(status))
return LinkisJobStatus.WAIT_FOR_RETRY;
else if (LinkisJobStatus.SCHEDULED.name().equalsIgnoreCase(status))
return LinkisJobStatus.SCHEDULED;
else if (LinkisJobStatus.RUNNING.name().equalsIgnoreCase(status))
return LinkisJobStatus.RUNNING;
else if (LinkisJobStatus.SUCCEED.name().equalsIgnoreCase(status))
return LinkisJobStatus.SUCCEED;
else if (LinkisJobStatus.FAILED.name().equalsIgnoreCase(status))
return LinkisJobStatus.FAILED;
else if (LinkisJobStatus.CANCELLED.name().equalsIgnoreCase(status))
return LinkisJobStatus.CANCELLED;
else if (LinkisJobStatus.TIMEOUT.name().equalsIgnoreCase(status))
return LinkisJobStatus.TIMEOUT;
else
return LinkisJobStatus.UNKNOWN;
} else {
return LinkisJobStatus.UNKNOWN;
}
}
}

54
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisParameters.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@Data
public class LinkisParameters extends AbstractParameters {
private Boolean useCustom;
private List<Param> paramScript;
private String rawScript;
@Getter
@Setter
public static class Param {
private String props;
private String value;
}
@Override
public boolean checkParameters() {
return ((BooleanUtils.isTrue(useCustom) && StringUtils.isNotBlank(rawScript))
|| (BooleanUtils.isFalse(useCustom) && paramScript.size() > 0));
}
}

256
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java

@ -0,0 +1,256 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* linkis task
*/
public class LinkisTask extends AbstractRemoteTask {
/**
* linkis parameters
*/
private LinkisParameters linkisParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
protected final TaskExecutionContext taskExecutionContext;
private String taskId;
protected static final Pattern LINKIS_TASK_ID_REGEX = Pattern.compile(Constants.LINKIS_TASK_ID_REGEX);
protected static final Pattern LINKIS_STATUS_REGEX = Pattern.compile(Constants.LINKIS_STATUS_REGEX);
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public LinkisTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
logger);
}
@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
}
@Override
public void init() {
logger.info("Linkis task params {}", taskExecutionContext.getTaskParams());
if (!linkisParameters.checkParameters()) {
throw new RuntimeException("Linkis task params is not valid");
}
}
@Override
public void submitApplication() throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
linkisParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
logger.error("Linkis task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw new TaskException("Execute Linkis task failed", e);
}
}
@Override
public void trackApplicationStatus() throws TaskException {
initTaskId();
try {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.STATUS_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) {
case FAILED:
setExitStatusCode(EXIT_CODE_FAILURE);
break;
case SUCCEED:
setExitStatusCode(EXIT_CODE_SUCCESS);
break;
case CANCELLED:
setExitStatusCode(EXIT_CODE_KILL);
break;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
throw new TaskException("track linkis status error", e);
}
}
@Override
public void cancelApplication() throws TaskException {
// cancel process
initTaskId();
try {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.KILL_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
shellCommandExecutor.run(command);
setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("The current Linkis task has been interrupted", e);
throw new TaskException("The current Linkis task has been interrupted", e);
} catch (Exception e) {
throw new TaskException("cancel linkis task error", e);
}
}
private String buildCommand() {
List<String> args = new ArrayList<>();
args.addAll(buildOptions());
String command = String.join(Constants.SPACE, args);
logger.info("Linkis task command: {}", command);
return command;
}
protected List<String> buildOptions() {
List<String> args = new ArrayList<>();
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.ASYNC_OPTIONS);
if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) {
args.add(buildCustomConfigContent());
} else {
args.add(buildParamConfigContent());
}
return args;
}
private String buildCustomConfigContent() {
logger.info("raw custom config content : {}", linkisParameters.getRawScript());
String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
return script;
}
private String buildParamConfigContent() {
logger.info("raw param config content : {}", linkisParameters.getParamScript());
String script = "";
List<LinkisParameters.Param> paramList = linkisParameters.getParamScript();
for (LinkisParameters.Param param : paramList) {
script = script.concat(param.getProps())
.concat(Constants.SPACE)
.concat(param.getValue());
}
script = parseScript(script);
return script;
}
private void initTaskId() {
if (taskId == null) {
if (StringUtils.isNotEmpty(getAppIds())) {
taskId = getAppIds();
}
}
if (taskId == null) {
throw new TaskException("linkis task id is null");
}
}
protected String findTaskId(String line) {
Matcher matcher = LINKIS_TASK_ID_REGEX.matcher(line);
if (matcher.find()) {
String str = matcher.group();
return str.substring(11);
}
return null;
}
protected String findStatus(String line) {
Matcher matcher = LINKIS_STATUS_REGEX.matcher(line);
if (matcher.find()) {
String str = matcher.group();
return str.substring(11);
}
return null;
}
@Override
public AbstractParameters getParameters() {
return linkisParameters;
}
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
public void setLinkisParameters(LinkisParameters linkisParameters) {
this.linkisParameters = linkisParameters;
}
}

49
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskChannel.java

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
public class LinkisTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public LinkisTask createTask(TaskExecutionContext taskRequest) {
return new LinkisTask(taskRequest);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), LinkisParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return null;
}
}

45
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskChannelFactory.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class LinkisTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new LinkisTaskChannel();
}
@Override
public String getName() {
return "LINKIS";
}
@Override
public List<PluginParams> getParams() {
return null;
}
}

59
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/test/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTaskTest.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.linkis;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class LinkisTaskTest {
@Test
public void testBuildLinkisExecuteCommand() throws Exception {
Assertions.assertEquals("sh ./bin/shell-cli -engineType spark-2.4.3",
testBuildRunCommandLine(testBuildLinkisParameters()));
}
private LinkisParameters testBuildLinkisParameters() {
LinkisParameters linkisParameters = new LinkisParameters();
List<LinkisParameters.Param> testParamList = new ArrayList<>();
LinkisParameters.Param testParam = new LinkisParameters.Param();
testParam.setProps("-engineType");
testParam.setValue("spark-2.4.3");
testParamList.add(testParam);
linkisParameters.setUseCustom(false);
linkisParameters.setParamScript(testParamList);
return linkisParameters;
}
private static String testBuildRunCommandLine(LinkisParameters linkisParameters) {
List<String> args = new ArrayList<>();
String script = "";
List<LinkisParameters.Param> paramList = linkisParameters.getParamScript();
for (LinkisParameters.Param param : paramList) {
script = script.concat(param.getProps())
.concat(Constants.SPACE)
.concat(param.getValue());
}
args.add("sh ./bin/shell-cli");
args.add(script);
return String.join(Constants.SPACE, args);
}
}

1
dolphinscheduler-task-plugin/pom.xml

@ -65,6 +65,7 @@
<module>dolphinscheduler-task-dms</module> <module>dolphinscheduler-task-dms</module>
<module>dolphinscheduler-task-datasync</module> <module>dolphinscheduler-task-datasync</module>
<module>dolphinscheduler-task-kubeflow</module> <module>dolphinscheduler-task-kubeflow</module>
<module>dolphinscheduler-task-linkis</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>

BIN
dolphinscheduler-ui/public/images/task-icons/datasync.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

After

Width:  |  Height:  |  Size: 156 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/datasync_hover.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 156 KiB

After

Width:  |  Height:  |  Size: 24 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/linkis.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 940 B

BIN
dolphinscheduler-ui/public/images/task-icons/linkis_hover.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.5 KiB

4
dolphinscheduler-ui/src/store/project/task-type.ts

@ -145,6 +145,10 @@ export const TASK_TYPES_MAP = {
KUBEFLOW: { KUBEFLOW: {
alias: 'KUBEFLOW', alias: 'KUBEFLOW',
helperLinkDisable: true helperLinkDisable: true
},
LINKIS: {
alias: 'LINKIS',
helperLinkDisable: true
} }
} as { } as {
[key in TaskType]: { [key in TaskType]: {

1
dolphinscheduler-ui/src/store/project/types.ts

@ -55,6 +55,7 @@ type TaskType =
| 'DMS' | 'DMS'
| 'DATASYNC' | 'DATASYNC'
| 'KUBEFLOW' | 'KUBEFLOW'
| 'LINKIS'
type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON' type ProgramType = 'JAVA' | 'SCALA' | 'PYTHON'
type DependentResultType = { type DependentResultType = {

1
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -83,3 +83,4 @@ export { useHiveCli } from './use-hive-cli'
export { useDms } from './use-dms' export { useDms } from './use-dms'
export { useDatasync } from './use-datasync' export { useDatasync } from './use-datasync'
export { useKubeflow } from './use-kubeflow' export { useKubeflow } from './use-kubeflow'
export { useLinkis } from './use-linkis'

94
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-linkis.ts

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { computed } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCustomParams } from '.'
import type { IJsonItem } from '../types'
export function useLinkis(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const configEditorSpan = computed(() => (model.useCustom ? 24 : 0))
const parmaEditorSpan = computed(() => (model.useCustom ? 0 : 24))
computed(() => (model.useCustom ? 0 : 24));
return [
{
type: 'switch',
field: 'useCustom',
name: t('project.node.custom_config')
},
{
type: 'custom-parameters',
field: 'paramScript',
name: t('project.node.option_parameters'),
span: parmaEditorSpan,
children: [
{
type: 'input',
field: 'prop',
span: 10,
props: {
placeholder: t('project.node.prop_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.prop_tips'))
}
const sameItems = model.localParams.filter(
(item: { prop: string }) => item.prop === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 10,
props: {
placeholder: t('project.node.value_tips'),
maxLength: 256
}
}
]
},
{
type: 'editor',
field: 'rawScript',
name: t('project.node.script'),
span: configEditorSpan,
validate: {
trigger: ['input', 'trigger'],
required: model.useCustom,
validator(validate: any, value: string) {
if (model.useCustom && !value) {
return new Error(t('project.node.script_tips'))
}
}
}
},
...useCustomParams({ model, field: 'localParams', isSimple: true })
]
}

6
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -460,6 +460,12 @@ export function formatParams(data: INodeData): {
taskParams.yamlContent = data.yamlContent taskParams.yamlContent = data.yamlContent
taskParams.namespace = data.namespace taskParams.namespace = data.namespace
} }
if (data.taskType === 'LINKIS') {
taskParams.useCustom = data.useCustom
taskParams.paramScript = data.paramScript
taskParams.rawScript = data.rawScript
}
let timeoutNotifyStrategy = '' let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) { if (data.timeoutNotifyStrategy) {

5
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -49,7 +49,7 @@ import { useHiveCli } from './use-hive-cli'
import { useDms } from './use-dms' import { useDms } from './use-dms'
import { useDatasync } from './use-datasync' import { useDatasync } from './use-datasync'
import { useKubeflow } from './use-kubeflow' import { useKubeflow } from './use-kubeflow'
import { useLinkis } from './use-linkis'
export default { export default {
SHELL: useShell, SHELL: useShell,
@ -85,5 +85,6 @@ export default {
HIVECLI: useHiveCli, HIVECLI: useHiveCli,
DMS: useDms, DMS: useDms,
DATASYNC: useDatasync, DATASYNC: useDatasync,
KUBEFLOW: useKubeflow KUBEFLOW: useKubeflow,
LINKIS: useLinkis
} }

79
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-linkis.ts

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useLinkis({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'LINKIS',
flag: 'YES',
description: '',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
cpuQuota: -1,
memoryMax: -1,
delayTime: 0,
timeout: 30,
timeoutNotifyStrategy: ['WARN'],
useCustom: false,
paramScript: [
{
prop: '',
value: ''
},
],
rawScript: ''
} as INodeData)
return {
json: [
Fields.useName(from),
...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }),
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !data?.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
...Fields.useResourceLimit(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useLinkis(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

1
dolphinscheduler-ui/src/views/projects/task/components/node/types.ts

@ -396,6 +396,7 @@ interface ITaskParams {
name?: string name?: string
cloudWatchLogGroupArn?: string cloudWatchLogGroupArn?: string
yamlContent?: string yamlContent?: string
paramScript?: ILocalParam[]
} }
interface INodeData interface INodeData

5
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -49,6 +49,7 @@ export type TaskType =
| 'DMS' | 'DMS'
| 'DATASYNC' | 'DATASYNC'
| 'KUBEFLOW' | 'KUBEFLOW'
| 'LINKIS'
export type TaskExecuteType = 'STREAM' | 'BATCH' export type TaskExecuteType = 'STREAM' | 'BATCH'
@ -175,6 +176,10 @@ export const TASK_TYPES_MAP = {
KUBEFLOW: { KUBEFLOW: {
alias: 'KUBEFLOW', alias: 'KUBEFLOW',
helperLinkDisable: true helperLinkDisable: true
},
LINKIS: {
alias: 'LINKIS',
helperLinkDisable: true
} }
} as { } as {
[key in TaskType]: { [key in TaskType]: {

10
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -196,7 +196,10 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/dms.png'); background-image: url('/images/task-icons/dms.png');
} }
&.icon-datasync { &.icon-datasync {
background-image: url('/images/task-icons/datasync_hover.png'); background-image: url('/images/task-icons/datasync.png');
}
&.icon-linkis {
background-image: url('/images/task-icons/linkis.png');
} }
&.icon-kubeflow { &.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow_hover.png'); background-image: url('/images/task-icons/kubeflow_hover.png');
@ -300,7 +303,10 @@ $bgLight: #ffffff;
background-image: url('/images/task-icons/dms_hover.png'); background-image: url('/images/task-icons/dms_hover.png');
} }
&.icon-datasync { &.icon-datasync {
background-image: url('/images/task-icons/datasync.png'); background-image: url('/images/task-icons/datasync_hover.png');
}
&.icon-linkis {
background-image: url('/images/task-icons/linkis_hover.png');
} }
&.icon-kubeflow { &.icon-kubeflow {
background-image: url('/images/task-icons/kubeflow.png'); background-image: url('/images/task-icons/kubeflow.png');

1
script/env/dolphinscheduler_env.sh vendored

@ -31,6 +31,7 @@ export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax} export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel} export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun} export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export LINKIS_HOME=${LINKIS_HOME:-/opt/soft/linkis}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH

Loading…
Cancel
Save