From e5d06c40a524740454868dea9ed5fa778d9ccbf7 Mon Sep 17 00:00:00 2001 From: lugela <374029008@qq.com> Date: Wed, 17 Aug 2022 10:01:20 +0800 Subject: [PATCH] [Feature-10182][task-datax] (#11387) In the process of realizing datax synchronization, use custom json for better connection to enable kerberos-authenticated clusters, such as using plug-ins such as hdfs and hbase --- docs/docs/en/guide/task/datax.md | 1 + docs/docs/zh/guide/task/datax.md | 1 + .../plugin/task/datax/DataxParameters.java | 17 ++++++++++++++++- .../plugin/task/datax/DataxParametersTest.java | 14 +++++++++++++- .../task/components/node/fields/use-datax.ts | 7 ++++++- 5 files changed, 37 insertions(+), 3 deletions(-) diff --git a/docs/docs/en/guide/task/datax.md b/docs/docs/en/guide/task/datax.md index d3ceea8e73..e0ee5d7ed5 100644 --- a/docs/docs/en/guide/task/datax.md +++ b/docs/docs/en/guide/task/datax.md @@ -27,6 +27,7 @@ DataX task type for executing DataX programs. For DataX nodes, the worker will e | Timeout alarm | Check the timeout alarm and timeout failure. When the task exceeds the "timeout period", an alarm email will be sent and the task execution will fail. | | Custom template | Custom the content of the DataX node's json profile when the default data source provided does not meet the required requirements. | | json | json configuration file for DataX synchronization. | +| Resource | When using custom json, if the cluster has Kerberos authentication enabled and DataX needs the relevant keytab, xml files, etc. to read or write through plug-ins such as hdfs and hbase, you can use this option to select files uploaded or created in Resource Center - File Management. | | Custom parameters | SQL task type, and stored procedure is a custom parameter order to set values for the method. The custom parameter type and data type are the same as the stored procedure task type. The difference is that the SQL task type custom parameter will replace the \${variable} in the SQL statement. 
| | Data source | Select the data source from which the data will be extracted. | | sql statement | the sql statement used to extract data from the target database, the sql query column name is automatically parsed when the node is executed, and mapped to the target table synchronization column name. When the source table and target table column names are inconsistent, they can be converted by column alias. | diff --git a/docs/docs/zh/guide/task/datax.md b/docs/docs/zh/guide/task/datax.md index 49340db726..39eca9b32e 100644 --- a/docs/docs/zh/guide/task/datax.md +++ b/docs/docs/zh/guide/task/datax.md @@ -25,6 +25,7 @@ DataX 任务类型,用于执行 DataX 程序。对于 DataX 节点,worker - 超时警告:勾选超时警告、超时失败,当任务超过“超时时长”后,会发送告警邮件并且任务执行失败。 - 自定义模板:当默认提供的数据源不满足所需要求的时,可自定义 datax 节点的 json 配置文件内容。 - json:DataX 同步的 json 配置文件。 +- 资源:在使用自定义json时,如果集群开启了kerberos认证,datax读取或者写入hdfs、hbase等插件时需要使用相关的keytab、xml文件等,则可使用该选项,选择在资源中心-文件管理中上传或创建的文件。 - 自定义参数:sql 任务类型,而存储过程是自定义参数顺序的给方法设置值自定义参数类型和数据类型同存储过程任务类型一样。区别在于SQL任务类型自定义参数会替换 sql 语句中 ${变量}。 - 数据源:选择抽取数据的数据源。 - sql 语句:目标库抽取数据的 sql 语句,节点执行时自动解析 sql 查询列名,映射为目标表同步列名,源表和目标表列名不一致时,可以通过列别名(as)转换。 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java index d6fd449462..391e94658e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.DataSourceParameters; import 
org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.spi.enums.Flag; +import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.util.ArrayList; @@ -104,6 +105,11 @@ public class DataxParameters extends AbstractParameters { */ private int xmx; + /** + * resource list; null until set, in which case getResourceFilesList() returns an empty list + */ + private List<ResourceInfo> resourceList; + public int getCustomConfig() { return customConfig; } @@ -216,6 +222,14 @@ public class DataxParameters extends AbstractParameters { this.xmx = xmx; } + public List<ResourceInfo> getResourceList() { + return resourceList; + } + + public void setResourceList(List<ResourceInfo> resourceList) { + this.resourceList = resourceList; + } + @Override public boolean checkParameters() { if (customConfig == Flag.NO.ordinal()) { @@ -230,7 +244,7 @@ public class DataxParameters extends AbstractParameters { @Override public List<ResourceInfo> getResourceFilesList() { - return new ArrayList<>(); + return resourceList == null ? new ArrayList<>() : resourceList; } @Override @@ -250,6 +264,7 @@ public class DataxParameters extends AbstractParameters { + ", jobSpeedRecord=" + jobSpeedRecord + ", xms=" + xms + ", xmx=" + xmx + + ", resourceList=" + JSONUtils.toJsonString(resourceList) + '}'; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java index 05fdaeaf71..c7d317005e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParametersTest.java @@ -17,9 +17,13 @@ package org.apache.dolphinscheduler.plugin.task.datax; +import 
org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + public class DataxParametersTest { /** @@ -51,6 +55,12 @@ public class DataxParametersTest { public void testToString() { DataxParameters dataxParameters = new DataxParameters(); + List<ResourceInfo> resourceInfoList = new ArrayList<>(); + ResourceInfo resourceInfo = new ResourceInfo(); + resourceInfo.setId(2); + resourceInfo.setResourceName("/hdfs.keytab"); + resourceInfoList.add(resourceInfo); + dataxParameters.setCustomConfig(0); dataxParameters.setXms(0); dataxParameters.setXmx(-100); @@ -61,6 +71,7 @@ public class DataxParametersTest { dataxParameters.setJobSpeedByte(1); dataxParameters.setJobSpeedRecord(1); dataxParameters.setJson("json"); + dataxParameters.setResourceList(resourceInfoList); String expected = "DataxParameters" + "{" @@ -77,7 +88,8 @@ public class DataxParametersTest { + "jobSpeedByte=1, " + "jobSpeedRecord=1, " + "xms=0, " - + "xmx=-100" + + "xmx=-100, " + + "resourceList=[{\"id\":2,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]" + "}"; Assert.assertEquals(expected,dataxParameters.toString()); diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts index 2176761e2f..6c56e5d85b 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts @@ -16,7 +16,7 @@ */ import { ref, onMounted, watch } from 'vue' import { useI18n } from 'vue-i18n' -import { useCustomParams, useDatasource } from '.' +import { useCustomParams, useDatasource, useResources } from '.' 
import type { IJsonItem } from '../types' export function useDataX(model: { [field: string]: any }): IJsonItem[] { @@ -103,6 +103,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { const otherStatementSpan = ref(22) const jobSpeedSpan = ref(12) const customParameterSpan = ref(0) + const useResourcesSpan = ref(0) const initConstants = () => { if (model.customConfig) { @@ -113,6 +114,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { otherStatementSpan.value = 0 jobSpeedSpan.value = 0 customParameterSpan.value = 24 + useResourcesSpan.value = 24 } else { sqlEditorSpan.value = 24 jsonEditorSpan.value = 0 @@ -121,6 +123,8 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { otherStatementSpan.value = 22 jobSpeedSpan.value = 12 customParameterSpan.value = 0 + useResourcesSpan.value = 0 + } } @@ -167,6 +171,7 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { message: t('project.node.sql_empty_tips') } }, + useResources(useResourcesSpan), ...useDatasource(model, { typeField: 'dtType', sourceField: 'dataTarget',