
[Upgrade][Install] fix upgrade 2.0 bug (#6734)

* add convert dependent/conditions

* fix upgrade 2.0 bug

* fix upgrade 2.0 bug
2.0.7-release
JinYong Li, 3 years ago, committed by lenboo
commit 98465b1281
  1. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (36 changes)
  2. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (2 changes)
  3. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (2 changes)
  4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java (31 changes)
  5. sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql (3 changes)

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (36 changes)

@@ -53,7 +53,7 @@ public class JsonSplitDao {
 processUpdate.setInt(2, processDefinitionLog.getTimeout());
 processUpdate.setInt(3, processDefinitionLog.getTenantId());
 processUpdate.setString(4, processDefinitionLog.getLocations());
-processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime());
+processUpdate.setDate(5, new Date(processDefinitionLog.getUpdateTime().getTime()));
 processUpdate.setInt(6, processDefinitionLog.getId());
 processUpdate.addBatch();
@@ -70,9 +70,9 @@ public class JsonSplitDao {
 insertLog.setInt(11, processDefinitionLog.getTimeout());
 insertLog.setInt(12, processDefinitionLog.getTenantId());
 insertLog.setInt(13, processDefinitionLog.getOperator());
-insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime());
-insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime());
-insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime());
+insertLog.setDate(14, new Date(processDefinitionLog.getOperateTime().getTime()));
+insertLog.setDate(15, new Date(processDefinitionLog.getCreateTime().getTime()));
+insertLog.setDate(16, new Date(processDefinitionLog.getUpdateTime().getTime()));
 insertLog.addBatch();
 i++;
@@ -121,8 +121,8 @@ public class JsonSplitDao {
 insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
 insert.setInt(8, processTaskRelationLog.getConditionType().getCode());
 insert.setString(9, processTaskRelationLog.getConditionParams());
-insert.setDate(10, (Date) processTaskRelationLog.getCreateTime());
-insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime());
+insert.setDate(10, new Date(processTaskRelationLog.getCreateTime().getTime()));
+insert.setDate(11, new Date(processTaskRelationLog.getUpdateTime().getTime()));
 insert.addBatch();
 insertLog.setLong(1, processTaskRelationLog.getProjectCode());
@@ -135,9 +135,9 @@ public class JsonSplitDao {
 insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode());
 insertLog.setString(9, processTaskRelationLog.getConditionParams());
 insertLog.setInt(10, processTaskRelationLog.getOperator());
-insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime());
-insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime());
-insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime());
+insertLog.setDate(11, new Date(processTaskRelationLog.getOperateTime().getTime()));
+insertLog.setDate(12, new Date(processTaskRelationLog.getCreateTime().getTime()));
+insertLog.setDate(13, new Date(processTaskRelationLog.getUpdateTime().getTime()));
 insertLog.addBatch();
 i++;
@@ -169,10 +169,10 @@ public class JsonSplitDao {
 public void executeJsonSplitTaskDefinition(Connection conn, List<TaskDefinitionLog> taskDefinitionLogs) {
 String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
 + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
-+ "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
++ "create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
 String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
 + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
-+ "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
++ "operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
 try {
 PreparedStatement insert = conn.prepareStatement(insertSql);
 PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
@@ -193,12 +193,12 @@ public class JsonSplitDao {
 insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
 insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
 insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
-insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
 insert.setInt(17, taskDefinitionLog.getTimeout());
 insert.setInt(18, taskDefinitionLog.getDelayTime());
 insert.setString(19, taskDefinitionLog.getResourceIds());
-insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
-insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
+insert.setDate(20, new Date(taskDefinitionLog.getCreateTime().getTime()));
+insert.setDate(21, new Date(taskDefinitionLog.getUpdateTime().getTime()));
 insert.addBatch();
 insertLog.setLong(1, taskDefinitionLog.getCode());
@@ -216,14 +216,14 @@ public class JsonSplitDao {
 insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
 insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
 insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
-insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
+insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy() == null ? 0 : taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
 insertLog.setInt(17, taskDefinitionLog.getTimeout());
 insertLog.setInt(18, taskDefinitionLog.getDelayTime());
 insertLog.setString(19, taskDefinitionLog.getResourceIds());
 insertLog.setInt(20, taskDefinitionLog.getOperator());
-insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime());
-insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime());
-insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime());
+insertLog.setDate(21, new Date(taskDefinitionLog.getOperateTime().getTime()));
+insertLog.setDate(22, new Date(taskDefinitionLog.getCreateTime().getTime()));
+insertLog.setDate(23, new Date(taskDefinitionLog.getUpdateTime().getTime()));
 insertLog.addBatch();
 i++;
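
A note on the recurring change in this file: PreparedStatement.setDate takes a java.sql.Date, while the log entities return java.util.Date, so the old (Date) cast fails whenever the value is a plain java.util.Date; the fix rebuilds the value from the epoch milliseconds. (The same hunks also drop a duplicated "values values" keyword from the insert statements and default a null timeoutNotifyStrategy to 0.) A minimal illustration of the conversion, with a hypothetical helper name:

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Illustrative helper (not DolphinScheduler code): a java.util.Date cannot be cast to
// java.sql.Date unless it already is one, so the epoch millis are wrapped instead.
public final class SqlDateConversion {

    private SqlDateConversion() {
    }

    public static java.sql.Date toSqlDate(java.util.Date date) {
        // (java.sql.Date) date would throw ClassCastException for a plain java.util.Date
        return new java.sql.Date(date.getTime());
    }

    public static void bindDate(PreparedStatement stmt, int index, java.util.Date value) throws SQLException {
        stmt.setDate(index, toSqlDate(value));
    }
}
```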

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (2 changes)

@@ -148,7 +148,7 @@ public class ProcessDefinitionDao {
 pstmt.setLong(1, processDefinition.getCode());
 long projectCode = processDefinition.getProjectCode();
 if (String.valueOf(projectCode).length() <= 10) {
-Integer projectId = Integer.getInteger(String.valueOf(projectCode));
+Integer projectId = Integer.parseInt(String.valueOf(projectCode));
 if (projectIdCodeMap.containsKey(projectId)) {
 projectCode = projectIdCodeMap.get(projectId);
 processDefinition.setProjectCode(projectCode);

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (2 changes)

@@ -77,7 +77,7 @@ public class ScheduleDao {
 try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
 long projectDefinitionCode = entry.getValue();
 if (String.valueOf(projectDefinitionCode).length() <= 10) {
-Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode));
+Integer projectDefinitionId = Integer.parseInt(String.valueOf(projectDefinitionCode));
 if (processIdCodeMap.containsKey(projectDefinitionId)) {
 projectDefinitionCode = processIdCodeMap.get(projectDefinitionId);
 }
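
The two hunks above (ProcessDefinitionDao and ScheduleDao) fix the same lookup bug: Integer.getInteger(String) does not parse its argument, it reads the JVM system property with that name and returns null when no such property exists, so the id-to-code maps never matched. A standalone sketch of the difference (not project code):

```java
// Demo of Integer.getInteger vs Integer.parseInt.
public class GetIntegerVsParseInt {

    public static void main(String[] args) {
        String projectCode = "2021";
        // Looks up a system property literally named "2021": virtually always null.
        System.out.println(Integer.getInteger(projectCode)); // null
        // Parses the decimal text as intended.
        System.out.println(Integer.parseInt(projectCode));   // 2021
    }
}
```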

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java (31 changes)

@@ -63,10 +63,10 @@ import javax.sql.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public abstract class UpgradeDao extends AbstractBaseDao {
@@ -649,14 +649,17 @@ public abstract class UpgradeDao extends AbstractBaseDao {
 ObjectNode param = (ObjectNode) task.get("params");
 TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
 if (param != null) {
-List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
-if (!resourceList.isEmpty()) {
+JsonNode resourceJsonNode = param.get("resourceList");
+if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
+List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
 List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
-taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
+taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, Constants.COMMA));
+} else {
+taskDefinitionLog.setResourceIds(StringUtils.EMPTY);
 }
 param.put("conditionResult", task.get("conditionResult"));
 param.put("dependence", task.get("dependence"));
-taskDefinitionLog.setTaskParams(param.asText());
+taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
 }
 TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
 if (timeout != null) {
@@ -674,6 +677,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
 taskDefinitionLog.setName(name);
 taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText());
 long taskCode = SnowFlakeUtils.getInstance().nextId();
+// System.out.println(taskCode);
 taskDefinitionLog.setCode(taskCode);
 taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
 taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
@@ -686,7 +690,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
 taskDefinitionLog.setUpdateTime(now);
 taskDefinitionLogList.add(taskDefinitionLog);
 taskIdCodeMap.put(task.get("id").asText(), taskCode);
-List<String> preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class);
+List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
 taskNamePreMap.put(name, preTasks);
 taskNameCodeMap.put(name, taskCode);
 }
@@ -745,13 +749,16 @@ public abstract class UpgradeDao extends AbstractBaseDao {
 if (StringUtils.isBlank(locations)) {
 return locations;
 }
-Map<String, String> locationsMap = JSONUtils.toMap(locations);
-JsonNodeFactory factory = new JsonNodeFactory(false);
-ArrayNode jsonNodes = factory.arrayNode();
-for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
-ObjectNode nodes = factory.objectNode();
+Map<String, ObjectNode> locationsMap = JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
+});
+if (locationsMap == null) {
+return locations;
+}
+ArrayNode jsonNodes = JSONUtils.createArrayNode();
+for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
+ObjectNode nodes = JSONUtils.createObjectNode();
 nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
-ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
+ObjectNode oldNodes = entry.getValue();
 nodes.put("x", oldNodes.get("x").asInt());
 nodes.put("y", oldNodes.get("y").asInt());
 jsonNodes.add(nodes);
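
The last hunk rewrites the locations conversion: the pre-2.0 locations field is a JSON object keyed by task id whose values are themselves objects (x, y coordinates and more), so reading it as Map<String, String> lost the structure; it is now parsed as Map<String, ObjectNode> via a TypeReference and re-emitted as an array of {taskCode, x, y} nodes. A rough standalone sketch with plain Jackson; the sample input and the taskIdCodeMap entry are made-up values:

```java
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch of the locations rewrite: parse task-id -> {x, y, ...} objects, then emit
// an array of {taskCode, x, y} nodes using the newly generated task codes.
public class LocationsConvertDemo {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        String locations = "{\"tasks-1001\":{\"name\":\"shell-node\",\"x\":100,\"y\":200}}"; // sample 1.x value
        Map<String, Long> taskIdCodeMap = Map.of("tasks-1001", 3546632196896L);              // assumed mapping

        Map<String, ObjectNode> locationsMap =
                mapper.readValue(locations, new TypeReference<Map<String, ObjectNode>>() { });

        ArrayNode converted = mapper.createArrayNode();
        for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
            ObjectNode node = mapper.createObjectNode();
            node.put("taskCode", taskIdCodeMap.get(entry.getKey()));
            node.put("x", entry.getValue().get("x").asInt());
            node.put("y", entry.getValue().get("y").asInt());
            converted.add(node);
        }
        System.out.println(converted); // [{"taskCode":3546632196896,"x":100,"y":200}]
    }
}
```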

sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql (3 changes)

@@ -15,8 +15,7 @@
  * limitations under the License.
  */
-alter table t_ds_process_definition drop primary key;
-ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
+alter table t_ds_process_definition drop primary key, ADD PRIMARY KEY (`id`,`code`);
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
 alter table t_ds_process_definition drop process_definition_json;
