diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index a7acafc475..235f223f9b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; @@ -54,6 +55,7 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import javax.sql.DataSource; @@ -61,6 +63,8 @@ import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -599,7 +603,9 @@ public abstract class UpgradeDao extends AbstractBaseDao { List processDefinitionLogs = new ArrayList<>(); List processTaskRelationLogs = new ArrayList<>(); List taskDefinitionLogs = new ArrayList<>(); - splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs); + Map>> processTaskMap = new HashMap<>(); + splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs, processTaskMap); + convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap); // execute json split jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs); @@ -614,7 +620,8 @@ public abstract class UpgradeDao extends AbstractBaseDao { Map processDefinitionJsonMap, List processDefinitionLogs, List processTaskRelationLogs, - List taskDefinitionLogs) throws Exception { + List taskDefinitionLogs, + Map>> processTaskMap) throws Exception { Map processDefinitionMap = processDefinitions.stream() .collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition)); Date now = new Date(); @@ -634,6 +641,8 @@ public abstract class UpgradeDao extends AbstractBaseDao { Map taskIdCodeMap = new HashMap<>(); Map> taskNamePreMap = new HashMap<>(); Map taskNameCodeMap = new HashMap<>(); + Map> processCodeTaskNameCodeMap = new HashMap<>(); + List taskDefinitionLogList = new ArrayList<>(); ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); for (int i = 0; i < tasks.size(); i++) { ObjectNode task = (ObjectNode) tasks.path(i); @@ -647,7 +656,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { } param.put("conditionResult", task.get("conditionResult")); param.put("dependence", task.get("dependence")); - taskDefinitionLog.setTaskParams(param.toString()); + taskDefinitionLog.setTaskParams(param.asText()); } TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); if (timeout != null) { @@ -655,15 +664,15 @@ public abstract class UpgradeDao extends AbstractBaseDao { taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy()); } - taskDefinitionLog.setDescription(task.get("description").toString()); - taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString()) ? Flag.YES : Flag.NO); - taskDefinitionLog.setTaskType(task.get("type").toString()); + taskDefinitionLog.setDescription(task.get("description").asText()); + taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO); + taskDefinitionLog.setTaskType(task.get("type").asText()); taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt()); taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt()); taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); - String name = task.get("name").toString(); + String name = task.get("name").asText(); taskDefinitionLog.setName(name); - taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString()); + taskDefinitionLog.setWorkerGroup(task.get("workerGroup").asText()); long taskCode = SnowFlakeUtils.getInstance().nextId(); taskDefinitionLog.setCode(taskCode); taskDefinitionLog.setVersion(Constants.VERSION_FIRST); @@ -675,12 +684,14 @@ public abstract class UpgradeDao extends AbstractBaseDao { taskDefinitionLog.setOperateTime(now); taskDefinitionLog.setCreateTime(now); taskDefinitionLog.setUpdateTime(now); - taskDefinitionLogs.add(taskDefinitionLog); - taskIdCodeMap.put(task.get("id").toString(), taskCode); - List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); + taskDefinitionLogList.add(taskDefinitionLog); + taskIdCodeMap.put(task.get("id").asText(), taskCode); + List preTasks = JSONUtils.toList(task.get("preTasks").asText(), String.class); taskNamePreMap.put(name, preTasks); taskNameCodeMap.put(name, taskCode); } + convertConditions(taskDefinitionLogList, taskNameCodeMap); + taskDefinitionLogs.addAll(taskDefinitionLogList); processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap)); ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); processDefinitionLog.setOperator(1); @@ -688,6 +699,45 @@ public abstract class UpgradeDao extends AbstractBaseDao { processDefinitionLog.setUpdateTime(now); processDefinitionLogs.add(processDefinitionLog); handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs); + processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap); + processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap); + } + } + + public void convertConditions(List taskDefinitionLogList, Map taskNameCodeMap) throws Exception { + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) { + if (TaskType.CONDITIONS.getDesc().equals(taskDefinitionLog.getTaskType())) { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); + // reset conditionResult + ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult"); + List successNode = JSONUtils.toList(conditionResult.get("successNode").toString(), String.class); + List nodeCode = new ArrayList<>(); + successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); + conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); + List failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class); + nodeCode.clear(); + failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); + conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); + // reset dependItemList + ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); + ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); + for (int i = 0; i < dependTaskList.size(); i++) { + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); + ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); + for (int j = 0; j < dependItemList.size(); j++) { + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); + JsonNode depTasks = dependItem.get("depTasks"); + dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText())); + dependItem.remove("depTasks"); + dependItemList.set(j, dependItem); + } + dependTask.put("dependItemList", dependItemList); + dependTaskList.set(i, dependTask); + } + dependence.put("dependTaskList", dependTaskList); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); + } } } @@ -709,6 +759,44 @@ public abstract class UpgradeDao extends AbstractBaseDao { return jsonNodes.toString(); } + public void convertDependence(List taskDefinitionLogs, + Map projectIdCodeMap, + Map>> processTaskMap) { + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (TaskType.DEPENDENT.getDesc().equals(taskDefinitionLog.getTaskType())) { + ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); + ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); + ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); + for (int i = 0; i < dependTaskList.size(); i++) { + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); + ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); + for (int j = 0; j < dependItemList.size(); j++) { + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); + dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt())); + int definitionId = dependItem.get("definitionId").asInt(); + Map> processCodeTaskNameCodeMap = processTaskMap.get(definitionId); + Optional>> mapEntry = processCodeTaskNameCodeMap.entrySet().stream().findFirst(); + if (mapEntry.isPresent()) { + Map.Entry> processCodeTaskNameCodeEntry = mapEntry.get(); + dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey()); + String depTasks = dependItem.get("depTasks").asText(); + long taskCode = "ALL".equals(depTasks) ? 0L : processCodeTaskNameCodeEntry.getValue().get(depTasks); + dependItem.put("depTaskCode", taskCode); + } + dependItem.remove("projectId"); + dependItem.remove("definitionId"); + dependItem.remove("depTasks"); + dependItemList.set(j, dependItem); + } + dependTask.put("dependItemList", dependItemList); + dependTaskList.set(i, dependTask); + } + dependence.put("dependTaskList", dependTaskList); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); + } + } + } + private void handleProcessTaskRelation(Map> taskNamePreMap, Map taskNameCodeMap, ProcessDefinition processDefinition,