diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java index 8d1d862640..b2daae28cb 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java @@ -117,6 +117,8 @@ public class DolphinSchedulerManager { upgradeDao.upgradeDolphinScheduler(schemaDir); if ("1.3.0".equals(schemaVersion)) { upgradeDao.upgradeDolphinSchedulerWorkerGroup(); + } else if ("1.3.2".equals(schemaVersion)) { + upgradeDao.upgradeDolphinSchedulerResourceList(); } version = schemaVersion; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index 29c625337d..2321e13030 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -16,10 +16,10 @@ */ package org.apache.dolphinscheduler.dao.upgrade; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; @@ -34,7 +34,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.text.MessageFormat; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public abstract class UpgradeDao extends AbstractBaseDao { @@ -268,6 +270,15 @@ public abstract class UpgradeDao extends AbstractBaseDao { public void upgradeDolphinSchedulerWorkerGroup() { updateProcessDefinitionJsonWorkerGroup(); } + + /** + * upgrade DolphinScheduler resource list + * ds-1.3.2 modify the resource list for process definition json + */ + public void upgradeDolphinSchedulerResourceList() { + updateProcessDefinitionJsonResourceList(); + } + /** * updateProcessDefinitionJsonWorkerGroup */ @@ -312,6 +323,66 @@ public abstract class UpgradeDao extends AbstractBaseDao { } + /** + * updateProcessDefinitionJsonResourceList + */ + protected void updateProcessDefinitionJsonResourceList(){ + ResourceDao resourceDao = new ResourceDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + Map replaceProcessDefinitionMap = new HashMap<>(); + try { + Map resourcesMap = resourceDao.listAllResources(dataSource.getConnection()); + Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); + + for (Map.Entry entry : processDefinitionJsonMap.entrySet()){ + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); + + for (int i = 0 ;i < tasks.size() ; i++){ + ObjectNode task = (ObjectNode) tasks.get(i); + ObjectNode param = (ObjectNode)task.get("params"); + if (param != null) { + + List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); + ResourceInfo mainJar = JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class); + if (mainJar != null && mainJar.getId() == 0) { + String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() : String.format("/%s",mainJar.getRes()); + if (resourcesMap.containsKey(fullName)) { + mainJar.setId(resourcesMap.get(fullName)); + param.put("mainJar",JSONUtils.parseObject(JSONUtils.toJsonString(mainJar))); + } + } + + if (CollectionUtils.isNotEmpty(resourceList)) { + List newResourceList = resourceList.stream().map(resInfo -> { + String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s",resInfo.getRes()); + if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) { + resInfo.setId(resourcesMap.get(fullName)); + } + return resInfo; + }).collect(Collectors.toList()); + param.put("resourceList",JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList))); + } + } + task.put("params",param); + + } + + jsonObject.remove("tasks"); + + jsonObject.put("tasks",tasks); + + replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toString()); + } + if (replaceProcessDefinitionMap.size() > 0){ + processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap); + } + }catch (Exception e){ + logger.error("update process definition json resource list error",e); + } + + } + /** * upgradeDolphinScheduler DML * @param schemaDir schemaDir