From 98465b1281f48c7e80271ef9db8614a7e556a723 Mon Sep 17 00:00:00 2001 From: JinYong Li <42576980+JinyLeeChina@users.noreply.github.com> Date: Mon, 8 Nov 2021 11:57:38 +0800 Subject: [PATCH] [Upgrade][Install] fix upgrade 2.0 bug (#6734) * add convert dependent/conditions * fix upgrade 2.0 bug * fix upgrade 2.0 bug --- .../dao/upgrade/JsonSplitDao.java | 36 +++++++++---------- .../dao/upgrade/ProcessDefinitionDao.java | 2 +- .../dao/upgrade/ScheduleDao.java | 2 +- .../dao/upgrade/UpgradeDao.java | 31 +++++++++------- .../mysql/dolphinscheduler_ddl_post.sql | 3 +- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java index 790d33a8f2..46c7610fde 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java @@ -53,7 +53,7 @@ public class JsonSplitDao { processUpdate.setInt(2, processDefinitionLog.getTimeout()); processUpdate.setInt(3, processDefinitionLog.getTenantId()); processUpdate.setString(4, processDefinitionLog.getLocations()); - processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime()); + processUpdate.setDate(5, new Date(processDefinitionLog.getUpdateTime().getTime())); processUpdate.setInt(6, processDefinitionLog.getId()); processUpdate.addBatch(); @@ -70,9 +70,9 @@ public class JsonSplitDao { insertLog.setInt(11, processDefinitionLog.getTimeout()); insertLog.setInt(12, processDefinitionLog.getTenantId()); insertLog.setInt(13, processDefinitionLog.getOperator()); - insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime()); - insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime()); - insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime()); + insertLog.setDate(14, new Date(processDefinitionLog.getOperateTime().getTime())); + insertLog.setDate(15, new Date(processDefinitionLog.getCreateTime().getTime())); + insertLog.setDate(16, new Date(processDefinitionLog.getUpdateTime().getTime())); insertLog.addBatch(); i++; @@ -121,8 +121,8 @@ public class JsonSplitDao { insert.setInt(7, processTaskRelationLog.getPostTaskVersion()); insert.setInt(8, processTaskRelationLog.getConditionType().getCode()); insert.setString(9, processTaskRelationLog.getConditionParams()); - insert.setDate(10, (Date) processTaskRelationLog.getCreateTime()); - insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime()); + insert.setDate(10, new Date(processTaskRelationLog.getCreateTime().getTime())); + insert.setDate(11, new Date(processTaskRelationLog.getUpdateTime().getTime())); insert.addBatch(); insertLog.setLong(1, processTaskRelationLog.getProjectCode()); @@ -135,9 +135,9 @@ public class JsonSplitDao { insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode()); insertLog.setString(9, processTaskRelationLog.getConditionParams()); insertLog.setInt(10, processTaskRelationLog.getOperator()); - insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime()); - insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime()); - insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime()); + insertLog.setDate(11, new Date(processTaskRelationLog.getOperateTime().getTime())); + insertLog.setDate(12, new Date(processTaskRelationLog.getCreateTime().getTime())); + insertLog.setDate(13, new Date(processTaskRelationLog.getUpdateTime().getTime())); insertLog.addBatch(); i++; @@ -169,10 +169,10 @@ public class JsonSplitDao { public void executeJsonSplitTaskDefinition(Connection conn, List taskDefinitionLogs) { String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority," + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids," - + "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + "create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority," + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator," - + "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + "operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; try { PreparedStatement insert = conn.prepareStatement(insertSql); PreparedStatement insertLog = conn.prepareStatement(insertLogSql); @@ -193,12 +193,12 @@ public class JsonSplitDao { insert.setInt(13, taskDefinitionLog.getFailRetryTimes()); insert.setInt(14, taskDefinitionLog.getFailRetryInterval()); insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode()); - insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); + insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); insert.setInt(17, taskDefinitionLog.getTimeout()); insert.setInt(18, taskDefinitionLog.getDelayTime()); insert.setString(19, taskDefinitionLog.getResourceIds()); - insert.setDate(20, (Date) taskDefinitionLog.getCreateTime()); - insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime()); + insert.setDate(20, new Date(taskDefinitionLog.getCreateTime().getTime())); + insert.setDate(21, new Date(taskDefinitionLog.getUpdateTime().getTime())); insert.addBatch(); insertLog.setLong(1, taskDefinitionLog.getCode()); @@ -216,14 +216,14 @@ public class JsonSplitDao { insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes()); insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval()); insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode()); - insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); + insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); insertLog.setInt(17, taskDefinitionLog.getTimeout()); insertLog.setInt(18, taskDefinitionLog.getDelayTime()); insertLog.setString(19, taskDefinitionLog.getResourceIds()); insertLog.setInt(20, taskDefinitionLog.getOperator()); - insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime()); - insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime()); - insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime()); + insertLog.setDate(21, new Date(taskDefinitionLog.getOperateTime().getTime())); + insertLog.setDate(22, new Date(taskDefinitionLog.getCreateTime().getTime())); + insertLog.setDate(23, new Date(taskDefinitionLog.getUpdateTime().getTime())); insertLog.addBatch(); i++; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java index 9cb03b8917..c41359aa35 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java @@ -148,7 +148,7 @@ public class ProcessDefinitionDao { pstmt.setLong(1, processDefinition.getCode()); long projectCode = processDefinition.getProjectCode(); if (String.valueOf(projectCode).length() <= 10) { - Integer projectId = Integer.getInteger(String.valueOf(projectCode)); + Integer projectId = Integer.parseInt(String.valueOf(projectCode)); if (projectIdCodeMap.containsKey(projectId)) { projectCode = projectIdCodeMap.get(projectId); processDefinition.setProjectCode(projectCode); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java index 097020089d..80a49fd42b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java @@ -77,7 +77,7 @@ public class ScheduleDao { try (PreparedStatement pstmt = conn.prepareStatement(sql)) { long projectDefinitionCode = entry.getValue(); if (String.valueOf(projectDefinitionCode).length() <= 10) { - Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode)); + Integer projectDefinitionId = Integer.parseInt(String.valueOf(projectDefinitionCode)); if (processIdCodeMap.containsKey(projectDefinitionId)) { projectDefinitionCode = processIdCodeMap.get(projectDefinitionId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index 235f223f9b..9109be59ff 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -63,10 +63,10 @@ import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; public abstract class UpgradeDao extends AbstractBaseDao { @@ -649,14 +649,17 @@ public abstract class UpgradeDao extends AbstractBaseDao { ObjectNode param = (ObjectNode) task.get("params"); TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); if (param != null) { - List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); - if (!resourceList.isEmpty()) { + JsonNode resourceJsonNode = param.get("resourceList"); + if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) { + List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); List resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList()); - taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ",")); + taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA)); + } else { + taskDefinitionLog.setResourceIds(StringUtils.EMPTY); } param.put("conditionResult", task.get("conditionResult")); param.put("dependence", task.get("dependence")); - taskDefinitionLog.setTaskParams(param.asText()); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param)); } TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); if (timeout != null) { @@ -674,6 +677,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { taskDefinitionLog.setName(name); taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText()); long taskCode = SnowFlakeUtils.getInstance().nextId(); + // System.out.println(taskCode); taskDefinitionLog.setCode(taskCode); taskDefinitionLog.setVersion(Constants.VERSION_FIRST); taskDefinitionLog.setProjectCode(processDefinition.getProjectCode()); @@ -686,7 +690,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { taskDefinitionLog.setUpdateTime(now); taskDefinitionLogList.add(taskDefinitionLog); taskIdCodeMap.put(task.get("id").asText(), taskCode); - List preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class); + List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); taskNamePreMap.put(name, preTasks); taskNameCodeMap.put(name, taskCode); } @@ -745,13 +749,16 @@ public abstract class UpgradeDao extends AbstractBaseDao { if (StringUtils.isBlank(locations)) { return locations; } - Map locationsMap = JSONUtils.toMap(locations); - JsonNodeFactory factory = new JsonNodeFactory(false); - ArrayNode jsonNodes = factory.arrayNode(); - for (Map.Entry entry : locationsMap.entrySet()) { - ObjectNode nodes = factory.objectNode(); + Map locationsMap = JSONUtils.parseObject(locations, new TypeReference>() { + }); + if (locationsMap == null) { + return locations; + } + ArrayNode jsonNodes = JSONUtils.createArrayNode(); + for (Map.Entry entry : locationsMap.entrySet()) { + ObjectNode nodes = JSONUtils.createObjectNode(); nodes.put("taskCode", taskIdCodeMap.get(entry.getKey())); - ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue()); + ObjectNode oldNodes = entry.getValue(); nodes.put("x", oldNodes.get("x").asInt()); nodes.put("y", oldNodes.get("y").asInt()); jsonNodes.add(nodes); diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql index 5f3f65fba4..dfde96209e 100644 --- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql +++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql @@ -15,8 +15,7 @@ * limitations under the License. */ -alter table t_ds_process_definition drop primary key; -ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`); +alter table t_ds_process_definition drop primary key, ADD PRIMARY KEY (`id`,`code`); ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`; ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`; alter table t_ds_process_definition drop process_definition_json;