@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 package org.apache.dolphinscheduler.server.worker.task.sqoop;
 
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -23,6 +24,9 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.util.Date;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
-import java.util.Date;
 
 /**
  * sqoop task test
@@ -43,14 +46,12 @@ public class SqoopTaskTest {
     private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);
 
-    private ProcessService processService;
-    private ApplicationContext applicationContext;
     private SqoopTask sqoopTask;
 
     @Before
-    public void before() throws Exception {
-        processService = Mockito.mock(ProcessService.class);
-        applicationContext = Mockito.mock(ApplicationContext.class);
+    public void before() {
+        ProcessService processService = Mockito.mock(ProcessService.class);
+        ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
         SpringApplicationContext springApplicationContext = new SpringApplicationContext();
         springApplicationContext.setApplicationContext(applicationContext);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
@@ -61,17 +62,17 @@ public class SqoopTaskTest {
         taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
         taskExecutionContext.setStartTime(new Date());
         taskExecutionContext.setTaskTimeout(0);
-        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," +
-                "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," +
-                "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," +
-                "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," +
-                "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" +
-                ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," +
-                "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," +
-                "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
+        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+                + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\","
+                + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\","
+                + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],"
+                + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\""
+                + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,"
+                + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\","
+                + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
         sqoopTask = new SqoopTask(taskExecutionContext, logger);
-        //test sqoop tash init method
+        //test sqoop task init method
         sqoopTask.init();
     }
@@ -79,40 +80,72 @@ public class SqoopTaskTest {
      * test SqoopJobGenerator
      */
    @Test
    public void testGenerator() {
        TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
 
        //sqoop TEMPLATE job
-        //import mysql to HDFS with hadoo
-        String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
-                "\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        //import mysql to HDFS with hadoop
+        String mysqlToHdfs =
+                "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],"
+                + "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+                + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\","
+                + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\","
+                + "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
         SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class);
         SqoopJobGenerator generator = new SqoopJobGenerator();
         String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
-        String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
+        String mysqlToHdfsExpected =
+                "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test "
+                + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+                + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
         Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
 
         //export hdfs to mysql using update mode
-        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
-                "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
-                "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\","
+                + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\","
+                + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\","
+                + "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\","
+                + "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
         SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
         String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
-        String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
+        String hdfsToMysqlScriptExpected =
+                "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test "
+                + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+                + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
         Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
 
         //export hive to mysql
-        String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        String hiveToMysql =
+                "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\","
+                + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\","
+                + "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\","
+                + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\","
+                + "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
         SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class);
         String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
-        String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
+        String hiveToMysqlExpected =
+                "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+                + "--hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" --table person_3 "
+                + "--fields-terminated-by '@' --lines-terminated-by '\\n'";
         Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
 
         //import mysql to hive
-        String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+        String mysqlToHive =
+                "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+                + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+                + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+                + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+                + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
         SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
         String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
-        String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
+        String mysqlToHiveExpected =
+                "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" "
+                + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 "
+                + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
         Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
 
         //sqoop CUSTOM job
@@ -124,16 +157,18 @@ public class SqoopTaskTest {
     }
 
     /**
     * get taskExecutionContext include mysql
+     *
     * @return TaskExecutionContext
     */
    private TaskExecutionContext getMysqlTaskExecutionContext() {
        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
        SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
-        String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
-        String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlSourceConnectionParams =
+                "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlTargetConnectionParams =
+                "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
        sqoopTaskExecutionContext.setDataSourceId(2);
        sqoopTaskExecutionContext.setDataTargetId(2);
        sqoopTaskExecutionContext.setSourcetype(0);
@@ -153,7 +188,7 @@
     * Method: init
     */
    @Test
    public void testInit() {
        try {
            sqoopTask.init();
        } catch (Exception e) {