Browse Source

[Feature-10182][task-datax] (#11387)

When implementing DataX synchronization with a custom JSON configuration, support connecting to Kerberos-authenticated clusters — for example, when reading or writing through plug-ins such as HDFS and HBase that require keytab/xml credential files.
3.1.0-release
lugela 2 years ago committed by GitHub
parent
commit
e5d06c40a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/docs/en/guide/task/datax.md
  2. 1
      docs/docs/zh/guide/task/datax.md
  3. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java
  4. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java
  5. 7
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts

1
docs/docs/en/guide/task/datax.md

@ -27,6 +27,7 @@ DataX task type for executing DataX programs. For DataX nodes, the worker will e
| Timeout alarm | Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail. |
| Custom template | Custom the content of the DataX node's json profile when the default data source provided does not meet the required requirements. |
| json | json configuration file for DataX synchronization. |
| Resource | When using custom json, if the cluster has kerberos authentication enabled, and DataX needs related keytab, xml files, etc. when reading or writing via plug-ins such as hdfs and hbase, you can use this option to reference files uploaded or created in Resource Center - File Management. |
| Custom parameters | Like the SQL task type, the stored procedure uses custom parameters, in order, to set values for the method. The custom parameter types and data types are the same as in the stored procedure task type. The difference is that SQL-task-type custom parameters replace the \${variable} placeholders in the SQL statement. |
| Data source | Select the data source from which the data will be extracted. |
| sql statement | the sql statement used to extract data from the target database, the sql query column name is automatically parsed when the node is executed, and mapped to the target table synchronization column name. When the source table and target table column names are inconsistent, they can be converted by column alias. |

1
docs/docs/zh/guide/task/datax.md

@ -25,6 +25,7 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker
- 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。
- 自定义模板:当默认提供的数据源不满足所需要求的时,可自定义 datax 节点的 json 配置文件内容。
- json:DataX 同步的 json 配置文件。
- 资源:在使用自定义 json 时,如果集群开启了 kerberos 认证,datax 读取或写入 hdfs、hbase 等插件时需要使用相关的 keytab、xml 文件等,则可使用该选项。资源为在资源中心-文件管理中上传或创建的文件。
- 自定义参数:sql 任务类型,而存储过程是自定义参数顺序地给方法设置值。自定义参数类型和数据类型同存储过程任务类型一样。区别在于 SQL 任务类型自定义参数会替换 sql 语句中的 ${变量}。
- 数据源:选择抽取数据的数据源。
- sql 语句:目标库抽取数据的 sql 语句,节点执行时自动解析 sql 查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换。

17
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.ArrayList;
@ -104,6 +105,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int xmx;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
public int getCustomConfig() {
return customConfig;
}
@ -216,6 +222,14 @@ public class DataxParameters extends AbstractParameters {
this.xmx = xmx;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
@Override
public boolean checkParameters() {
if (customConfig == Flag.NO.ordinal()) {
@ -230,7 +244,7 @@ public class DataxParameters extends AbstractParameters {
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
return resourceList;
}
@Override
@ -250,6 +264,7 @@ public class DataxParameters extends AbstractParameters {
+ ", jobSpeedRecord=" + jobSpeedRecord
+ ", xms=" + xms
+ ", xmx=" + xmx
+ ", resourceList=" + JSONUtils.toJsonString(resourceList)
+ '}';
}

14
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java

@ -17,9 +17,13 @@
package org.apache.dolphinscheduler.plugin.task.datax;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
public class DataxParametersTest {
/**
@ -51,6 +55,12 @@ public class DataxParametersTest {
public void testToString() {
DataxParameters dataxParameters = new DataxParameters();
List<ResourceInfo> resourceInfoList = new ArrayList<>();
ResourceInfo resourceInfo = new ResourceInfo();
resourceInfo.setId(2);
resourceInfo.setResourceName("/hdfs.keytab");
resourceInfoList.add(resourceInfo);
dataxParameters.setCustomConfig(0);
dataxParameters.setXms(0);
dataxParameters.setXmx(-100);
@ -61,6 +71,7 @@ public class DataxParametersTest {
dataxParameters.setJobSpeedByte(1);
dataxParameters.setJobSpeedRecord(1);
dataxParameters.setJson("json");
dataxParameters.setResourceList(resourceInfoList);
String expected = "DataxParameters"
+ "{"
@ -77,7 +88,8 @@ public class DataxParametersTest {
+ "jobSpeedByte=1, "
+ "jobSpeedRecord=1, "
+ "xms=0, "
+ "xmx=-100"
+ "xmx=-100, "
+ "resourceList=[{\"id\":2,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"
+ "}";
Assert.assertEquals(expected,dataxParameters.toString());

7
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts

@ -16,7 +16,7 @@
*/
import { ref, onMounted, watch } from 'vue'
import { useI18n } from 'vue-i18n'
import { useCustomParams, useDatasource } from '.'
import {useCustomParams, useDatasource, useResources} from '.'
import type { IJsonItem } from '../types'
export function useDataX(model: { [field: string]: any }): IJsonItem[] {
@ -103,6 +103,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
const otherStatementSpan = ref(22)
const jobSpeedSpan = ref(12)
const customParameterSpan = ref(0)
const useResourcesSpan = ref(0)
const initConstants = () => {
if (model.customConfig) {
@ -113,6 +114,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
otherStatementSpan.value = 0
jobSpeedSpan.value = 0
customParameterSpan.value = 24
useResourcesSpan.value = 24
} else {
sqlEditorSpan.value = 24
jsonEditorSpan.value = 0
@ -121,6 +123,8 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
otherStatementSpan.value = 22
jobSpeedSpan.value = 12
customParameterSpan.value = 0
useResourcesSpan.value = 0
}
}
@ -167,6 +171,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] {
message: t('project.node.sql_empty_tips')
}
},
useResources(useResourcesSpan),
...useDatasource(model, {
typeField: 'dtType',
sourceField: 'dataTarget',

Loading…
Cancel
Save