Browse Source

[Feature] Add new task type chunjun (#10937)

3.1.0-release
xuhhui 2 years ago committed by GitHub
parent
commit
513f336015
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/configs/docsdev.js
  2. 73
      docs/docs/en/guide/task/chunjun.md
  3. 47
      docs/docs/zh/guide/task/chunjun.md
  4. BIN
      docs/img/tasks/demo/chunjun_task01.png
  5. BIN
      docs/img/tasks/demo/chunjun_task02.png
  6. BIN
      docs/img/tasks/demo/chunjun_task03.png
  7. BIN
      docs/img/tasks/icons/chunjun.png
  8. 52
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/pom.xml
  9. 31
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunConstants.java
  10. 139
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunParameters.java
  11. 257
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  12. 52
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskChannel.java
  13. 59
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskChannelFactory.java
  14. 62
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskExecutionContext.java
  15. 46
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/test/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunConstantsTest.java
  16. 78
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/test/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunParametersTest.java
  17. 1
      dolphinscheduler-task-plugin/pom.xml
  18. BIN
      dolphinscheduler-ui/public/images/task-icons/chunjun.png
  19. BIN
      dolphinscheduler-ui/public/images/task-icons/chunjun_hover.png
  20. 2
      dolphinscheduler-ui/src/locales/en_US/project.ts
  21. 2
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  22. 2
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts
  23. 49
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-chunjun-deploy-mode.ts
  24. 124
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-chunjun.ts
  25. 8
      dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts
  26. 4
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts
  27. 86
      dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts
  28. 5
      dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts
  29. 6
      dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss
  30. 3
      script/env/dolphinscheduler_env.sh

8
docs/configs/docsdev.js

@ -185,6 +185,10 @@ export default {
title: 'SageMaker',
link: '/en-us/docs/dev/user_doc/guide/task/sagemaker.html',
},
{
title: 'ChunJun',
link: '/en-us/docs/dev/user_doc/guide/task/chunjun.html',
},
],
},
{
@ -626,6 +630,10 @@ export default {
title: 'SageMaker',
link: '/zh-cn/docs/dev/user_doc/guide/task/sagemaker.html',
},
{
title: 'ChunJun',
link: '/zh-cn/docs/dev/user_doc/guide/task/chunjun.html',
},
],
},
{

73
docs/docs/en/guide/task/chunjun.md

@ -0,0 +1,73 @@
# ChunJun
## Overview
ChunJun task type for executing ChunJun programs. For ChunJun nodes, the worker will execute `${CHUNJUN_HOME}/bin/start-chunjun` to analyze the input json file.
## Create Task
- Click `Project Management -> Project Name -> Workflow Definition`, and click the `Create Workflow` button to enter the DAG editing page.
- Drag the <img src="../../../../img/tasks/icons/chunjun.png" width="15"/> from the toolbar to the drawing board.
## Task Parameters
| **Parameter** | **Description** |
| ------- | ---------- |
| Node name | The node name in a workflow definition is unique. |
| Run flag | Identifies whether this node schedules normally, if it does not need to execute, select the prohibition execution. |
| Task priority | When the number of worker threads is insufficient, execute in the order of priority from high to low, and tasks with the same priority will execute in a first-in first-out order. |
| Description | Describe the function of the node. |
| Worker group | Assign tasks to the machines of the worker group to execute. If `Default` is selected, randomly select a worker machine for execution. |
| Environment Name | Configure the environment name in which run the script. |
| Number of failed retries | The number of times the task failed to resubmit. |
| Failed retry interval | The time interval (unit minute) for resubmitting the task after a failed task. |
| Task group name | The task group name. |
| Priority | The task priority. |
| Delayed execution time | The time, in minutes, that a task is delayed in execution. |
| Timeout alarm | Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail. |
| Custom template | Custom the content of the ChunJun node's json profile. |
| json | json configuration file for ChunJun synchronization. |
| Custom parameters | It is a user-defined parameter, and will replace the content with `${variable}` in the script.
| Deploy mode | Execute chunjun task mode, eg local standalone. |
| Option Parameters | Support such as `-confProp "{\"flink.checkpoint.interval\":60000}"` |
| Predecessor task | Selecting a predecessor task for the current task will set the selected predecessor task as upstream of the current task. |
## Task Example
This example demonstrates importing data from Hive into MySQL.
### Configuring the ChunJun environment in DolphinScheduler
If you are using the ChunJun task type in a production environment, it is necessary to configure the required environment first. The configuration file is as follows: `/dolphinscheduler/conf/env/dolphinscheduler_env.sh`.
![chunjun_task01](../../../../img/tasks/demo/chunjun_task01.png)
After the environment has been configured, DolphinScheduler needs to be restarted.
### Configuring ChunJun Task Node
As the data to be read from Hive, a custom json is required, refer to: [Hive Json Template](https://github.com/DTStack/chunjun/blob/master/chunjun-examples/json/hive/binlog_hive.json).
After writing the required json file, you can configure the node content by following the steps in the diagram below.
![chunjun_task02](../../../../img/tasks/demo/chunjun_task02.png)
### View run results
![chunjun_task03](../../../../img/tasks/demo/chunjun_task03.png)
### Note
Before execute ${CHUNJUN_HOME}/bin/start-chunjun, need to change the shell ${CHUNJUN_HOME}/bin/start-chunjun, remove '&' in order to run in front.
such as:
```shell
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@ &
```
update to following:
```shell
nohup $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $@
```

47
docs/docs/zh/guide/task/chunjun.md

@ -0,0 +1,47 @@
# ChunJun节点
## 综述
ChunJun 任务类型,用于执行 ChunJun 程序。对于 ChunJun 节点,worker 会通过执行 `${CHUNJUN_HOME}/bin/start-chunjun` 来解析传入的 json 文件。
## 创建任务
- 点击项目管理 -> 项目名称 -> 工作流定义,点击“创建工作流”按钮,进入 DAG 编辑页面;
- 拖动工具栏的<img src="../../../../img/tasks/icons/chunjun.png" width="15"/> 任务节点到画板中。
## 任务参数
- 节点名称:设置任务节点的名称。一个工作流定义中的节点名称是唯一的。
- 运行标志:标识这个结点是否能正常调度,如果不需要执行,可以打开禁止执行开关。
- 描述:描述该节点的功能。
- 任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。
- Worker 分组:任务分配给 worker 组的机器执行,选择 Default ,会随机选择一台 worker 机执行。
- 环境名称:配置运行脚本的环境。
- 任务组名称:任务组的名称。
- 组内优先级:一个任务组内此任务的优先级。
- 失败重试次数:任务失败重新提交的次数。
- 失败重试间隔:任务失败重新提交任务的时间间隔,以分为单位。
- 延时执行时间:任务延迟执行的时间,以分为单位。
- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
- 自定义模板:自定义 ChunJun 节点的 json 配置文件内容,当前支持此种方式。
- json:ChunJun 同步的 json 配置文件。
- 自定义参数:用户自定义参数,会替换脚本中以 ${变量} 的内容。
- 部署方式: 执行ChunJun任务的方式,比如local,standalone等。
- 选项参数: 支持 `-confProp "{\"flink.checkpoint.interval\":60000}"` 格式。
- 前置任务:选择当前任务的前置任务,会将被选择的前置任务设置为当前任务的上游。
## 任务样例
该样例演示为从 Hive 数据导入到 MySQL 中。
### 在 DolphinScheduler 中配置 ChunJun 环境
若生产环境中要是使用到 ChunJun 任务类型,则需要先配置好所需的环境。配置文件如下:`/dolphinscheduler/conf/env/dolphinscheduler_env.sh`。
![chunjun_task01](../../../../img/tasks/demo/chunjun_task01.png)
当环境配置完成之后,需要重启 DolphinScheduler。
### 配置 ChunJun 任务节点
从 Hive 中读取数据,所以需要自定义 json,可参考:[Hive Json Template](https://github.com/DTStack/chunjun/blob/master/chunjun-examples/json/hive/binlog_hive.json)

BIN
docs/img/tasks/demo/chunjun_task01.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 77 KiB

BIN
docs/img/tasks/demo/chunjun_task02.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

BIN
docs/img/tasks/demo/chunjun_task03.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

BIN
docs/img/tasks/icons/chunjun.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 176 KiB

52
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/pom.xml

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>dev-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-chunjun</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-datasource-api</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

31
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunConstants.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
/**
* ChunJun constants
*/
public class ChunJunConstants {
public static final String FLINK_CONF_DIR = "${FLINK_HOME}/conf";
public static final String FLINK_LIB_DIR = "${FLINK_HOME}/lib";
public static final String HADOOP_CONF_DIR = "${HADOOP_HOME}/etc/hadoop";
}

139
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunParameters.java

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* chunjun parameters
*/
@Setter
@Getter
@ToString
public class ChunJunParameters extends AbstractParameters {
/**
* custom json configdefault 1, support custom json
*/
private int customConfig;
/**
* custom config json
*/
private String json;
/**
* other arguments -confProp "{\"flink.checkpoint.interval\":60000}"
*/
private String others;
/**
* deploy mode local standlone yarn-session yarn-per-job
*/
private String deployMode;
/**
* customConfig value is 0, datasource typeeg mysql
*/
private String dsType;
/**
* customConfig value is 0, datasource id int
*/
private int dataSource;
/**
* customConfig value is 0, datasource targetTypeeg MYSQL, POSTGRES
*/
private String dtType;
/**
* customConfig value is 0, data target id
*/
private int dataTarget;
/**
* customConfig value is 0, sql
*/
private String sql;
/**
* customConfig value is 0, target table
*/
private String targetTable;
/**
* pre statements
*/
private List<String> preStatements;
/**
* post statements
*/
private List<String> postStatements;
/**
* customConfig value is 0, job speed byte
*/
private int jobSpeedByte;
/**
* customConfig value is 0, job speed record count
*/
private int jobSpeedRecord;
@Override
public boolean checkParameters() {
if (customConfig == Flag.NO.ordinal()) {
return dataSource != 0 && dataTarget != 0
&& StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable);
} else {
return StringUtils.isNotEmpty(json);
}
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();
if (customConfig == Flag.YES.ordinal()) {
return resources;
}
resources.put(ResourceType.DATASOURCE, dataSource);
resources.put(ResourceType.DATASOURCE, dataTarget);
return resources;
}
}

257
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -0,0 +1,257 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* chunjun task
*/
public class ChunJunTask extends AbstractTaskExecutor {
/**
* chunjun path
*/
private static final String CHUNJUN_PATH = "${CHUNJUN_HOME}/bin/start-chunjun";
/**
* chunjun dist
*/
private static final String CHUNJUN_DIST_DIR = "${CHUNJUN_HOME}/chunjun-dist";
/**
* chunJun parameters
*/
private ChunJunParameters chunJunParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
public ChunJunTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, logger);
}
/**
* init chunjun config
*/
@Override
public void init() {
logger.info("chunjun task params {}", taskExecutionContext.getTaskParams());
chunJunParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ChunJunParameters.class);
if (!chunJunParameters.checkParameters()) {
throw new RuntimeException("chunjun task params is not valid");
}
}
/**
* run chunjun process
*
* @throws Exception exception
*/
@Override
public void handle() throws Exception {
try {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String jsonFilePath = buildChunJunJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("chunjun task failed.", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
}
/**
* build chunjun json file
*
* @param paramsMap
* @return
* @throws Exception
*/
private String buildChunJunJsonFile(Map<String, Property> paramsMap)
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
String json = null;
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
if (chunJunParameters.getCustomConfig() == Flag.YES.ordinal()) {
json = chunJunParameters.getJson().replaceAll("\\r\\n", "\n");
}
// replace placeholder
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
logger.debug("chunjun job json : {}", json);
// create chunjun json file
FileUtils.writeStringToFile(new File(fileName), json, StandardCharsets.UTF_8);
return fileName;
}
/**
* create command
*
* @return shell command file name
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(),
SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
// chunjun command
List<String> args = new ArrayList<>();
args.add(CHUNJUN_PATH);
args.add("-mode");
args.add(getExecMode(chunJunParameters));
args.add("-jobType sync");
args.add("-job");
args.add(jobConfigFilePath);
args.add("-chunjunDistDir");
args.add(CHUNJUN_DIST_DIR);
if (!"local".equalsIgnoreCase(getExecMode(chunJunParameters))) {
args.add("-flinkConfDir");
args.add(ChunJunConstants.FLINK_CONF_DIR);
args.add("-flinkLibDir");
args.add(ChunJunConstants.FLINK_LIB_DIR);
args.add("-hadoopConfDir");
args.add(ChunJunConstants.HADOOP_CONF_DIR);
}
if (chunJunParameters.getOthers() != null) {
args.add(chunJunParameters.getOthers());
}
String command = String.join(" ", args);
// replace placeholder
String chunjunCommand = ParameterUtils.convertParameterPlaceholders(command, ParamUtils.convert(paramsMap));
logger.info("raw script : {}", chunjunCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (SystemUtils.IS_OS_WINDOWS) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
}
public String getExecMode(ChunJunParameters chunJunParameters) {
if (chunJunParameters.getDeployMode() == null) {
return "local";
}
return chunJunParameters.getDeployMode();
}
/**
* get task parameters
*
* @return AbstractParameters
*/
@Override
public AbstractParameters getParameters() {
return chunJunParameters;
}
/**
* cancel ChunJun process
*
* @param cancelApplication cancelApplication
* @throws Exception if error throws Exception
*/
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
}

