|
|
@ -19,10 +19,8 @@ package org.apache.dolphinscheduler.dao.upgrade; |
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
import com.alibaba.fastjson.JSONArray; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import com.alibaba.fastjson.JSONObject; |
|
|
|
import org.apache.dolphinscheduler.common.enums.DbType; |
|
|
|
import org.apache.dolphinscheduler.common.enums.DbType; |
|
|
|
import org.apache.dolphinscheduler.common.utils.ConnectionUtils; |
|
|
|
import org.apache.dolphinscheduler.common.process.ResourceInfo; |
|
|
|
import org.apache.dolphinscheduler.common.utils.SchemaUtils; |
|
|
|
import org.apache.dolphinscheduler.common.utils.*; |
|
|
|
import org.apache.dolphinscheduler.common.utils.ScriptRunner; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.common.utils.StringUtils; |
|
|
|
|
|
|
|
import org.apache.dolphinscheduler.dao.AbstractBaseDao; |
|
|
|
import org.apache.dolphinscheduler.dao.AbstractBaseDao; |
|
|
|
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; |
|
|
|
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.Logger; |
|
|
@ -36,7 +34,9 @@ import java.sql.ResultSet; |
|
|
|
import java.sql.SQLException; |
|
|
|
import java.sql.SQLException; |
|
|
|
import java.text.MessageFormat; |
|
|
|
import java.text.MessageFormat; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.HashMap; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Map; |
|
|
|
|
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
|
|
|
|
public abstract class UpgradeDao extends AbstractBaseDao { |
|
|
|
public abstract class UpgradeDao extends AbstractBaseDao { |
|
|
|
|
|
|
|
|
|
|
@ -270,6 +270,15 @@ public abstract class UpgradeDao extends AbstractBaseDao { |
|
|
|
public void upgradeDolphinSchedulerWorkerGroup() { |
|
|
|
public void upgradeDolphinSchedulerWorkerGroup() { |
|
|
|
updateProcessDefinitionJsonWorkerGroup(); |
|
|
|
updateProcessDefinitionJsonWorkerGroup(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* upgrade DolphinScheduler resource list |
|
|
|
|
|
|
|
* ds-1.3.2 modify the resource list for process definition json |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
public void upgradeDolphinSchedulerResourceList() { |
|
|
|
|
|
|
|
updateProcessDefinitionJsonResourceList(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* updateProcessDefinitionJsonWorkerGroup |
|
|
|
* updateProcessDefinitionJsonWorkerGroup |
|
|
|
*/ |
|
|
|
*/ |
|
|
@ -288,13 +297,16 @@ public abstract class UpgradeDao extends AbstractBaseDao { |
|
|
|
for (int i = 0 ;i < tasks.size() ; i++){ |
|
|
|
for (int i = 0 ;i < tasks.size() ; i++){ |
|
|
|
JSONObject task = tasks.getJSONObject(i); |
|
|
|
JSONObject task = tasks.getJSONObject(i); |
|
|
|
Integer workerGroupId = task.getInteger("workerGroupId"); |
|
|
|
Integer workerGroupId = task.getInteger("workerGroupId"); |
|
|
|
|
|
|
|
if (workerGroupId != null) { |
|
|
|
if (workerGroupId == -1) { |
|
|
|
if (workerGroupId == -1) { |
|
|
|
task.put("workerGroup", "default"); |
|
|
|
task.put("workerGroup", "default"); |
|
|
|
}else { |
|
|
|
} else { |
|
|
|
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); |
|
|
|
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
jsonObject.remove(jsonObject.getString("tasks")); |
|
|
|
jsonObject.remove(jsonObject.getString("tasks")); |
|
|
|
|
|
|
|
|
|
|
|
jsonObject.put("tasks",tasks); |
|
|
|
jsonObject.put("tasks",tasks); |
|
|
@ -310,6 +322,58 @@ public abstract class UpgradeDao extends AbstractBaseDao { |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* updateProcessDefinitionJsonResourceList |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
protected void updateProcessDefinitionJsonResourceList(){ |
|
|
|
|
|
|
|
ResourceDao resourceDao = new ResourceDao(); |
|
|
|
|
|
|
|
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); |
|
|
|
|
|
|
|
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>(); |
|
|
|
|
|
|
|
try { |
|
|
|
|
|
|
|
Map<String,Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection()); |
|
|
|
|
|
|
|
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){ |
|
|
|
|
|
|
|
JSONObject jsonObject = JSONObject.parseObject(entry.getValue()); |
|
|
|
|
|
|
|
JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks")); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0 ;i < tasks.size() ; i++){ |
|
|
|
|
|
|
|
JSONObject task = tasks.getJSONObject(i); |
|
|
|
|
|
|
|
JSONObject param = (JSONObject) task.get("params"); |
|
|
|
|
|
|
|
if (param != null) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
List<ResourceInfo> resourceList = JSONUtils.toList(param.getString("resourceList"), ResourceInfo.class); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (CollectionUtils.isNotEmpty(resourceList)) { |
|
|
|
|
|
|
|
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> { |
|
|
|
|
|
|
|
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s",resInfo.getRes()); |
|
|
|
|
|
|
|
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) { |
|
|
|
|
|
|
|
resInfo.setId(resourcesMap.get(fullName)); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return resInfo; |
|
|
|
|
|
|
|
}).collect(Collectors.toList()); |
|
|
|
|
|
|
|
param.put("resourceList",JSONArray.parse(JSONObject.toJSONString(newResourceList))); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
task.put("params",param); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
jsonObject.remove(jsonObject.getString("tasks")); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
jsonObject.put("tasks",tasks); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (replaceProcessDefinitionMap.size() > 0){ |
|
|
|
|
|
|
|
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}catch (Exception e){ |
|
|
|
|
|
|
|
logger.error("update process definition json resource list error",e); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* upgradeDolphinScheduler DML |
|
|
|
* upgradeDolphinScheduler DML |
|
|
|
* @param schemaDir schemaDir |
|
|
|
* @param schemaDir schemaDir |
|
|
|