Browse Source

[Fix-3240][worker] Cannot find a (Map) Key deserializer for type (#3245)

* fix the bug #2923 (Hive JDBC connection parameter ignored) , the hive jdbc connection url ;

* fix the bug index out of bound and add some test case

* add the Licensed

* fix the checkstyle and import StringBuilder

* put HiveDataSourceTest.java to root pom maven-surefire-plugin to avoiding the sonar 0.0% Coverage.

* Update HiveDataSource.java

* fix the bug #3240  [worker] Cannot find a (Map) Key deserializer for type

* [DS-3245][fix]  change toString  use JsonString

- change the method of  UdfFunc.toString use JsonString

This closes #3245

* [DS-3245][fix]  change toString  use JsonString

- fix the junit test for full

This closes #3245

* [DS-3245][fix]  change toString  use JsonString

- fix the sonarcloud merge error, Swap these 2 arguments so they are in the correct order: expected value, actual value.

This closes #3245

* [DS-3245][fix]  move the Class UdfFuncDeserializer to UdfFunc.java

move the class UdfFuncDeserializer.java to UdfFunc.java

This closes #3245

* [DS-3245][fix]  move the Class UdfFuncDeserializer to UdfFunc.java

add blank line

This closes #3245

* [DS-3245][fix]  add the unit test of UdfFunc

add the unit test of UdfFunc.java

This closes #3245

* [DS-3245][fix]  add the unit test of UdfFunc

remove the UdfFuncTest.java from maven-surefire-plugin list

This closes #3245

* [DS-3245][fix]  add the unit test of UdfFunc

add the UdfFuncTest to the root pom maven-surefire-plugin

This closes #3245

Co-authored-by: dailidong <dailidong66@gmail.com>
pull/3/MERGE
LEI SHENG 4 years ago committed by GitHub
parent
commit
6f2667bf1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java
  2. 64
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/UdfFuncTest.java
  3. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
  4. 189
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java
  5. 2
      pom.xml

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/UdfFunc.java

@ -14,14 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.KeyDeserializer;
import java.io.IOException;
import java.util.Date;
/**
@ -215,19 +220,17 @@ public class UdfFunc {
@Override
public String toString() {
return "UdfFunc{" +
"id=" + id +
", userId=" + userId +
", funcName='" + funcName + '\'' +
", className='" + className + '\'' +
", argTypes='" + argTypes + '\'' +
", database='" + database + '\'' +
", description='" + description + '\'' +
", resourceId=" + resourceId +
", resourceName='" + resourceName + '\'' +
", type=" + type +
", createTime=" + createTime +
", updateTime=" + updateTime +
'}';
return JSONUtils.toJsonString(this);
}
public static class UdfFuncDeserializer extends KeyDeserializer {
@Override
public Object deserializeKey(String key, DeserializationContext ctxt) throws IOException {
if (StringUtils.isBlank(key)) {
return null;
}
return JSONUtils.parseObject(key);
}
}
}

64
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/entity/UdfFuncTest.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
public class UdfFuncTest {
/**
* test to String
*/
@Test
public void testUdfFuncToString() {
UdfFunc udfFunc = new UdfFunc();
udfFunc.setResourceName("dolphin_resource_update");
udfFunc.setResourceId(2);
udfFunc.setClassName("org.apache.dolphinscheduler.test.mrUpdate");
Assert.assertEquals("{\"id\":0,\"userId\":0,\"funcName\":null,\"className\":\"org.apache.dolphinscheduler.test.mrUpdate\",\"argTypes\":null,\"database\":null,"
+ "\"description\":null,\"resourceId\":2,\"resourceName\":\"dolphin_resource_update\",\"type\":null,\"createTime\":null,\"updateTime\":null}"
, udfFunc.toString());
}
/**
* test UdfFuncDeserializer.deserializeKey
*
* @throws IOException
*/
@Test
public void testUdfFuncDeserializer() throws IOException {
// UdfFuncDeserializer.deserializeKey key is null
UdfFuncDeserializer udfFuncDeserializer = new UdfFuncDeserializer();
Assert.assertNull(udfFuncDeserializer.deserializeKey(null, null));
//
UdfFunc udfFunc = new UdfFunc();
udfFunc.setResourceName("dolphin_resource_update");
udfFunc.setResourceId(2);
udfFunc.setClassName("org.apache.dolphinscheduler.test.mrUpdate");
Assert.assertNotNull(udfFuncDeserializer.deserializeKey(udfFunc.toString(), null));
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java

@ -18,7 +18,8 @@
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.UdfFunc.UdfFuncDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import java.io.Serializable;
import java.util.Map;
@ -40,6 +41,7 @@ public class SQLTaskExecutionContext implements Serializable {
/**
* udf function tenant code map
*/
@JsonDeserialize(keyUsing = UdfFuncDeserializer.class)
private Map<UdfFunc,String> udfFuncTenantCodeMap;

189
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContextTest.java

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
public class SQLTaskExecutionContextTest {
/**
* test parse josn String to TaskExecutionContext
*/
@Test
public void testTaskExecutionContext() {
String contextJson = "{\n"
+ " \"taskInstanceId\":32,\n"
+ " \"taskName\":\"test-hive-func\",\n"
+ " \"startTime\":\"2020-07-19 16:45:46\",\n"
+ " \"taskType\":\"SQL\",\n"
+ " \"host\":null,\n"
+ " \"executePath\":\"/tmp/dolphinscheduler/exec/process/1/5/14/32\",\n"
+ " \"logPath\":null,\n"
+ " \"taskJson\":\"{\\\"id\\\":\\\"tasks-70999\\\",\\\"name\\\":\\\"test-hive-func\\\""
+ ",\\\"desc\\\":null,\\\"type\\\":\\\"SQL\\\",\\\"runFlag\\\":\\\"NORMAL\\\","
+ "\\\"loc\\\":null,\\\"maxRetryTimes\\\":0,\\\"retryInterval\\\":1,"
+ "\\\"params\\\":{\\\"type\\\":\\\"HIVE\\\",\\\"datasource\\\":2,"
+ "\\\"sql\\\":\\\"select mid_id, user_id,"
+ " version_code, version_name, lang, source, os, area, model, "
+ "brand, sdk_version, gmail, height_width, app_time, network,"
+ " lng, lat, dt,\\\\n Lower(model)\\\\nfrom dws_uv_detail_day limit 5;"
+ "\\\",\\\"udfs\\\":\\\"1\\\",\\\"sqlType\\\":\\\"0\\\",\\\"title\\\":\\\""
+ "test-hive-user-func\\\",\\\"receivers\\\":\\\"534634799@qq.com\\\","
+ "\\\"receiversCc\\\":\\\"\\\",\\\"showType\\\":\\\"TABLE\\\",\\\"localParams\\\":[],"
+ "\\\"connParams\\\":\\\"\\\",\\\"preStatements\\\":[],\\\"postStatements\\\":[]},"
+ "\\\"preTasks\\\":[],\\\"extras\\\":null,\\\"depList\\\":[],\\\"dependence\\\":{},"
+ "\\\"conditionResult\\\":{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]},"
+ "\\\"taskInstancePriority\\\":\\\"MEDIUM\\\",\\\"workerGroup\\\":\\\"default\\\","
+ "\\\"workerGroupId\\\":null,\\\"timeout\\\":{\\\"strategy\\\":\\\"\\\",\\\"interval\\\":null,"
+ "\\\"enable\\\":false},\\\"conditionsTask\\\":false,\\\"forbidden\\\":false,"
+ "\\\"taskTimeoutParameter\\\":{\\\"enable\\\":false,\\\"strategy\\\":null,"
+ "\\\"interval\\\":0}}\",\n"
+ " \"processId\":0,\n"
+ " \"appIds\":null,\n"
+ " \"processInstanceId\":14,\n"
+ " \"scheduleTime\":null,\n"
+ " \"globalParams\":null,\n"
+ " \"executorId\":2,\n"
+ " \"cmdTypeIfComplement\":2,\n"
+ " \"tenantCode\":\"sl\",\n"
+ " \"queue\":\"sl\",\n"
+ " \"processDefineId\":5,\n"
+ " \"projectId\":1,\n"
+ " \"taskParams\":null,\n"
+ " \"envFile\":null,\n"
+ " \"definedParams\":null,\n"
+ " \"taskAppId\":null,\n"
+ " \"taskTimeoutStrategy\":0,\n"
+ " \"taskTimeout\":0,\n"
+ " \"workerGroup\":\"default\",\n"
+ " \"resources\":{\n"
+ " },\n"
+ " \"sqlTaskExecutionContext\":{\n"
+ " \"warningGroupId\":0,\n"
+ " \"connectionParams\":\"{\\\"type\\\":null,\\\"address\\\":"
+ "\\\"jdbc:hive2://localhost:10000\\\",\\\"database\\\":\\\"gmall\\\","
+ "\\\"jdbcUrl\\\":\\\"jdbc:hive2://localhost:10000/gmall\\\","
+ "\\\"user\\\":\\\"sl-test\\\",\\\"password\\\":\\\"123456sl\\\"}\",\n"
+ " \"udfFuncTenantCodeMap\": null"
+ " },\n"
+ " \"dataxTaskExecutionContext\":{\n"
+ " \"dataSourceId\":0,\n"
+ " \"sourcetype\":0,\n"
+ " \"sourceConnectionParams\":null,\n"
+ " \"dataTargetId\":0,\n"
+ " \"targetType\":0,\n"
+ " \"targetConnectionParams\":null\n"
+ " },\n"
+ " \"dependenceTaskExecutionContext\":null,\n"
+ " \"sqoopTaskExecutionContext\":{\n"
+ " \"dataSourceId\":0,\n"
+ " \"sourcetype\":0,\n"
+ " \"sourceConnectionParams\":null,\n"
+ " \"dataTargetId\":0,\n"
+ " \"targetType\":0,\n"
+ " \"targetConnectionParams\":null\n"
+ " },\n"
+ " \"procedureTaskExecutionContext\":{\n"
+ " \"connectionParams\":null\n"
+ " }\n"
+ "}\n";
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
assertNotNull(taskExecutionContext);
}
@Test
public void testSqlTaskExecutionContext() {
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
sqlTaskExecutionContext.setWarningGroupId(0);
Map<UdfFunc, String> udfmap = new HashMap<>();
UdfFunc udfFunc = new UdfFunc();
udfFunc.setArgTypes("1");
udfFunc.setId(1);
udfFunc.setResourceName("name1");
udfmap.put(udfFunc, "map1");
UdfFunc udfFunc2 = new UdfFunc();
udfFunc2.setArgTypes("2");
udfFunc2.setId(2);
udfFunc2.setResourceName("name2");
udfmap.put(udfFunc2, "map2");
sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfmap);
String contextJson = JSONUtils.toJsonString(sqlTaskExecutionContext);
SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask);
assertEquals(sqlTaskExecutionContext.getWarningGroupId(), parseSqlTask.getWarningGroupId());
assertEquals(sqlTaskExecutionContext.getUdfFuncTenantCodeMap().size(), parseSqlTask.getUdfFuncTenantCodeMap().size());
}
/**
* test the SQLTaskExecutionContext
*/
@Test
public void testSqlTaskExecutionContextParse() {
// SQLTaskExecutionContext.udfFuncTenantCodeMap is null
String contextJson = "{\n"
+ " \"warningGroupId\":0,\n"
+ " \"connectionParams\":null,\n"
+ " \"udfFuncTenantCodeMap\":null"
+ "}\n}";
SQLTaskExecutionContext parseSqlTask = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask);
assertEquals(0,parseSqlTask.getWarningGroupId());
assertNull(parseSqlTask.getUdfFuncTenantCodeMap());
// SQLTaskExecutionContext.udfFuncTenantCodeMap is not null
contextJson = "{\"warningGroupId\":0,"
+ "\"connectionParams\":null,"
+ "\"udfFuncTenantCodeMap\":{\""
+ "{\\\"id\\\":2,\\\"userId\\\":0,"
+ "\\\"funcName\\\":null,\\\"className\\\":null,\\\"argTypes\\\":\\\"2\\\",\\\"database\\\":null,"
+ "\\\"description\\\":null,\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name2\\\",\\\"type\\\":null,"
+ "\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map2\","
+ "\"{\\\"id\\\":1,\\\"userId\\\":0,\\\"funcName\\\":null,"
+ "\\\"className\\\":null,\\\"argTypes\\\":\\\"1\\\","
+ "\\\"database\\\":null,\\\"description\\\":null,"
+ "\\\"resourceId\\\":0,\\\"resourceName\\\":\\\"name1\\\","
+ "\\\"type\\\":null,\\\"createTime\\\":null,\\\"updateTime\\\":null}\":\"map1\"}}\n";
SQLTaskExecutionContext parseSqlTask2 = JSONUtils.parseObject(contextJson, SQLTaskExecutionContext.class);
assertNotNull(parseSqlTask2);
assertEquals(0,parseSqlTask2.getWarningGroupId());
assertEquals(2,parseSqlTask2.getUdfFuncTenantCodeMap().size());
}
}

2
pom.xml

@ -785,12 +785,14 @@
<include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/mapper/ConnectionFactoryTest.java</include>
<include>**/dao/mapper/DataSourceMapperTest.java</include>
<include>**/dao/entity/UdfFuncTest.java</include>
<include>**/remote/JsonSerializerTest.java</include>
<include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
<include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
<!--<include>**/remote/NettyRemotingClientTest.java</include>-->
<include>**/remote/ResponseFutureTest.java</include>
<!--<include>**/server/log/LoggerServerTest.java</include>-->
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include>
<include>**/server/log/SensitiveDataConverterTest.java</include>
<!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->

Loading…
Cancel
Save