52
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskChannel.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
/**
* chunjun task channel
*/
public class ChunJunTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public AbstractTask createTask(TaskExecutionContext taskExecutionContext) {
return new ChunJunTask(taskExecutionContext);
}
@Override
public AbstractParameters parseParameters(ParametersNode parametersNode) {
return JSONUtils.parseObject(parametersNode.getTaskParams(), ChunJunParameters.class);
}
@Override
public ResourceParametersHelper getResources(String parameters) {
return JSONUtils.parseObject(parameters, ChunJunParameters.class).getResources();
}
}

59
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskChannelFactory.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import java.util.List;
import com.google.auto.service.AutoService;
/**
* chunjun task channelFactory
*/
@AutoService(TaskChannelFactory.class)
public class ChunJunTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new ChunJunTaskChannel();
}
/**
* plugin name
* Must be UNIQUE .
*
* @return this task plugin name
*/
@Override
public String getName() {
return "CHUNJUN";
}
/**
* Returns the configurable parameters that this plugin needs to display on the web ui
*
* @return this plugin params
*/
@Override
public List<PluginParams> getParams() {
return null;
}
}

62
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTaskExecutionContext.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.io.Serializable;
import lombok.Data;
/**
* chunjun taskExecutionContext
*/
@Data
public class ChunJunTaskExecutionContext implements Serializable {
/**
* dataSourceId
*/
private int dataSourceId;
/**
* sourcetype
*/
private DbType sourcetype;
/**
* sourceConnectionParams
*/
private String sourceConnectionParams;
/**
* dataTargetId
*/
private int dataTargetId;
/**
* targetType
*/
private DbType targetType;
/**
* targetConnectionParams
*/
private String targetConnectionParams;
}

