
fix hadoop params build failure in sqoop task

eights committed 4 years ago
commit 5a6e1d3bba
3 changed files:
  1. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java (11 changed lines)
  2. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java (16 changed lines)
  3. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java (11 changed lines)

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java (11 changed lines)

@@ -53,13 +53,14 @@ public class CommonGenerator {
         //hadoop custom param
         List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
         if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
+            StringBuilder hadoopCustomParamStr = new StringBuilder();
             for (Property hadoopCustomParam : hadoopCustomParams) {
-                String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
-                        Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
-                commonSb.append(Constants.SPACE).append(Constants.D)
-                        .append(Constants.SPACE).append(hadoopCustomParamStr);
+                hadoopCustomParamStr.append(Constants.D)
+                        .append(Constants.SPACE).append(hadoopCustomParam.getProp())
+                        .append(Constants.EQUAL_SIGN).append(hadoopCustomParam.getValue())
+                        .append(Constants.SPACE);
             }
+            commonSb.append(Constants.SPACE).append(hadoopCustomParamStr.substring(0, hadoopCustomParamStr.length() - 1));
         }

         //sqoop custom params
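The fix accumulates every "-D prop=value" pair in a single buffer inside the loop and appends the whole buffer to the command once, trimming the trailing space, instead of formatting and appending each pair separately. A minimal standalone sketch of the new accumulation, assuming Constants.D = "-D", Constants.EQUAL_SIGN = "=", and Constants.SPACE = " "; the nested Property record is a hypothetical stand-in for DolphinScheduler's Property bean (Java 16+ for records):

    import java.util.List;

    public class HadoopParamSketch {

        // stand-in for DolphinScheduler's Property with prop/value accessors
        record Property(String prop, String value) { }

        public static void main(String[] args) {
            List<Property> hadoopCustomParams = List.of(
                    new Property("mapreduce.map.memory.mb", "2048"),
                    new Property("mapreduce.reduce.memory.mb", "2048"));

            StringBuilder commonSb = new StringBuilder("sqoop import -D mapred.job.name=sqoop_import");
            StringBuilder hadoopCustomParamStr = new StringBuilder();
            for (Property p : hadoopCustomParams) {
                // one "-D prop=value " fragment per parameter
                hadoopCustomParamStr.append("-D").append(" ")
                        .append(p.prop()).append("=").append(p.value()).append(" ");
            }
            // append the whole buffer once, dropping the trailing space
            commonSb.append(" ").append(hadoopCustomParamStr.substring(0, hadoopCustomParamStr.length() - 1));

            // prints: sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048
            System.out.println(commonSb);
        }
    }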

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java (16 changed lines)

@@ -97,7 +97,10 @@ public class MysqlSourceGenerator implements ISourceGenerator {
             if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
                 StringBuilder columnMap = new StringBuilder();
                 for (Property item : mapColumnHive) {
-                    columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
+                    if (!item.getProp().isEmpty()) {
+                        columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN)
+                                .append(item.getValue()).append(Constants.COMMA);
+                    }
                 }

                 if (StringUtils.isNotEmpty(columnMap.toString())) {
@@ -110,14 +113,17 @@ public class MysqlSourceGenerator implements ISourceGenerator {
             List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
             if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
-                StringBuilder columnMap = new StringBuilder();
+                StringBuilder columnJavaMap = new StringBuilder();
                 for (Property item : mapColumnJava) {
-                    columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
+                    if (!item.getProp().isEmpty()) {
+                        columnJavaMap.append(item.getProp()).append(Constants.EQUAL_SIGN)
+                                .append(item.getValue()).append(Constants.COMMA);
+                    }
                 }

-                if (StringUtils.isNotEmpty(columnMap.toString())) {
+                if (StringUtils.isNotEmpty(columnJavaMap.toString())) {
                     mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
-                            .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
+                            .append(Constants.SPACE).append(columnJavaMap.substring(0, columnJavaMap.length() - 1));
                 }
             }
         }
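Besides renaming columnMap to columnJavaMap so the hive and java branches no longer share a variable name, both loops now follow the same shape: skip entries whose prop is empty, join the rest as prop=value pairs, and strip the trailing comma. A self-contained sketch of that shape, under the same constant assumptions as above (Constants.EQUAL_SIGN = "=", Constants.COMMA = ","), with Property again a hypothetical stand-in:

    import java.util.List;

    public class ColumnMapSketch {

        record Property(String prop, String value) { }

        // builds the value for --map-column-hive / --map-column-java
        static String buildColumnMap(List<Property> items) {
            StringBuilder columnMap = new StringBuilder();
            for (Property item : items) {
                // skip blank rows so the result never contains "=value," fragments
                if (!item.prop().isEmpty()) {
                    columnMap.append(item.prop()).append("=")
                            .append(item.value()).append(",");
                }
            }
            // strip the trailing comma only when something was appended
            return columnMap.length() > 0 ? columnMap.substring(0, columnMap.length() - 1) : "";
        }

        public static void main(String[] args) {
            System.out.println(buildColumnMap(List.of(
                    new Property("create_time", "java.sql.Date"),
                    new Property("", "ignored"),          // empty prop is skipped
                    new Property("update_time", "java.sql.Date"))));
            // prints: create_time=java.sql.Date,update_time=java.sql.Date
        }
    }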

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java (11 changed lines)

@@ -134,16 +134,21 @@ public class SqoopTaskTest {
         //import mysql to hive
         String mysqlToHive =
                 "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+                        + "\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"},{\"prop\":\"mapreduce.reduce.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"}],"
+                        + "\"sqoopAdvancedParams\":[{\"prop\":\"--delete-target-dir\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"},{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
                         + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
                         + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
-                        + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+                        + "\\\"mapColumnHive\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"}],"
+                        + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"}]}\","
                         + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
                         + "\\\"hiveOverWrite\\\":true,\\\"hiveTargetDir\\\":\\\"/tmp/sqoop_import_hive\\\",\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
         SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
         String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
         String mysqlToHiveExpected =
-                "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
-                        + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+                "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --delete-target-dir --direct -m 1 "
+                        + "--connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
+                        + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-hive create_time=string,update_time=string --map-column-java create_time=java.sql.Date,update_time=java.sql.Date "
+                        + "--hive-import --hive-database stg --hive-table person_internal_2 "
                         + "--create-hive-table --hive-overwrite --delete-target-dir --target-dir /tmp/sqoop_import_hive --hive-partition-key date --hive-partition-value 2020-02-16";
         Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
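The updated expectation pins down where each new fragment lands: the custom -D pairs come directly after the job-name -D, the advanced params (--delete-target-dir, --direct) precede -m 1, and --map-column-hive precedes --map-column-java between --query and --hive-import. A small standalone ordering check over an abbreviated copy of that string, independent of the test harness (the real test compares the full script produced by generator.generateSqoopJob):

    public class ExpectedOrderSketch {

        static int pos(String haystack, String needle) {
            int i = haystack.indexOf(needle);
            if (i < 0) throw new AssertionError("missing fragment: " + needle);
            return i;
        }

        public static void main(String[] args) {
            String expected = "sqoop import -D mapred.job.name=sqoop_import"
                    + " -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048"
                    + " --delete-target-dir --direct -m 1"
                    + " --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\""
                    + " --query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\""
                    + " --map-column-hive create_time=string,update_time=string"
                    + " --map-column-java create_time=java.sql.Date,update_time=java.sql.Date"
                    + " --hive-import --hive-database stg --hive-table person_internal_2";

            // hadoop custom params follow the job-name -D, advanced params precede -m 1,
            // and the hive column map precedes the java column map
            if (!(pos(expected, "-D mapreduce.map.memory.mb=2048") > pos(expected, "-D mapred.job.name")
                    && pos(expected, "--direct") < pos(expected, "-m 1")
                    && pos(expected, "--map-column-hive") < pos(expected, "--map-column-java"))) {
                throw new AssertionError("unexpected fragment order");
            }
            System.out.println("fragment ordering ok");
        }
    }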
