Browse Source

[Improvement][Task] DataX Node doesn't Support Clickhouse as datasource (#5180) (#5243)

* [Improvement][Module Name] DataX Node doesn't Support Clickhouse as the datasource (#5180)

* add ut

* add ut

* resolve code smell
pull/3/MERGE
wenjun 4 years ago committed by GitHub
parent
commit
cc7a4446f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
  2. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  3. 15
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
  4. 24
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
  5. 2
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
  6. 2
      pom.xml

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java

@ -36,6 +36,8 @@ public class DataxUtils {
public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader"; public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader";
public static final String DATAX_READER_PLUGIN_CLICKHOUSE = "clickhousereader";
public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter";
@ -44,6 +46,8 @@ public class DataxUtils {
public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter"; public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter";
public static final String DATAX_WRITER_PLUGIN_CLICKHOUSE = "clickhousewriter";
public static String getReaderPluginName(DbType dbType) { public static String getReaderPluginName(DbType dbType) {
switch (dbType) { switch (dbType) {
case MYSQL: case MYSQL:
@ -54,6 +58,8 @@ public class DataxUtils {
return DATAX_READER_PLUGIN_ORACLE; return DATAX_READER_PLUGIN_ORACLE;
case SQLSERVER: case SQLSERVER:
return DATAX_READER_PLUGIN_SQLSERVER; return DATAX_READER_PLUGIN_SQLSERVER;
case CLICKHOUSE:
return DATAX_READER_PLUGIN_CLICKHOUSE;
default: default:
return null; return null;
} }
@ -69,6 +75,8 @@ public class DataxUtils {
return DATAX_WRITER_PLUGIN_ORACLE; return DATAX_WRITER_PLUGIN_ORACLE;
case SQLSERVER: case SQLSERVER:
return DATAX_WRITER_PLUGIN_SQLSERVER; return DATAX_WRITER_PLUGIN_SQLSERVER;
case CLICKHOUSE:
return DATAX_WRITER_PLUGIN_CLICKHOUSE;
default: default:
return null; return null;
} }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -310,8 +310,8 @@ public class DataxTask extends AbstractTask {
List<ObjectNode> contentList = new ArrayList<>(); List<ObjectNode> contentList = new ArrayList<>();
ObjectNode content = JSONUtils.createObjectNode(); ObjectNode content = JSONUtils.createObjectNode();
content.put("reader", reader.toString()); content.set("reader", reader);
content.put("writer", writer.toString()); content.set("writer", writer);
contentList.add(content); contentList.add(content);
return contentList; return contentList;
@ -341,8 +341,8 @@ public class DataxTask extends AbstractTask {
errorLimit.put("percentage", 0); errorLimit.put("percentage", 0);
ObjectNode setting = JSONUtils.createObjectNode(); ObjectNode setting = JSONUtils.createObjectNode();
setting.put("speed", speed.toString()); setting.set("speed", speed);
setting.put("errorLimit", errorLimit.toString()); setting.set("errorLimit", errorLimit);
return setting; return setting;
} }
@ -462,7 +462,10 @@ public class DataxTask extends AbstractTask {
try { try {
SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); if (parser == null) {
logger.warn("database driver [{}] is not support grammatical analysis sql", dbType);
return new String[0];
}
SQLStatement sqlStatement = parser.parseStatement(); SQLStatement sqlStatement = parser.parseStatement();
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement; SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
@ -511,7 +514,7 @@ public class DataxTask extends AbstractTask {
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
return null; return new String[0];
} }
return columnNames; return columnNames;

15
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java

@ -14,17 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.utils; package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.junit.Test;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** /**
* DataxUtils Tester. * DataxUtils Tester.
@ -42,6 +45,7 @@ public class DataxUtilsTest {
assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL)); assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER)); assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE)); assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_CLICKHOUSE, DataxUtils.getReaderPluginName(DbType.CLICKHOUSE));
assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null); assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null);
} }
@ -56,6 +60,7 @@ public class DataxUtilsTest {
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL)); assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER)); assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE)); assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_CLICKHOUSE, DataxUtils.getWriterPluginName(DbType.CLICKHOUSE));
assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null); assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null);
} }

24
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

@ -43,6 +43,7 @@ import java.util.UUID;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito; import org.powermock.api.mockito.PowerMockito;
@ -304,6 +305,7 @@ public class DataxTaskTest {
* Method: buildDataxJsonFile() * Method: buildDataxJsonFile()
*/ */
@Test @Test
@Ignore("method not found")
public void testBuildDataxJsonFile() public void testBuildDataxJsonFile()
throws Exception { throws Exception {
@ -322,6 +324,7 @@ public class DataxTaskTest {
* Method: buildDataxJsonFile() * Method: buildDataxJsonFile()
*/ */
@Test @Test
@Ignore("method not found")
public void testBuildDataxJsonFile0() public void testBuildDataxJsonFile0()
throws Exception { throws Exception {
try { try {
@ -348,17 +351,29 @@ public class DataxTaskTest {
Assert.assertNotNull(contentList); Assert.assertNotNull(contentList);
ObjectNode content = contentList.get(0); ObjectNode content = contentList.get(0);
JsonNode reader = JSONUtils.parseObject(content.path("reader").asText()); JsonNode reader = JSONUtils.parseObject(content.path("reader").toString());
Assert.assertNotNull(reader); Assert.assertNotNull(reader);
Assert.assertEquals("{\"name\":\"mysqlreader\",\"parameter\":{\"username\":\"root\","
+ "\"password\":\"123456\",\"connection\":[{\"querySql\":[\"select 1 as test from dual\"],"
+ "\"jdbcUrl\":[\"jdbc:mysql://127.0.0.1:3306/test?allowLoadLocalInfile=false"
+ "&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\"]}]}}",
reader.toString());
String readerPluginName = reader.path("name").asText(); String readerPluginName = reader.path("name").asText();
Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName); Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName);
JsonNode writer = JSONUtils.parseObject(content.path("writer").asText()); JsonNode writer = JSONUtils.parseObject(content.path("writer").toString());
Assert.assertNotNull(writer); Assert.assertNotNull(writer);
Assert.assertEquals("{\"name\":\"mysqlwriter\",\"parameter\":{\"username\":\"root\","
+ "\"password\":\"123456\",\"column\":[\"`test`\"],\"connection\":[{\"table\":[\"test\"],"
+ "\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test?allowLoadLocalInfile=false&"
+ "autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\"}],"
+ "\"preSql\":[\"delete from test\"],\"postSql\":[\"delete from test\"]}}",
writer.toString());
String writerPluginName = writer.path("name").asText(); String writerPluginName = writer.path("name").asText();
Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName); Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName);
} catch (Exception e) { } catch (Exception e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
@ -375,8 +390,8 @@ public class DataxTaskTest {
method.setAccessible(true); method.setAccessible(true);
JsonNode setting = (JsonNode) method.invoke(dataxTask, null); JsonNode setting = (JsonNode) method.invoke(dataxTask, null);
Assert.assertNotNull(setting); Assert.assertNotNull(setting);
Assert.assertNotNull(setting.get("speed")); Assert.assertEquals("{\"channel\":1,\"record\":1000}", setting.get("speed").toString());
Assert.assertNotNull(setting.get("errorLimit")); Assert.assertEquals("{\"record\":0,\"percentage\":0}", setting.get("errorLimit").toString());
} catch (Exception e) { } catch (Exception e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
@ -403,6 +418,7 @@ public class DataxTaskTest {
* Method: buildShellCommandFile(String jobConfigFilePath) * Method: buildShellCommandFile(String jobConfigFilePath)
*/ */
@Test @Test
@Ignore("method not found")
public void testBuildShellCommandFile() public void testBuildShellCommandFile()
throws Exception { throws Exception {
try { try {

2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue

@ -33,7 +33,7 @@
<m-datasource <m-datasource
ref="refDs" ref="refDs"
@on-dsData="_onDsData" @on-dsData="_onDsData"
:supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER']" :supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER', 'CLICKHOUSE']"
:data="{ type:dsType,datasource:datasource }"> :data="{ type:dsType,datasource:datasource }">
</m-datasource> </m-datasource>
</div> </div>

2
pom.xml

@ -952,7 +952,7 @@
<include>**/server/worker/task/spark/SparkTaskTest.java</include> <include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include> <include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include> <include>**/server/worker/task/spark/SparkTaskTest.java</include>
<!--<include>**/server/worker/task/datax/DataxTaskTest.java</include>--> <include>**/server/worker/task/datax/DataxTaskTest.java</include>
<!--<include>**/server/worker/task/http/HttpTaskTest.java</include>--> <!--<include>**/server/worker/task/http/HttpTaskTest.java</include>-->
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include> <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/task/shell/ShellTaskTest.java</include> <include>**/server/worker/task/shell/ShellTaskTest.java</include>

Loading…
Cancel
Save