46
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/test/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunConstantsTest.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ChunJunConstantsTest {
private String flinkConfDir;
private String flinkLibDir;
private String hadoopConfDir;
@Before
public void setUp() {
flinkConfDir = "${FLINK_HOME}/conf";
flinkLibDir = "${FLINK_HOME}/lib";
hadoopConfDir = "${HADOOP_HOME}/etc/hadoop";
}
@Test
public void testEqualsString() {
Assert.assertEquals(ChunJunConstants.FLINK_CONF_DIR, flinkConfDir);
Assert.assertEquals(ChunJunConstants.FLINK_LIB_DIR, flinkLibDir);
Assert.assertEquals(ChunJunConstants.HADOOP_CONF_DIR, hadoopConfDir);
}
}

78
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/test/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunParametersTest.java

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.chunjun;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ChunJunParametersTest {
private ChunJunParameters chunJunParameters = new ChunJunParameters();
@Before
public void setUp() {
chunJunParameters.setCustomConfig(0);
chunJunParameters.setDataSource(1);
chunJunParameters.setDataTarget(1);
chunJunParameters.setDsType("MYSQL");
chunJunParameters.setDtType("MYSQL");
chunJunParameters.setJobSpeedByte(1);
chunJunParameters.setJobSpeedRecord(1);
chunJunParameters.setJson("json");
}
@Test
public void testToString() {
String expected = "ChunJunParameters"
+ "{"
+ "customConfig=0, "
+ "json='json', "
+ "dsType='MYSQL', "
+ "dataSource=1, "
+ "dtType='MYSQL', "
+ "dataTarget=1, "
+ "sql='null', "
+ "targetTable='null', "
+ "preStatements=null, "
+ "postStatements=null, "
+ "jobSpeedByte=1, "
+ "jobSpeedRecord=1, "
+ "others=xx, "
+ "deployMode=local"
+ "}";
Assert.assertNotEquals(expected, chunJunParameters.toString());
}
@Test
public void testCheckParameters() {
Assert.assertFalse(chunJunParameters.checkParameters());
}
@Test
public void testGetResourceFilesList() {
Assert.assertNotNull(chunJunParameters.getResourceFilesList());
}
@Test
public void testGetResources() {
Assert.assertNotNull(chunJunParameters.getResources());
}
}

