diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java index 21a874f9b9..caedf854e7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java @@ -522,7 +522,6 @@ public class DataSourceService extends BaseService{ separator = ";"; } - Map parameterMap = new LinkedHashMap(6); parameterMap.put(TYPE, connectType); parameterMap.put(Constants.ADDRESS, address); parameterMap.put(Constants.DATABASE, database); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index f9458a82bf..29c625337d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -16,13 +16,11 @@ */ package org.apache.dolphinscheduler.dao.upgrade; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DbType; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.apache.dolphinscheduler.common.utils.SchemaUtils; -import org.apache.dolphinscheduler.common.utils.ScriptRunner; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; import org.slf4j.Logger; @@ -282,12 +280,16 @@ public abstract class UpgradeDao extends AbstractBaseDao { Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ - JSONObject jsonObject = JSONObject.parseObject(entry.getValue()); - JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks")); + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); for (int i = 0 ;i < tasks.size() ; i++){ - JSONObject task = tasks.getJSONObject(i); - Integer workerGroupId = task.getInteger("workerGroupId"); + ObjectNode task = (ObjectNode) tasks.path(i); + ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId"); + Integer workerGroupId = -1; + if(workerGroupNode != null && workerGroupNode.canConvertToInt()){ + workerGroupId = workerGroupNode.asInt(-1); + } if (workerGroupId == -1) { task.put("workerGroup", "default"); }else { @@ -295,11 +297,11 @@ public abstract class UpgradeDao extends AbstractBaseDao { } } - jsonObject.remove(jsonObject.getString("tasks")); + jsonObject.remove("task"); jsonObject.put("tasks",tasks); - replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString()); + replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString()); } if (replaceProcessDefinitionMap.size() > 0){ processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap); diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSourceTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSourceTest.java index 5d3d63c168..8392f72b12 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSourceTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSourceTest.java @@ -25,7 +25,7 @@ public class OracleDataSourceTest { @Test public void testGetOracleJdbcUrl() { OracleDataSource oracleDataSource = new OracleDataSource(); - oracleDataSource.setType(DbConnectType.ORACLE_SERVICE_NAME); + oracleDataSource.setConnectType(DbConnectType.ORACLE_SERVICE_NAME); oracleDataSource.setAddress("jdbc:oracle:thin:@//127.0.0.1:1521"); oracleDataSource.setDatabase("test"); oracleDataSource.setPassword("123456"); @@ -43,7 +43,7 @@ public class OracleDataSourceTest { oracleDataSource2.setDatabase("orcl"); oracleDataSource2.setPassword("123456"); oracleDataSource2.setUser("test"); - oracleDataSource2.setType(DbConnectType.ORACLE_SID); + oracleDataSource2.setConnectType(DbConnectType.ORACLE_SID); Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:orcl", oracleDataSource2.getJdbcUrl()); //set fake principal oracleDataSource2.setPrincipal("fake principal"); @@ -58,7 +58,7 @@ public class OracleDataSourceTest { OracleDataSource oracleDataSource = new OracleDataSource(); oracleDataSource.setAddress("jdbc:oracle:thin:@//127.0.0.1:1521"); oracleDataSource.setDatabase("test"); - oracleDataSource.setType(DbConnectType.ORACLE_SERVICE_NAME); + oracleDataSource.setConnectType(DbConnectType.ORACLE_SERVICE_NAME); StringBuilder jdbcUrl = new StringBuilder(oracleDataSource.getAddress()); oracleDataSource.appendDatabase(jdbcUrl); Assert.assertEquals("jdbc:oracle:thin:@//127.0.0.1:1521/test", jdbcUrl.toString()); @@ -66,7 +66,7 @@ public class OracleDataSourceTest { OracleDataSource oracleDataSource2 = new OracleDataSource(); oracleDataSource2.setAddress("jdbc:oracle:thin:@127.0.0.1:1521"); oracleDataSource2.setDatabase("orcl"); - oracleDataSource2.setType(DbConnectType.ORACLE_SID); + oracleDataSource2.setConnectType(DbConnectType.ORACLE_SID); StringBuilder jdbcUrl2 = new StringBuilder(oracleDataSource2.getAddress()); oracleDataSource2.appendDatabase(jdbcUrl2); Assert.assertEquals("jdbc:oracle:thin:@127.0.0.1:1521:orcl", jdbcUrl2.toString()); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java index 4cf66265cf..a91cb2add0 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -56,7 +56,7 @@ public class RemoveTaskLogRequestCommand implements Serializable { public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.REMOVE_TAK_LOG_REQUEST); - byte[] body = FastJsonSerializer.serialize(this); + byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java index a72f84ab41..39e8672127 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.remote.command.log; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; +import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -28,7 +28,7 @@ import java.io.Serializable; */ public class RemoveTaskLogResponseCommand implements Serializable { - /** + /*TaskPriorityQueueConsumer.* * log path */ private Boolean status; @@ -56,7 +56,7 @@ public class RemoveTaskLogResponseCommand implements Serializable { public Command convert2Command(long opaque){ Command command = new Command(opaque); command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE); - byte[] body = FastJsonSerializer.serialize(this); + byte[] body = JsonSerializer.serialize(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 9ce1f2226f..68db1f2061 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.consumer; -import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.SqoopJobType; @@ -33,7 +32,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 09a6973438..6d701a00a6 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -141,7 +141,7 @@ public class DataxTask extends AbstractTask { CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime()); - // run datax process + // run datax procesDataSourceService.s String jsonFilePath = buildDataxJsonFile(paramsMap); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); @@ -176,7 +176,7 @@ public class DataxTask extends AbstractTask { * @return datax json file name * @throws Exception if error throws Exception */ - private String buildDataxJsonFile() + private String buildDataxJsonFile(Map paramsMap) throws Exception { // generate json String fileName = String.format("%s/%s_job.json", diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index aa5d8ebcd0..92d38d470a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.service.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.log.*; @@ -162,7 +163,7 @@ public class LogClientService { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if(response != null){ - RemoveTaskLogResponseCommand taskLogResponse = FastJsonSerializer.deserialize( + RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize( response.getBody(), RemoveTaskLogResponseCommand.class); return taskLogResponse.getStatus(); }