From 5d0982df05e1dcfa2a09d4421c4aecec6ee285c9 Mon Sep 17 00:00:00 2001 From: eights Date: Wed, 10 Jun 2020 09:11:17 +0800 Subject: [PATCH] modify sqoop task UT --- .../sqoop/targets/TargetMysqlParameter.java | 2 +- .../sqoop/generator/SqoopJobGenerator.java | 22 ++- .../targets/MysqlTargetGenerator.java | 2 +- .../worker/task/sqoop/SqoopTaskTest.java | 125 +++++++++++------- 4 files changed, 96 insertions(+), 55 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java index 47126ae993..86931cbe46 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java @@ -106,7 +106,7 @@ public class TargetMysqlParameter { this.preQuery = preQuery; } - public boolean isUpdate() { + public boolean getIsUpdate() { return isUpdate; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java index 4e9cb84ff3..6815e323b5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator; @@ -62,14 +63,23 @@ public class SqoopJobGenerator { * @return */ public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){ - createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); - if(sourceGenerator == null || targetGenerator == null){ - return null; + + String sqoopScripts = ""; + + if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) { + createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); + if(sourceGenerator == null || targetGenerator == null){ + return null; + } + + sqoopScripts = commonGenerator.generate(sqoopParameters) + + sourceGenerator.generate(sqoopParameters,taskExecutionContext) + + targetGenerator.generate(sqoopParameters,taskExecutionContext); + } else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) { + sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n"); } - return commonGenerator.generate(sqoopParameters) - + sourceGenerator.generate(sqoopParameters,taskExecutionContext) - + targetGenerator.generate(sqoopParameters,taskExecutionContext); + return sqoopScripts; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java index aed8b9e24a..5b0b75d8e2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -78,7 +78,7 @@ public class MysqlTargetGenerator implements ITargetGenerator { result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'"); } - if(targetMysqlParameter.isUpdate() + if(targetMysqlParameter.getIsUpdate() && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey()) && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){ result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey()) diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index bfc8205c2d..df0d548b59 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -17,11 +17,9 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop; import com.alibaba.fastjson.JSON; -import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; -import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -35,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import java.util.*; +import java.util.Date; /** * sqoop task test @@ -52,64 +50,97 @@ public class SqoopTaskTest { @Before public void before() throws Exception{ processService = Mockito.mock(ProcessService.class); - Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); applicationContext = Mockito.mock(ApplicationContext.class); SpringApplicationContext springApplicationContext = new SpringApplicationContext(); springApplicationContext.setApplicationContext(applicationContext); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); - TaskProps props = new TaskProps(); - props.setTaskAppId(String.valueOf(System.currentTimeMillis())); - props.setTenantCode("1"); - props.setEnvFile(".dolphinscheduler_env.sh"); - props.setTaskStartTime(new Date()); - props.setTaskTimeout(0); - props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); - - sqoopTask = new SqoopTask(new TaskExecutionContext(),logger); + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis())); + taskExecutionContext.setTenantCode("1"); + taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh"); + taskExecutionContext.setStartTime(new Date()); + taskExecutionContext.setTaskTimeout(0); + taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," + + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," + + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," + + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," + + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" + + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," + + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," + + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); + + sqoopTask = new SqoopTask(taskExecutionContext,logger); + //test sqoop tash init method sqoopTask.init(); } + /** + * test SqoopJobGenerator + */ @Test public void testGenerator(){ - String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class); + TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext(); + //sqoop TEMPLATE job + //import mysql to HDFS + String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters mysqlToHdfsParams = JSON.parseObject(mysqlToHdfs,SqoopParameters.class); SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext()); - String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; - Assert.assertEquals(expected, script); - - String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class); - - String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext()); - String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; - Assert.assertEquals(expected2, script2); - - String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class); - - String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext()); - String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; - Assert.assertEquals(expected3, script3); - - String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; - SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class); - - String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext()); - String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; - Assert.assertEquals(expected4, script4); + String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext); + String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; + Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript); + + //export hdfs to mysql using update mode + String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," + + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," + + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters hdfsToMysqlParams = JSON.parseObject(hdfsToMysql,SqoopParameters.class); + String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext); + String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; + Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript); + + //export hive to mysql + String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters hiveToMysqlParams = JSON.parseObject(hiveToMysql,SqoopParameters.class); + String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext); + String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; + Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript); + + //import mysql to hive + String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; + SqoopParameters mysqlToHiveParams = JSON.parseObject(mysqlToHive,SqoopParameters.class); + String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext); + String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; + Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript); + + //sqoop CUSTOM job + String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}"; + SqoopParameters sqoopCustomParams = JSON.parseObject(sqoopCustomString, SqoopParameters.class); + String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext()); + String sqoopCustomExpected = "sqoop import"; + Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript); } - private DataSource getDataSource() { - DataSource dataSource = new DataSource(); - dataSource.setType(DbType.MYSQL); - dataSource.setConnectionParams( - "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"); - dataSource.setUserId(1); - return dataSource; + + /** + * get taskExecutionContext include mysql + * @return TaskExecutionContext + */ + private TaskExecutionContext getMysqlTaskExecutionContext() { + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); + String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"; + String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"; + sqoopTaskExecutionContext.setDataSourceId(2); + sqoopTaskExecutionContext.setDataTargetId(2); + sqoopTaskExecutionContext.setSourcetype(0); + sqoopTaskExecutionContext.setTargetConnectionParams(mysqlTargetConnectionParams); + sqoopTaskExecutionContext.setSourceConnectionParams(mysqlSourceConnectionParams); + sqoopTaskExecutionContext.setTargetType(0); + taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext); + return taskExecutionContext; } @Test