@ -16,11 +16,10 @@
* /
package org.apache.dolphinscheduler.server.worker.task.sqoop ;
import org.apache.dolphinscheduler.common.enums.DbType ;
import com.alibaba.fastjson.JSON ;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters ;
import org.apache.dolphinscheduler.dao.entity.DataSource ;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.server.worker.task.TaskProps ;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
@ -33,7 +32,6 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.context.ApplicationContext ;
import org.apache.dolphinscheduler.common.utils.* ;
import java.util.Date ;
@ -52,64 +50,98 @@ public class SqoopTaskTest {
@Before
public void before ( ) throws Exception {
processService = Mockito . mock ( ProcessService . class ) ;
Mockito . when ( processService . findDataSourceById ( 2 ) ) . thenReturn ( getDataSource ( ) ) ;
applicationContext = Mockito . mock ( ApplicationContext . class ) ;
SpringApplicationContext springApplicationContext = new SpringApplicationContext ( ) ;
springApplicationContext . setApplicationContext ( applicationContext ) ;
Mockito . when ( applicationContext . getBean ( ProcessService . class ) ) . thenReturn ( processService ) ;
TaskProps props = new TaskProps ( ) ;
props . setTaskAppId ( String . valueOf ( System . currentTimeMillis ( ) ) ) ;
props . setTenantCode ( "1" ) ;
props . setEnvFile ( ".dolphinscheduler_env.sh" ) ;
props . setTaskStartTime ( new Date ( ) ) ;
props . setTaskTimeout ( 0 ) ;
props . setTaskParams ( "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}" ) ;
sqoopTask = new SqoopTask ( new TaskExecutionContext ( ) , logger ) ;
TaskExecutionContext taskExecutionContext = new TaskExecutionContext ( ) ;
taskExecutionContext . setTaskAppId ( String . valueOf ( System . currentTimeMillis ( ) ) ) ;
taskExecutionContext . setTenantCode ( "1" ) ;
taskExecutionContext . setEnvFile ( ".dolphinscheduler_env.sh" ) ;
taskExecutionContext . setStartTime ( new Date ( ) ) ;
taskExecutionContext . setTaskTimeout ( 0 ) ;
taskExecutionContext . setTaskParams ( "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," +
"\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," +
"\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," +
"\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," +
"\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" +
",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," +
"\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," +
"\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}" ) ;
sqoopTask = new SqoopTask ( taskExecutionContext , logger ) ;
//test sqoop tash init method
sqoopTask . init ( ) ;
}
/ * *
* test SqoopJobGenerator
* /
@Test
public void testGenerator ( ) {
String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters sqoopParameters1 = JSONUtils . parseObject ( data1 , SqoopParameters . class ) ;
TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext ( ) ;
//sqoop TEMPLATE job
//import mysql to HDFS with hadoo
String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
"\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters mysqlToHdfsParams = JSON . parseObject ( mysqlToHdfs , SqoopParameters . class ) ;
SqoopJobGenerator generator = new SqoopJobGenerator ( ) ;
String script = generator . generateSqoopJob ( sqoopParameters1 , new TaskExecutionContext ( ) ) ;
String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'" ;
Assert . assertEquals ( expected , script ) ;
String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters sqoopParameters2 = JSONUtils . parseObject ( data2 , SqoopParameters . class ) ;
String script2 = generator . generateSqoopJob ( sqoopParameters2 , new TaskExecutionContext ( ) ) ;
String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert" ;
Assert . assertEquals ( expected2 , script2 ) ;
String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters sqoopParameters3 = JSONUtils . parseObject ( data3 , SqoopParameters . class ) ;
String script3 = generator . generateSqoopJob ( sqoopParameters3 , new TaskExecutionContext ( ) ) ;
String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'" ;
Assert . assertEquals ( expected3 , script3 ) ;
String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}" ;
SqoopParameters sqoopParameters4 = JSONUtils . parseObject ( data4 , SqoopParameters . class ) ;
String script4 = generator . generateSqoopJob ( sqoopParameters4 , new TaskExecutionContext ( ) ) ;
String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16" ;
Assert . assertEquals ( expected4 , script4 ) ;
String mysqlToHdfsScript = generator . generateSqoopJob ( mysqlToHdfsParams , mysqlTaskExecutionContext ) ;
String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'" ;
Assert . assertEquals ( mysqlToHdfsExpected , mysqlToHdfsScript ) ;
//export hdfs to mysql using update mode
String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
"\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
"\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters hdfsToMysqlParams = JSON . parseObject ( hdfsToMysql , SqoopParameters . class ) ;
String hdfsToMysqlScript = generator . generateSqoopJob ( hdfsToMysqlParams , mysqlTaskExecutionContext ) ;
String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert" ;
Assert . assertEquals ( hdfsToMysqlScriptExpected , hdfsToMysqlScript ) ;
//export hive to mysql
String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}" ;
SqoopParameters hiveToMysqlParams = JSON . parseObject ( hiveToMysql , SqoopParameters . class ) ;
String hiveToMysqlScript = generator . generateSqoopJob ( hiveToMysqlParams , mysqlTaskExecutionContext ) ;
String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'" ;
Assert . assertEquals ( hiveToMysqlExpected , hiveToMysqlScript ) ;
//import mysql to hive
String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}" ;
SqoopParameters mysqlToHiveParams = JSON . parseObject ( mysqlToHive , SqoopParameters . class ) ;
String mysqlToHiveScript = generator . generateSqoopJob ( mysqlToHiveParams , mysqlTaskExecutionContext ) ;
String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16" ;
Assert . assertEquals ( mysqlToHiveExpected , mysqlToHiveScript ) ;
//sqoop CUSTOM job
String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}" ;
SqoopParameters sqoopCustomParams = JSON . parseObject ( sqoopCustomString , SqoopParameters . class ) ;
String sqoopCustomScript = generator . generateSqoopJob ( sqoopCustomParams , new TaskExecutionContext ( ) ) ;
String sqoopCustomExpected = "sqoop import" ;
Assert . assertEquals ( sqoopCustomExpected , sqoopCustomScript ) ;
}
private DataSource getDataSource ( ) {
DataSource dataSource = new DataSource ( ) ;
dataSource . setType ( DbType . MYSQL ) ;
dataSource . setConnectionParams (
"{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}" ) ;
dataSource . setUserId ( 1 ) ;
return dataSource ;
/ * *
* get taskExecutionContext include mysql
* @return TaskExecutionContext
* /
private TaskExecutionContext getMysqlTaskExecutionContext ( ) {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext ( ) ;
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext ( ) ;
String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}" ;
String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}" ;
sqoopTaskExecutionContext . setDataSourceId ( 2 ) ;
sqoopTaskExecutionContext . setDataTargetId ( 2 ) ;
sqoopTaskExecutionContext . setSourcetype ( 0 ) ;
sqoopTaskExecutionContext . setTargetConnectionParams ( mysqlTargetConnectionParams ) ;
sqoopTaskExecutionContext . setSourceConnectionParams ( mysqlSourceConnectionParams ) ;
sqoopTaskExecutionContext . setTargetType ( 0 ) ;
taskExecutionContext . setSqoopTaskExecutionContext ( sqoopTaskExecutionContext ) ;
return taskExecutionContext ;
}
@Test