From d0fbfeaf1764fe6202cbbbfa1ac7e48f8f0e5a1d Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Mon, 25 May 2020 17:18:15 +0800 Subject: [PATCH] process definition json worker group convert #2794 (#2809) * add LoggerServerTest UT * add LoggerServerTest UT * add LoggerServerTest UT add RemoveTaskLogRequestCommandTest UT add RemoveTaskLogResponseCommandTest * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * add not worker log and remove worker invalid property * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 * process definition json worker group convert #2794 Co-authored-by: qiaozhanwei --- .../dao/upgrade/UpgradeDao.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index 692351b5f0..39aab3ec41 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.dao.upgrade; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.*; @@ -274,18 +276,24 @@ public abstract class UpgradeDao extends AbstractBaseDao { Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ - ProcessData processData = JSONUtils.parseObject(entry.getValue(), ProcessData.class); - - List tasks = processData.getTasks(); - for (TaskNode taskNode : tasks){ - Integer workerGroupId = taskNode.getWorkerGroupId(); - if (workerGroupId == -1){ - taskNode.setWorkerGroup("default"); + JSONObject jsonObject = JSONObject.parseObject(entry.getValue()); + JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks")); + + for (int i = 0 ;i < tasks.size() ; i++){ + JSONObject task = tasks.getJSONObject(i); + Integer workerGroupId = task.getInteger("workerGroupId"); + if (workerGroupId == -1) { + task.put("workerGroup", "default"); }else { - taskNode.setWorkerGroup(oldWorkerGroupMap.get(workerGroupId)); + task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); } } - replaceProcessDefinitionMap.put(entry.getKey(),JSONUtils.toJson(processData)); + + jsonObject.remove(jsonObject.getString("tasks")); + + jsonObject.put("tasks",tasks); + + replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString()); } if (replaceProcessDefinitionMap.size() > 0){ processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);