Browse Source

Add seatunnel task plugin (#7131)

3.0.0/version-upgrade
Kerwin 3 years ago committed by GitHub
parent
commit
5172862a0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  3. 48
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/pom.xml
  4. 62
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java
  5. 170
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  6. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java
  7. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannelFactory.java
  8. 1
      dolphinscheduler-task-plugin/pom.xml
  9. 8
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss
  10. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
  11. 14
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  12. 12
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/seatunnel.vue
  13. 0
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/seatunnel.png
  14. 0
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/seatunnel_hover.png
  15. 0
      dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_SEATUNNEL.png
  16. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  17. 2
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  18. 5
      dolphinscheduler-worker/pom.xml
  19. 5
      pom.xml

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -37,7 +37,7 @@ public enum TaskType {
* 10 DATAX
* 11 CONDITIONS
* 12 SQOOP
* 13 WATERDROP
* 13 SEATUNNEL
* 15 PIGEON
*/
SHELL(0, "SHELL"),
@ -53,7 +53,7 @@ public enum TaskType {
DATAX(10, "DATAX"),
CONDITIONS(11, "CONDITIONS"),
SQOOP(12, "SQOOP"),
WATERDROP(13, "WATERDROP"),
SEATUNNEL(13, "SEATUNNEL"),
SWITCH(14, "SWITCH"),
PIGEON(15, "PIGEON");

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -60,7 +60,7 @@ public class TaskParametersUtils {
case "SUB_PROCESS":
return JSONUtils.parseObject(parameter, SubProcessParameters.class);
case "SHELL":
case "WATERDROP":
case "SEATUNNEL":
return JSONUtils.parseObject(parameter, ShellParameters.class);
case "PROCEDURE":
return JSONUtils.parseObject(parameter, ProcedureParameters.class);

48
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/pom.xml

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dolphinscheduler-task-plugin</artifactId>
<groupId>org.apache.dolphinscheduler</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dolphinscheduler-task-seatunnel</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-spi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
</dependency>
</dependencies>
</project>

62
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelParameters.java

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import java.util.List;
public class SeatunnelParameters extends AbstractParameters {
/**
* shell script
*/
private String rawScript;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
public String getRawScript() {
return rawScript;
}
public void setRawScript(String rawScript) {
this.rawScript = rawScript;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
@Override
public boolean checkParameters() {
return rawScript != null && !rawScript.isEmpty();
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
}

170
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.util.OSUtils;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* seatunnel task
*/
public class SeatunnelTask extends AbstractTaskExecutor {
/**
* seatunnel parameters
*/
private SeatunnelParameters seatunnelParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
private TaskRequest taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public SeatunnelTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
logger);
}
@Override
public void init() {
logger.info("seatunnel task params {}", taskExecutionContext.getTaskParams());
seatunnelParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SeatunnelParameters.class);
if (!seatunnelParameters.checkParameters()) {
throw new RuntimeException("seatunnel task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("seatunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
String script = seatunnelParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
seatunnelParameters.setRawScript(script);
logger.info("raw script : {}", seatunnelParameters.getRawScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, seatunnelParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return seatunnelParameters;
}
private String parseScript(String script) {
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext,getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
}

35
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannel.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
public class SeatunnelTaskChannel implements TaskChannel {
@Override
public void cancelApplication(boolean status) {
}
@Override
public SeatunnelTask createTask(TaskRequest taskRequest) {
return new SeatunnelTask(taskRequest);
}
}

45
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTaskChannelFactory.java

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
import com.google.auto.service.AutoService;
@AutoService(TaskChannelFactory.class)
public class SeatunnelTaskChannelFactory implements TaskChannelFactory {
@Override
public TaskChannel create() {
return new SeatunnelTaskChannel();
}
@Override
public String getName() {
return "SEATUNNEL";
}
@Override
public List<PluginParams> getParams() {
return null;
}
}

1
dolphinscheduler-task-plugin/pom.xml

@ -41,5 +41,6 @@
<module>dolphinscheduler-task-sqoop</module>
<module>dolphinscheduler-task-procedure</module>
<module>dolphinscheduler-task-pigeon</module>
<module>dolphinscheduler-task-seatunnel</module>
</modules>
</project>

8
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/canvas/taskbar.scss

@ -102,8 +102,8 @@
&.icos-conditions {
background-image: url("../images/task-icos/conditions.png");
}
&.icos-waterdrop {
background-image: url("../images/task-icos/waterdrop.png");
&.icos-seatunnel {
background-image: url("../images/task-icos/seatunnel.png");
}
&.icos-spark {
background-image: url("../images/task-icos/spark.png");
@ -162,8 +162,8 @@
&.icos-conditions {
background-image: url("../images/task-icos/conditions_hover.png");
}
&.icos-waterdrop {
background-image: url("../images/task-icos/waterdrop_hover.png");
&.icos-seatunnel {
background-image: url("../images/task-icos/seatunnel_hover.png");
}
&.icos-spark {
background-image: url("../images/task-icos/spark_hover.png");

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js

@ -333,7 +333,7 @@ const tasksType = {
desc: 'SWITCH',
color: '#E46F13'
},
WATERDROP: {
SEATUNNEL: {
desc: 'WATERDROP',
color: '#646465',
helperLinkDisable: true

14
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -464,15 +464,15 @@
:nodeData="nodeData"
:postTasks="postTasks"
></m-switch>
<!-- waterdrop node -->
<m-waterdrop
v-if="nodeData.taskType === 'WATERDROP'"
<!-- seatunnel node -->
<m-seatunnel
v-if="nodeData.taskType === 'SEATUNNEL'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
ref="WATERDROP"
ref="SEATUNNEL"
:backfill-item="backfillItem"
>
</m-waterdrop>
</m-seatunnel>
</div>
<!-- Pre-tasks in workflow -->
<m-pre-tasks ref="preTasks" :code="code" :fromTaskDefinition="fromTaskDefinition" :prevTasks="prevTasks" :processDefinition="processDefinition"/>
@ -507,7 +507,7 @@
import { findLocale } from '@/module/i18n/config'
import mListBox from './tasks/_source/listBox'
import mShell from './tasks/shell'
import mWaterdrop from './tasks/waterdrop'
import mSeatunnel from './tasks/seatunnel'
import mSpark from './tasks/spark'
import mFlink from './tasks/flink'
import mPython from './tasks/python'
@ -1160,7 +1160,7 @@
mListBox,
mMr,
mShell,
mWaterdrop,
mSeatunnel,
mSubProcess,
mProcedure,
mSql,

12
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/waterdrop.vue → dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/seatunnel.vue

@ -15,7 +15,7 @@
* limitations under the License.
*/
<template>
<div class="waterdrop-model">
<div class="seatunnel-model">
<!--deploy mode-->
<m-list-box>
<div slot="text">{{$t('Deploy Mode')}}</div>
@ -102,13 +102,13 @@
import { diGuiTree, searchTree } from './_source/resourceTree'
export default {
name: 'waterdrop',
name: 'seatunnel',
data () {
return {
valueConsistsOf: 'LEAF_PRIORITY',
// script
rawScript: '',
// waterdrop script
// seatunnel script
baseScript: 'sh ${WATERDROP_HOME}/bin/start-waterdrop.sh', // eslint-disable-line
// resourceNameVal
resourceNameVal: [],
@ -180,11 +180,11 @@
}
// noRes
if (!this.resourceNameVal.resourceList) {
this.$message.warning(`${i18n.$t('Please select the waterdrop resources')}`)
this.$message.warning(`${i18n.$t('Please select the seatunnel resources')}`)
return false
}
if (this.resourceNameVal.resourceList && this.resourceNameVal.resourceList.length === 0) {
this.$message.warning(`${i18n.$t('Please select the waterdrop resources')}`)
this.$message.warning(`${i18n.$t('Please select the seatunnel resources')}`)
return false
}
// Process resourcelist
@ -209,7 +209,7 @@
locparams = locparams + ' --variable ' + v.prop + '=' + v.value
}
)
// get waterdrop script
// get seatunnel script
let tureScript = ''
this.resourceNameVal.resourceList.forEach(v => {
tureScript = tureScript + this.baseScript +

0
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/waterdrop.png → dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/seatunnel.png

Before

Width:  |  Height:  |  Size: 1.1 KiB

After

Width:  |  Height:  |  Size: 1.1 KiB

0
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/waterdrop_hover.png → dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/images/task-icos/seatunnel_hover.png

Before

Width:  |  Height:  |  Size: 1.0 KiB

After

Width:  |  Height:  |  Size: 1.0 KiB

0
dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_WATERDROP.png → dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_SEATUNNEL.png

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

2
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -678,7 +678,7 @@ export default {
ms: 'ms',
'Please Enter Url': 'Please Enter Url eg. 127.0.0.1:7077',
Master: 'Master',
'Please select the waterdrop resources': 'Please select the waterdrop resources',
'Please select the seatunnel resources': 'Please select the seatunnel resources',
zkDirectory: 'zkDirectory',
'Directory detail': 'Directory detail',
'Connection name': 'Connection name',

2
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -678,7 +678,7 @@ export default {
ms: '毫秒',
'Please Enter Url': '请直接填写地址,例如:127.0.0.1:7077',
Master: 'Master',
'Please select the waterdrop resources': '请选择waterdrop配置文件',
'Please select the seatunnel resources': '请选择 seatunnel 配置文件',
zkDirectory: 'zk注册目录',
'Directory detail': '查看目录详情',
'Connection name': '连线名',

5
dolphinscheduler-worker/pom.xml

@ -96,6 +96,11 @@
<artifactId>dolphinscheduler-task-sqoop</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-seatunnel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>

5
pom.xml

@ -467,6 +467,11 @@
<artifactId>dolphinscheduler-task-sqoop</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-task-seatunnel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-ui</artifactId>

Loading…
Cancel
Save