1
dolphinscheduler-task-plugin/pom.xml

@ -58,6 +58,7 @@
<module>dolphinscheduler-task-dvc</module>
<module>dolphinscheduler-task-dinky</module>
<module>dolphinscheduler-task-sagemaker</module>
<module>dolphinscheduler-task-chunjun</module>
</modules>
<dependencyManagement>

BIN
dolphinscheduler-ui/public/images/task-icons/chunjun.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 KiB

BIN
dolphinscheduler-ui/public/images/task-icons/chunjun_hover.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.2 KiB

2
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -527,6 +527,8 @@ export default {
datax_job_runtime_memory_xms: 'Low Limit Value',
datax_job_runtime_memory_xmx: 'High Limit Value',
datax_job_runtime_memory_unit: 'G',
chunjun_custom_template: 'Custom Template',
chunjun_json_template: 'JSON',
current_hour: 'CurrentHour',
last_1_hour: 'Last1Hour',
last_2_hour: 'Last2Hours',

2
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -520,6 +520,8 @@ export default {
datax_job_runtime_memory_xms: '最小内存',
datax_job_runtime_memory_xmx: '最大内存',
datax_job_runtime_memory_unit: 'G',
chunjun_custom_template: '自定义模板',
chunjun_json_template: 'JSON',
current_hour: '当前小时',
last_1_hour: '前1小时',
last_2_hour: '前2小时',

2
dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts

@ -73,3 +73,5 @@ export { useOpenmldb } from './use-openmldb'
export { useDvc } from './use-dvc'
export { useDinky } from './use-dinky'
export { useSagemaker } from './use-sagemaker'
export { useChunjun } from './use-chunjun'
export { useChunjunDeployMode } from './use-chunjun-deploy-mode'

49
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-chunjun-deploy-mode.ts

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
export function useChunjunDeployMode(span = 24): IJsonItem {
const { t } = useI18n()
return {
type: 'radio',
field: 'deployMode',
name: t('project.node.deploy_mode'),
options: DEPLOY_MODES,
span
}
}
export const DEPLOY_MODES = [
{
label: 'local',
value: 'local'
},
{
label: 'standlone',
value: 'standlone'
},
{
label: 'yarn-session',
value: 'yarn-session'
},
{
label: 'yarn-per-job',
value: 'yarn-per-job'
}
]

124
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-chunjun.ts

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ref, onMounted, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import type { IJsonItem } from '../types'
import { useChunjunDeployMode } from './'
export function useChunjun(model: { [field: string]: any }): IJsonItem[] {
const { t } = useI18n()
const jsonEditorSpan = ref(0)
const customParameterSpan = ref(0)
const initConstants = () => {
if (model.customConfig) {
jsonEditorSpan.value = 24
customParameterSpan.value = 24
} else {
jsonEditorSpan.value = 0
customParameterSpan.value = 0
}
}
onMounted(() => {
initConstants()
})
watch(
() => model.customConfig,
() => {
initConstants()
}
)
return [
{
type: 'switch',
field: 'customConfig',
value: true,
name: t('project.node.chunjun_custom_template'),
props: {
disabled: true
}
},
{
type: 'editor',
field: 'json',
name: t('project.node.chunjun_json_template'),
span: jsonEditorSpan,
validate: {
trigger: ['input', 'trigger'],
required: true,
message: t('project.node.sql_empty_tips')
}
},
{
type: 'custom-parameters',
field: 'localParams',
name: t('project.node.custom_parameters'),
span: customParameterSpan,
children: [
{
type: 'input',
field: 'prop',
span: 10,
props: {
placeholder: t('project.node.prop_tips'),
maxLength: 256
},
validate: {
trigger: ['input', 'blur'],
required: true,
validator(validate: any, value: string) {
if (!value) {
return new Error(t('project.node.prop_tips'))
}
const sameItems = model.localParams.filter(
(item: { prop: string }) => item.prop === value
)
if (sameItems.length > 1) {
return new Error(t('project.node.prop_repeat'))
}
}
}
},
{
type: 'input',
field: 'value',
span: 10,
props: {
placeholder: t('project.node.value_tips'),
maxLength: 256
}
}
]
},
useChunjunDeployMode(24),
{
type: 'input',
field: 'others',
name: t('project.node.option_parameters'),
props: {
type: 'textarea',
placeholder: t('project.node.option_parameters_tips')
}
}
]
}

8
dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts

@ -402,9 +402,17 @@ export function formatParams(data: INodeData): {
taskParams.sql = data.sql
}
if (data.taskType === 'CHUNJUN') {
taskParams.customConfig = data.customConfig ? 1 : 0
taskParams.json = data.json
taskParams.deployMode = data.deployMode
taskParams.others = data.others
}
if (data.taskType === 'PIGEON') {
taskParams.targetJobName = data.targetJobName
}
let timeoutNotifyStrategy = ''
if (data.timeoutNotifyStrategy) {
if (data.timeoutNotifyStrategy.length === 1) {

4
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts

@ -41,6 +41,7 @@ import { useOpenmldb } from './use-openmldb'
import { useDvc } from './use-dvc'
import { useDinky } from './use-dinky'
import { userSagemaker } from './use-sagemaker'
import { useChunjun } from './use-chunjun'
export default {
SHELL: useShell,
@ -68,5 +69,6 @@ export default {
OPENMLDB: useOpenmldb,
DVC: useDvc,
DINKY: useDinky,
SAGEMAKER: userSagemaker
SAGEMAKER: userSagemaker,
CHUNJUN: useChunjun
}

86
dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-chunjun.ts

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { reactive } from 'vue'
import * as Fields from '../fields/index'
import type { IJsonItem, INodeData } from '../types'
import { ITaskData } from '../types'
export function useChunjun({
projectCode,
from = 0,
readonly,
data
}: {
projectCode: number
from?: number
readonly?: boolean
data?: ITaskData
}) {
const model = reactive({
name: '',
taskType: 'CHUNJUN',
flag: 'YES',
description: '',
deployMode: 'local',
timeoutFlag: false,
localParams: [],
environmentCode: null,
failRetryInterval: 1,
failRetryTimes: 0,
workerGroup: 'default',
delayTime: 0,
timeout: 30,
customConfig: false,
preStatements: [],
postStatements: [],
timeoutNotifyStrategy: ['WARN']
} as INodeData)
let extra: IJsonItem[] = []
if (from === 1) {
extra = [
Fields.useTaskType(model, readonly),
Fields.useProcessName({
model,
projectCode,
isCreate: !data?.id,
from,
processName: data?.processName
})
]
}
return {
json: [
Fields.useName(from),
...extra,
Fields.useRunFlag(),
Fields.useDescription(),
Fields.useTaskPriority(),
Fields.useWorkerGroup(),
Fields.useEnvironmentName(model, !model.id),
...Fields.useTaskGroup(model, projectCode),
...Fields.useFailed(),
Fields.useDelayTime(model),
...Fields.useTimeoutAlarm(model),
...Fields.useChunjun(model),
Fields.usePreTasks()
] as IJsonItem[],
model
}
}

5
dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts

@ -41,6 +41,7 @@ export type TaskType =
| 'DVC'
| 'DINKY'
| 'SAGEMAKER'
| 'CHUNJUN'
export const TASK_TYPES_MAP = {
SHELL: {
@ -133,5 +134,9 @@ export const TASK_TYPES_MAP = {
SAGEMAKER: {
alias: 'SageMaker',
helperLinkDisable: true
},
CHUNJUN: {
alias: 'CHUNJUN',
helperLinkDisable: true
}
} as { [key in TaskType]: { alias: string; helperLinkDisable?: boolean } }

6
dolphinscheduler-ui/src/views/projects/workflow/components/dag/dag.module.scss

@ -179,6 +179,9 @@ $bgLight: #ffffff;
&.icon-sagemaker {
background-image: url('/images/task-icons/sagemaker.png');
}
&.icon-chunjun {
background-image: url('/images/task-icons/chunjun.png');
}
}
&:hover {
@ -261,6 +264,9 @@ $bgLight: #ffffff;
&.icon-sagemaker {
background-image: url('/images/task-icons/sagemaker_hover.png');
}
&.icon-chunjun {
background-image: url('/images/task-icons/chunjun_hover.png');
}
}
}
}

3
script/env/dolphinscheduler_env.sh vendored

@ -44,5 +44,6 @@ export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}
export SEATUNNEL_HOME=${SEATUNNEL_HOME:-/opt/soft/seatunnel}
export CHUNJUN_HOME=${CHUNJUN_HOME:-/opt/soft/chunjun}
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$PATH
export PATH=$HADOOP_HOME/bin:$SPARK_HOME1/bin:$SPARK_HOME2/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$SEATUNNEL_HOME/bin:$CHUNJUN_HOME/bin:$PATH

Loading…
Cancel
Save