From 713fdbc444fc0dbaf68c34277f9afa4561449336 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 20 Jan 2023 20:48:07 +0800 Subject: [PATCH] Split the upgrade code (#13436) 1. Split the upgrade code into different version 2. Log the dml/ddl sql in origin friendly format 3. Fix ddl of 3.0.0 and 3.1.0 4. Fix search resource regular expression injection --- .../service/impl/ResourcesServiceImpl.java | 2 +- .../common/utils/ScriptRunner.java | 18 +- .../dao/upgrade/JsonSplitDao.java | 7 - .../dao/upgrade/ProcessDefinitionDao.java | 25 +- .../dao/upgrade/ProjectDao.java | 13 +- .../dao/upgrade/ScheduleDao.java | 14 +- .../dao/upgrade/WorkerGroupDao.java | 14 +- .../mysql/dolphinscheduler_ddl.sql | 2 +- .../mysql/dolphinscheduler_ddl.sql | 59 +- .../mysql/dolphinscheduler_ddl.sql | 21 +- .../datasource/DolphinSchedulerManager.java | 17 +- .../tools/datasource/dao/MySQLUpgradeDao.java | 22 +- .../datasource/dao/PostgreSQLUpgradeDao.java | 39 +- .../tools/datasource/dao/ResourceDao.java | 71 +- .../tools/datasource/dao/UpgradeDao.java | 607 ++---------------- .../upgrader/DolphinSchedulerUpgrader.java | 2 +- .../upgrader/DolphinSchedulerVersion.java | 47 ++ .../v130/V130DolphinSchedulerUpgrader.java | 97 +++ .../v132/V132DolphinSchedulerUpgrader.java | 151 +++++ .../v200/V200DolphinSchedulerUpgrader.java | 400 ++++++++++++ .../v320/V320DolphinSchedulerUpgrader.java | 5 +- 21 files changed, 844 insertions(+), 789 deletions(-) create mode 100644 dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java create mode 100644 dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java create mode 100644 dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java create mode 100644 dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index f1821fcb63..430e84077f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -763,7 +763,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String trimmedSearchVal = searchVal != null ? searchVal.trim() : ""; // filter based on trimmed searchVal List filteredResourceList = resourcesList.stream() - .filter(x -> x.getFileName().matches("(.*)" + trimmedSearchVal + "(.*)")).collect(Collectors.toList()); + .filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList()); // inefficient pagination List slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize) .limit(pageSize).collect(Collectors.toList()); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java index 33c632b8d6..e3212931f2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java @@ -24,6 +24,8 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,17 +93,17 @@ public class ScriptRunner { * @throws IOException if there is an error reading from the Reader */ private void runScript(Connection conn, Reader reader) throws IOException, SQLException { - StringBuffer command = null; + List command = null; try { LineNumberReader lineReader = new LineNumberReader(reader); String line; while ((line = lineReader.readLine()) != null) { if (command == null) { - command = new StringBuffer(); + command = new ArrayList<>(); } String trimmedLine = line.trim(); if (trimmedLine.startsWith("--")) { - logger.info(trimmedLine); + logger.info("\n{}", trimmedLine); } else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) { // Do nothing } else if (trimmedLine.startsWith("delimiter")) { @@ -110,12 +112,11 @@ public class ScriptRunner { } else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter()) || fullLineDelimiter && trimmedLine.equals(getDelimiter())) { - command.append(line, 0, line.lastIndexOf(getDelimiter())); - command.append(" "); - logger.info("sql: {}", command); + command.add(line.substring(0, line.lastIndexOf(getDelimiter()))); + logger.info("\n{}", String.join("\n", command)); try (Statement statement = conn.createStatement()) { - statement.execute(command.toString()); + statement.execute(String.join(" ", command)); try (ResultSet rs = statement.getResultSet()) { if (stopOnError && rs != null) { ResultSetMetaData md = rs.getMetaData(); @@ -142,8 +143,7 @@ public class ScriptRunner { command = null; Thread.yield(); } else { - command.append(line); - command.append(" "); + command.add(line); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java index f2cb65877f..25bdb8b3cc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.dao.upgrade; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; @@ -92,8 +91,6 @@ public class JsonSplitDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); - } finally { - ConnectionUtils.releaseResource(conn); } } @@ -160,8 +157,6 @@ public class JsonSplitDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); - } finally { - ConnectionUtils.releaseResource(conn); } } @@ -250,8 +245,6 @@ public class JsonSplitDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); - } finally { - ConnectionUtils.releaseResource(conn); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java index 75a66a5e2f..61fbf81900 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import java.sql.Connection; @@ -50,11 +49,9 @@ public class ProcessDefinitionDao { Map processDefinitionJsonMap = new HashMap<>(); String sql = "SELECT id,process_definition_json FROM t_ds_process_definition"; - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { Integer id = rs.getInt(1); @@ -65,8 +62,6 @@ public class ProcessDefinitionDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); } return processDefinitionJsonMap; @@ -91,8 +86,6 @@ public class ProcessDefinitionDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(conn); } } @@ -100,11 +93,9 @@ public class ProcessDefinitionDao { List processDefinitions = new ArrayList<>(); String sql = "SELECT id,code,project_code,user_id,locations,name,description,release_state,flag,create_time FROM t_ds_process_definition"; - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(rs.getInt(1)); @@ -127,8 +118,6 @@ public class ProcessDefinitionDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); } return processDefinitions; } @@ -164,8 +153,6 @@ public class ProcessDefinitionDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(conn); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java index 2906902f5a..e8e3f9ad2c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.dao.upgrade; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; import java.sql.Connection; import java.sql.PreparedStatement; @@ -42,11 +41,9 @@ public class ProjectDao { public Map queryAllProject(Connection conn) { Map projectMap = new HashMap<>(); String sql = "SELECT id,code FROM t_ds_project"; - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { Integer id = rs.getInt(1); long code = rs.getLong(2); @@ -58,8 +55,6 @@ public class ProjectDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); } return projectMap; } @@ -83,8 +78,6 @@ public class ProjectDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(conn); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java index 9d486d0077..3717df0a6b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.dao.upgrade; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; - import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -42,11 +40,9 @@ public class ScheduleDao { public Map queryAllSchedule(Connection conn) { Map scheduleMap = new HashMap<>(); String sql = "SELECT id,process_definition_code FROM t_ds_schedules"; - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { Integer id = rs.getInt(1); long processDefinitionCode = rs.getLong(2); @@ -55,8 +51,6 @@ public class ScheduleDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); } return scheduleMap; } @@ -92,8 +86,6 @@ public class ScheduleDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(conn); } } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java index e8a9fc7d82..797e4b94ea 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.dao.upgrade; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; - import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -40,12 +38,10 @@ public class WorkerGroupDao { public Map queryAllOldWorkerGroup(Connection conn) { Map workerGroupMap = new HashMap<>(); - String sql = String.format("select id,name from t_ds_worker_group"); - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + String sql = "select id,name from t_ds_worker_group"; + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { int id = rs.getInt(1); @@ -56,8 +52,6 @@ public class WorkerGroupDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); } return workerGroupMap; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql index 630984e3b0..72acb02051 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql @@ -469,7 +469,7 @@ BEGIN ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code'; ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code'; ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id'; -ALTER TABLE `t_ds_alert` MODIFY COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type'; +ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type'; END IF; END; d// diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql index 059f0d27ef..85fe7390e8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql @@ -392,41 +392,46 @@ CALL modify_t_ds_task_group_col_description; DROP PROCEDURE modify_t_ds_task_group_col_description; -- alter table `t_ds_worker_group` add `other_params_json` text; --- alter table `t_ds_process_instance` add `state_history` text; -drop procedure if exists add_column_safety; +drop procedure if exists add_t_ds_task_group_other_params_json; delimiter d// -create procedure add_column_safety(target_table_name varchar(256), target_column varchar(256), - target_column_type varchar(256), sths_else varchar(256)) -begin - declare target_database varchar(256); -select database() into target_database; -IF EXISTS(SELECT * - FROM information_schema.COLUMNS - WHERE COLUMN_NAME = target_column - AND TABLE_NAME = target_table_name - ) +CREATE PROCEDURE add_t_ds_task_group_other_params_json() +BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME='t_ds_worker_group' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='other_params_json') THEN - set @statement = - concat('alter table ', target_table_name, ' change column ', target_column, ' ', target_column, ' ', - target_column_type, ' ', - sths_else); -PREPARE STMT_c FROM @statement; -EXECUTE STMT_c; +alter table `t_ds_worker_group` add column `other_params_json` text DEFAULT NULL COMMENT "other params json"; ELSE - set @statement = - concat('alter table ', target_table_name, ' add column ', target_column, ' ', target_column_type, ' ', - sths_else); -PREPARE STMT_a FROM @statement; -EXECUTE STMT_a; +alter table `t_ds_worker_group` modify column `other_params_json` text DEFAULT NULL COMMENT "other params json"; END IF; -end; +END; d// delimiter ; -call add_column_safety('t_ds_worker_group','other_params_json', 'text' , "DEFAULT NULL COMMENT 'other params json'"); -call add_column_safety('t_ds_process_instance','state_history', 'text' , "DEFAULT NULL COMMENT 'state history desc' AFTER `state`"); +call add_t_ds_task_group_other_params_json(); +drop procedure if exists add_t_ds_task_group_other_params_json; + +-- alter table `t_ds_process_instance` add `state_history` text; +drop procedure if exists add_t_ds_process_instance_state_history; +delimiter d// +CREATE PROCEDURE add_t_ds_process_instance_state_history() +BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME='t_ds_process_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='state_history') + THEN +alter table `t_ds_process_instance` add column `state_history` text DEFAULT NULL COMMENT "other params json"; +ELSE +alter table `t_ds_process_instance` modify column `state_history` text DEFAULT NULL COMMENT "other params json"; +END IF; +END; +d// +delimiter ; +call add_t_ds_process_instance_state_history(); +drop procedure if exists add_t_ds_process_instance_state_history; -drop procedure if exists add_column_safety; alter table t_ds_process_instance alter column process_instance_priority set default 2; alter table t_ds_schedules alter column process_instance_priority set default 2; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql index 978f1bf39f..989c9b51ab 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql @@ -65,5 +65,22 @@ d// delimiter ; -- ALTER TABLE t_ds_worker_group ADD COLUMN description varchar(255) DEFAULT NULL COMMENT 'ds worker group description'; -call add_column_safety('t_ds_worker_group','description', 'varchar(255)' , "DEFAULT NULL COMMENT 'ds worker group description'"); -drop procedure if exists add_column_safety; +drop procedure if exists modify_t_ds_worker_group_description; +delimiter d// +CREATE PROCEDURE modify_t_ds_worker_group_description() +BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME='t_ds_worker_group' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='description') + THEN + alter table `t_ds_worker_group` add column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description"; +ELSE +alter table `t_ds_worker_group` modify column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description"; +END IF; +END; +d// +delimiter ; + +call modify_t_ds_worker_group_description(); +drop procedure if exists modify_t_ds_worker_group_description; diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java index 3fe4d0c276..74cf447621 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao; import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion; import org.apache.commons.collections4.CollectionUtils; @@ -45,7 +46,7 @@ public class DolphinSchedulerManager { private final UpgradeDao upgradeDao; - private Map upgraderMap = new HashMap<>(); + private Map upgraderMap = new HashMap<>(); public DolphinSchedulerManager(DataSource dataSource, List daos, List dolphinSchedulerUpgraders) throws Exception { @@ -121,21 +122,11 @@ public class DolphinSchedulerManager { logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion); logger.info("Begin upgrading DolphinScheduler's table structure"); upgradeDao.upgradeDolphinScheduler(schemaDir); - if ("1.3.0".equals(schemaVersion)) { - upgradeDao.upgradeDolphinSchedulerWorkerGroup(); - } else if ("1.3.2".equals(schemaVersion)) { - upgradeDao.upgradeDolphinSchedulerResourceList(); - } else if ("2.0.0".equals(schemaVersion)) { - upgradeDao.upgradeDolphinSchedulerTo200(schemaDir); - } - DolphinSchedulerUpgrader dolphinSchedulerUpgrader = upgraderMap.get(schemaVersion); - if (dolphinSchedulerUpgrader != null) { - dolphinSchedulerUpgrader.doUpgrade(); - } + DolphinSchedulerVersion.getVersion(schemaVersion).ifPresent(v -> upgraderMap.get(v).doUpgrade()); version = schemaVersion; } } - + // todo: do we need to do this in all version > 2.0.6? if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) && SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) { upgradeDao.upgradeDolphinSchedulerResourceFileSize(); diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java index 7361932034..20651213ec 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.tools.datasource.dao; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; @@ -56,17 +55,13 @@ public class MySQLUpgradeDao extends UpgradeDao { */ @Override public boolean isExistsTable(String tableName) { - ResultSet rs = null; - Connection conn = null; - try { - conn = dataSource.getConnection(); - rs = conn.getMetaData().getTables(conn.getCatalog(), conn.getSchema(), tableName, null); + try ( + Connection conn = dataSource.getConnection(); + ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), conn.getSchema(), tableName, null)) { return rs.next(); } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(rs, conn); } } @@ -79,19 +74,16 @@ public class MySQLUpgradeDao extends UpgradeDao { */ @Override public boolean isExistsColumn(String tableName, String columnName) { - Connection conn = null; - try { - conn = dataSource.getConnection(); - ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(), conn.getSchema(), tableName, columnName); + try ( + Connection conn = dataSource.getConnection(); + ResultSet rs = + conn.getMetaData().getColumns(conn.getCatalog(), conn.getSchema(), tableName, columnName)) { return rs.next(); } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(conn); } - } } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java index 43761ba564..d854a6ef7a 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.tools.datasource.dao; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; import org.apache.dolphinscheduler.spi.enums.DbType; import java.sql.Connection; @@ -51,13 +50,10 @@ public class PostgreSQLUpgradeDao extends UpgradeDao { } public String getSchema() { - Connection conn = null; - PreparedStatement pstmt = null; - ResultSet resultSet = null; - try { - conn = dataSource.getConnection(); - pstmt = conn.prepareStatement("select current_schema()"); - resultSet = pstmt.executeQuery(); + try ( + Connection conn = dataSource.getConnection(); + PreparedStatement pstmt = conn.prepareStatement("select current_schema()"); + ResultSet resultSet = pstmt.executeQuery()) { while (resultSet.next()) { if (resultSet.isFirst()) { return resultSet.getString(1); @@ -66,8 +62,6 @@ public class PostgreSQLUpgradeDao extends UpgradeDao { } catch (SQLException e) { logger.error(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(resultSet, pstmt, conn); } return ""; } @@ -80,21 +74,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao { */ @Override public boolean isExistsTable(String tableName) { - Connection conn = null; - ResultSet rs = null; - try { - conn = dataSource.getConnection(); - - rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null); - + try ( + Connection conn = dataSource.getConnection(); + ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null)) { return rs.next(); } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(rs, conn); } - } /** @@ -106,20 +93,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao { */ @Override public boolean isExistsColumn(String tableName, String columnName) { - Connection conn = null; - ResultSet rs = null; - try { - conn = dataSource.getConnection(); - rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName); + try ( + Connection conn = dataSource.getConnection(); + ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName)) { return rs.next(); } catch (SQLException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(rs, conn); - } - } } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java index 6bfe47baca..d542428a11 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java @@ -17,18 +17,14 @@ package org.apache.dolphinscheduler.tools.datasource.dao; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; - import org.apache.commons.lang3.StringUtils; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,38 +39,6 @@ public class ResourceDao { public static final Logger logger = LoggerFactory.getLogger(ResourceDao.class); - /** - * list all resources - * - * @param conn connection - * @return map that key is full_name and value is id - */ - Map listAllResources(Connection conn) { - Map resourceMap = new HashMap<>(); - - String sql = String.format("SELECT id,full_name FROM t_ds_resources"); - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); - - while (rs.next()) { - Integer id = rs.getInt(1); - String fullName = rs.getString(2); - resourceMap.put(fullName, id); - } - - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); - } - - return resourceMap; - } - /** * list all resources by the type * @@ -86,15 +50,13 @@ public class ResourceDao { String sql = String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type); - ResultSet rs = null; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { while (rs.next()) { String fullName = rs.getString("full_name"); - Boolean isDirectory = rs.getBoolean("is_directory"); + boolean isDirectory = rs.getBoolean("is_directory"); long fileSize = rs.getLong("size"); if (StringUtils.isNotBlank(fullName) && !isDirectory) { @@ -111,16 +73,6 @@ public class ResourceDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - if (Objects.nonNull(pstmt)) { - try { - if (!pstmt.isClosed()) { - pstmt.close(); - } - } catch (SQLException e) { - logger.error(e.getMessage(), e); - } - } } return resourceSizeMap; } @@ -134,9 +86,7 @@ public class ResourceDao { Map resourceSizeMap = listAllResourcesByFileType(conn, type); String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true"; - PreparedStatement pstmt = null; - try { - pstmt = conn.prepareStatement(sql); + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { for (Map.Entry entry : resourceSizeMap.entrySet()) { pstmt.setLong(1, entry.getValue()); pstmt.setInt(2, type); @@ -147,17 +97,6 @@ public class ResourceDao { } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); - } finally { - if (Objects.nonNull(pstmt)) { - try { - if (!pstmt.isClosed()) { - pstmt.close(); - } - } catch (SQLException e) { - logger.error(e.getMessage(), e); - } - } - ConnectionUtils.releaseResource(conn); } } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java index 7b4bb6b646..201dd5dfba 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java @@ -17,50 +17,17 @@ package org.apache.dolphinscheduler.tools.datasource.dao; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.ConditionType; -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.common.enums.TimeoutFlag; -import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; -import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ScriptRunner; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; -import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; -import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao; -import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao; -import org.apache.dolphinscheduler.dao.upgrade.ProjectDao; -import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao; import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils; -import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao; -import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; -import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; import org.apache.dolphinscheduler.spi.enums.DbType; -import org.apache.commons.collections4.CollectionUtils; - import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; import javax.sql.DataSource; @@ -69,14 +36,6 @@ import org.slf4j.LoggerFactory; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; - public abstract class UpgradeDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); @@ -100,6 +59,7 @@ public abstract class UpgradeDao { /** * run init sql to init db schema + * * @param dbType db type */ private void runInitSql(DbType dbType) { @@ -112,8 +72,8 @@ public abstract class UpgradeDao { initScriptRunner.runScript(initSqlReader); } } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); + logger.error("Execute init sql file: {} error", sqlFile, e); + throw new RuntimeException(String.format("Execute init sql file: %s error", sqlFile), e); } } @@ -123,56 +83,24 @@ public abstract class UpgradeDao { public String getCurrentVersion(String versionName) { String sql = String.format("select version from %s", versionName); - Connection conn = null; - ResultSet rs = null; - PreparedStatement pstmt = null; String version = null; - try { - conn = dataSource.getConnection(); - pstmt = conn.prepareStatement(sql); - rs = pstmt.executeQuery(); - + try ( + Connection conn = dataSource.getConnection(); + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { if (rs.next()) { version = rs.getString(1); } - return version; - } catch (SQLException e) { - logger.error(e.getMessage(), e); - throw new RuntimeException("sql: " + sql, e); - } finally { - ConnectionUtils.releaseResource(rs, pstmt, conn); + logger.error("Get current version from database error, sql: {}", sql, e); + throw new RuntimeException("Get current version from database error, sql: " + sql, e); } } public void upgradeDolphinScheduler(String schemaDir) { upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql"); - upgradeDolphinSchedulerDML(schemaDir); - } - - /** - * upgrade DolphinScheduler worker group - * ds-1.3.0 modify the worker group for process definition json - */ - public void upgradeDolphinSchedulerWorkerGroup() { - updateProcessDefinitionJsonWorkerGroup(); - } - - /** - * upgrade DolphinScheduler resource list - * ds-1.3.2 modify the resource list for process definition json - */ - public void upgradeDolphinSchedulerResourceList() { - updateProcessDefinitionJsonResourceList(); - } - - /** - * upgrade DolphinScheduler to 2.0.0 - */ - public void upgradeDolphinSchedulerTo200(String schemaDir) { - processDefinitionJsonSplit(); - upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql"); + upgradeDolphinSchedulerDML(schemaDir, "dolphinscheduler_dml.sql"); } /** @@ -180,183 +108,51 @@ public abstract class UpgradeDao { */ public void upgradeDolphinSchedulerResourceFileSize() { ResourceDao resourceDao = new ResourceDao(); - try { + try (Connection conn = dataSource.getConnection()) { // update the size of the folder that is the type of file. - resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0); + resourceDao.updateResourceFolderSizeByFileType(conn, 0); // update the size of the folder that is the type of udf. - resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1); + resourceDao.updateResourceFolderSizeByFileType(conn, 1); } catch (Exception ex) { logger.error("Failed to upgrade because of failing to update the folder's size of resource files."); } } - /** - * updateProcessDefinitionJsonWorkerGroup - */ - protected void updateProcessDefinitionJsonWorkerGroup() { - WorkerGroupDao workerGroupDao = new WorkerGroupDao(); - ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); - Map replaceProcessDefinitionMap = new HashMap<>(); - try { - Map oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection()); - Map processDefinitionJsonMap = - processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); - - for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { - ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); - ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); - - for (int i = 0; i < tasks.size(); i++) { - ObjectNode task = (ObjectNode) tasks.path(i); - ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId"); - int workerGroupId = -1; - if (workerGroupNode != null && workerGroupNode.canConvertToInt()) { - workerGroupId = workerGroupNode.asInt(-1); - } - if (workerGroupId == -1) { - task.put("workerGroup", "default"); - } else { - task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); - } - } - - jsonObject.remove("task"); - - jsonObject.put("tasks", tasks); - - replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); - } - if (replaceProcessDefinitionMap.size() > 0) { - processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), - replaceProcessDefinitionMap); - } - } catch (Exception e) { - logger.error("update process definition json workergroup error", e); - } - } - - protected void updateProcessDefinitionJsonResourceList() { - ResourceDao resourceDao = new ResourceDao(); - ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); - Map replaceProcessDefinitionMap = new HashMap<>(); - try { - Map resourcesMap = resourceDao.listAllResources(dataSource.getConnection()); - Map processDefinitionJsonMap = - processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); - - for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { - ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); - ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); - - for (int i = 0; i < tasks.size(); i++) { - ObjectNode task = (ObjectNode) tasks.get(i); - ObjectNode param = (ObjectNode) task.get("params"); - if (param != null) { - - List resourceList = - JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); - ResourceInfo mainJar = - JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class); - if (mainJar != null && mainJar.getId() == null) { - String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() - : String.format("/%s", mainJar.getRes()); - if (resourcesMap.containsKey(fullName)) { - mainJar.setId(resourcesMap.get(fullName)); - param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar))); - } - } - - if (CollectionUtils.isNotEmpty(resourceList)) { - List newResourceList = resourceList.stream().map(resInfo -> { - String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() - : String.format("/%s", resInfo.getRes()); - if (resInfo.getId() == null && resourcesMap.containsKey(fullName)) { - resInfo.setId(resourcesMap.get(fullName)); - } - return resInfo; - }).collect(Collectors.toList()); - param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList))); - } - } - task.put("params", param); - - } - - jsonObject.remove("tasks"); - - jsonObject.put("tasks", tasks); - - replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); - } - if (replaceProcessDefinitionMap.size() > 0) { - processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(), - replaceProcessDefinitionMap); - } - } catch (Exception e) { - logger.error("update process definition json resource list error", e); - } - - } - - private void upgradeDolphinSchedulerDML(String schemaDir) { + private void upgradeDolphinSchedulerDML(String schemaDir, String scriptFile) { String schemaVersion = schemaDir.split("_")[0]; - Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql", - schemaDir, getDbType().name().toLowerCase())); - logger.info("sqlSQLFilePath: {}", sqlFilePath); - Connection conn = null; - PreparedStatement pstmt = null; - try { - conn = dataSource.getConnection(); + Resource sqlFilePath = new ClassPathResource( + String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile)); + try (Connection conn = dataSource.getConnection()) { conn.setAutoCommit(false); // Execute the upgraded dolphinscheduler dml ScriptRunner scriptRunner = new ScriptRunner(conn, false, true); try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) { scriptRunner.runScript(sqlReader); + String upgradeSQL; if (isExistsTable(T_VERSION_NAME)) { // Change version in the version table to the new version - String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME); - pstmt = conn.prepareStatement(upgradeSQL); - pstmt.setString(1, schemaVersion); - pstmt.executeUpdate(); + upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME); } else if (isExistsTable(T_NEW_VERSION_NAME)) { // Change version in the version table to the new version - String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME); - pstmt = conn.prepareStatement(upgradeSQL); + upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME); + } else { + throw new RuntimeException("The version table does not exist"); + } + try (PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) { pstmt.setString(1, schemaVersion); pstmt.executeUpdate(); } conn.commit(); } + logger.info("Success execute the dml file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile); } catch (FileNotFoundException e) { - try { - conn.rollback(); - } catch (SQLException e1) { - logger.error(e1.getMessage(), e1); - } - logger.error(e.getMessage(), e); + logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile, e); throw new RuntimeException("sql file not found ", e); - } catch (IOException e) { - try { - conn.rollback(); - } catch (SQLException e1) { - logger.error(e1.getMessage(), e1); - } - logger.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); } catch (Exception e) { - try { - if (null != conn) { - conn.rollback(); - } - } catch (SQLException e1) { - logger.error(e1.getMessage(), e1); - } - logger.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(pstmt, conn); + logger.error("Execute ddl file failed, meet an unknown exception, schemaDir: {}, ddlScript: {}", schemaDir, + scriptFile, e); + throw new RuntimeException("Execute ddl file failed, meet an unknown exception", e); } - } /** @@ -364,30 +160,24 @@ public abstract class UpgradeDao { * * @param schemaDir schemaDir */ - private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) { + public void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) { Resource sqlFilePath = new ClassPathResource( String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile)); - Connection conn = null; - PreparedStatement pstmt = null; - try { - conn = dataSource.getConnection(); - String dbName = conn.getCatalog(); - logger.info(dbName); + try (Connection conn = dataSource.getConnection()) { conn.setAutoCommit(true); // Execute the dolphinscheduler ddl.sql for the upgrade ScriptRunner scriptRunner = new ScriptRunner(conn, true, true); try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) { scriptRunner.runScript(sqlReader); } + logger.info("Success execute the ddl file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile); } catch (FileNotFoundException e) { - - logger.error(e.getMessage(), e); + logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile, e); throw new RuntimeException("sql file not found ", e); } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); - } finally { - ConnectionUtils.releaseResource(pstmt, conn); + logger.error("Execute ddl file failed, meet an unknown exception, schemaDir: {}, ddlScript: {}", schemaDir, + scriptFile, e); + throw new RuntimeException("Execute ddl file failed, meet an unknown exception", e); } } @@ -403,330 +193,15 @@ public abstract class UpgradeDao { versionName = "t_ds_version"; } String upgradeSQL = String.format("update %s set version = ?", versionName); - PreparedStatement pstmt = null; - Connection conn = null; - try { - conn = dataSource.getConnection(); - pstmt = conn.prepareStatement(upgradeSQL); + try ( + Connection conn = dataSource.getConnection(); + PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) { pstmt.setString(1, version); pstmt.executeUpdate(); } catch (SQLException e) { - logger.error(e.getMessage(), e); - throw new RuntimeException("sql: " + upgradeSQL, e); - } finally { - ConnectionUtils.releaseResource(pstmt, conn); - } - - } - - /** - * upgrade DolphinScheduler to 2.0.0, json split - */ - private void processDefinitionJsonSplit() { - ProjectDao projectDao = new ProjectDao(); - ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); - ScheduleDao scheduleDao = new ScheduleDao(); - JsonSplitDao jsonSplitDao = new JsonSplitDao(); - try { - // execute project - Map projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection()); - projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap); - - // execute process definition code - List processDefinitions = - processDefinitionDao.queryProcessDefinition(dataSource.getConnection()); - processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, - projectIdCodeMap); - - // execute schedule - Map allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection()); - Map processIdCodeMap = processDefinitions.stream() - .collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode)); - scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap); - - // json split - Map processDefinitionJsonMap = - processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); - List processDefinitionLogs = new ArrayList<>(); - List processTaskRelationLogs = new ArrayList<>(); - List taskDefinitionLogs = new ArrayList<>(); - Map>> processTaskMap = new HashMap<>(); - splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, - processTaskRelationLogs, taskDefinitionLogs, processTaskMap); - convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap); - - // execute json split - jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs); - jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs); - jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs); - } catch (Exception e) { - logger.error("json split error", e); - } - } - - private void splitProcessDefinitionJson(List processDefinitions, - Map processDefinitionJsonMap, - List processDefinitionLogs, - List processTaskRelationLogs, - List taskDefinitionLogs, - Map>> processTaskMap) throws Exception { - Map processDefinitionMap = processDefinitions.stream() - .collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition)); - Date now = new Date(); - for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { - if (entry.getValue() == null) { - throw new Exception("processDefinitionJson is null"); - } - ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); - ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey()); - if (processDefinition != null) { - processDefinition - .setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt()); - processDefinition.setTimeout(jsonObject.get("timeout").asInt()); - processDefinition.setGlobalParams(jsonObject.get("globalParams").toString()); - } else { - throw new Exception("It can't find processDefinition, please check !"); - } - Map taskIdCodeMap = new HashMap<>(); - Map> taskNamePreMap = new HashMap<>(); - Map taskNameCodeMap = new HashMap<>(); - Map> processCodeTaskNameCodeMap = new HashMap<>(); - List taskDefinitionLogList = new ArrayList<>(); - ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); - for (int i = 0; i < tasks.size(); i++) { - ObjectNode task = (ObjectNode) tasks.path(i); - ObjectNode param = (ObjectNode) task.get("params"); - TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); - String taskType = task.get("type").asText(); - if (param != null) { - JsonNode resourceJsonNode = param.get("resourceList"); - if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) { - List resourceList = - JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); - List resourceIds = - resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList()); - taskDefinitionLog.setResourceIds(Joiner.on(Constants.COMMA).join(resourceIds)); - } else { - taskDefinitionLog.setResourceIds(""); - } - if (TASK_TYPE_SUB_PROCESS.equals(taskType)) { - JsonNode jsonNodeDefinitionId = param.get("processDefinitionId"); - if (jsonNodeDefinitionId != null) { - param.put("processDefinitionCode", - processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode()); - param.remove("processDefinitionId"); - } - } - param.put("conditionResult", task.get("conditionResult")); - param.put("dependence", task.get("dependence")); - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param)); - } - TaskTimeoutParameter timeout = - JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); - if (timeout != null) { - taskDefinitionLog.setTimeout(timeout.getInterval()); - taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); - taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy()); - } - String desc = task.get("description") != null ? task.get("description").asText() - : task.get("desc") != null ? task.get("desc").asText() : ""; - taskDefinitionLog.setDescription(desc); - taskDefinitionLog.setFlag( - Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO); - taskDefinitionLog.setTaskType(taskType); - taskDefinitionLog.setFailRetryInterval( - TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 : task.get("retryInterval").asInt()); - taskDefinitionLog.setFailRetryTimes( - TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 : task.get("maxRetryTimes").asInt()); - taskDefinitionLog.setTaskPriority(JSONUtils - .parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); - String name = task.get("name").asText(); - taskDefinitionLog.setName(name); - taskDefinitionLog - .setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText()); - long taskCode = CodeGenerateUtils.getInstance().genCode(); - taskDefinitionLog.setCode(taskCode); - taskDefinitionLog.setVersion(Constants.VERSION_FIRST); - taskDefinitionLog.setProjectCode(processDefinition.getProjectCode()); - taskDefinitionLog.setUserId(processDefinition.getUserId()); - taskDefinitionLog.setEnvironmentCode(-1); - taskDefinitionLog.setDelayTime(0); - taskDefinitionLog.setOperator(1); - taskDefinitionLog.setOperateTime(now); - taskDefinitionLog.setCreateTime(now); - taskDefinitionLog.setUpdateTime(now); - taskDefinitionLogList.add(taskDefinitionLog); - taskIdCodeMap.put(task.get("id").asText(), taskCode); - List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); - taskNamePreMap.put(name, preTasks); - taskNameCodeMap.put(name, taskCode); - } - convertConditions(taskDefinitionLogList, taskNameCodeMap); - taskDefinitionLogs.addAll(taskDefinitionLogList); - processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap)); - ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); - processDefinitionLog.setOperator(1); - processDefinitionLog.setOperateTime(now); - processDefinitionLog.setUpdateTime(now); - processDefinitionLogs.add(processDefinitionLog); - handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs); - processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap); - processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap); - } - } - - public void convertConditions(List taskDefinitionLogList, - Map taskNameCodeMap) throws Exception { - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) { - if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) { - ObjectMapper objectMapper = new ObjectMapper(); - ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); - // reset conditionResult - ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult"); - List successNode = - JSONUtils.toList(conditionResult.get("successNode").toString(), String.class); - List nodeCode = new ArrayList<>(); - successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); - conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); - List failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class); - nodeCode.clear(); - failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); - conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); - // reset dependItemList - ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); - ArrayNode dependTaskList = - JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); - for (int i = 0; i < dependTaskList.size(); i++) { - ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); - ArrayNode dependItemList = - JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); - for (int j = 0; j < dependItemList.size(); j++) { - ObjectNode dependItem = (ObjectNode) dependItemList.path(j); - JsonNode depTasks = dependItem.get("depTasks"); - dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText())); - dependItem.remove("depTasks"); - dependItemList.set(j, dependItem); - } - dependTask.put("dependItemList", dependItemList); - dependTaskList.set(i, dependTask); - } - dependence.put("dependTaskList", dependTaskList); - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); - } - } - } - - private String convertLocations(String locations, Map taskIdCodeMap) { - if (Strings.isNullOrEmpty(locations)) { - return locations; - } - Map locationsMap = - JSONUtils.parseObject(locations, new TypeReference>() { - }); - if (locationsMap == null) { - return locations; + logger.error("Update version error, sql: {}", upgradeSQL, e); + throw new RuntimeException("Upgrade version error, sql: " + upgradeSQL, e); } - ArrayNode jsonNodes = JSONUtils.createArrayNode(); - for (Map.Entry entry : locationsMap.entrySet()) { - ObjectNode nodes = JSONUtils.createObjectNode(); - nodes.put("taskCode", taskIdCodeMap.get(entry.getKey())); - ObjectNode oldNodes = entry.getValue(); - nodes.put("x", oldNodes.get("x").asInt()); - nodes.put("y", oldNodes.get("y").asInt()); - jsonNodes.add(nodes); - } - return jsonNodes.toString(); - } - - public void convertDependence(List taskDefinitionLogs, - Map projectIdCodeMap, - Map>> processTaskMap) { - for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { - if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) { - ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); - ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); - ArrayNode dependTaskList = - JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); - for (int i = 0; i < dependTaskList.size(); i++) { - ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); - ArrayNode dependItemList = - JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); - for (int j = 0; j < dependItemList.size(); j++) { - ObjectNode dependItem = (ObjectNode) dependItemList.path(j); - dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt())); - int definitionId = dependItem.get("definitionId").asInt(); - Map> processCodeTaskNameCodeMap = processTaskMap.get(definitionId); - if (processCodeTaskNameCodeMap == null) { - logger.warn( - "We can't find processDefinition [{}], please check it is not exist, remove this dependence", - definitionId); - dependItemList.remove(j); - continue; - } - Optional>> mapEntry = - processCodeTaskNameCodeMap.entrySet().stream().findFirst(); - if (mapEntry.isPresent()) { - Map.Entry> processCodeTaskNameCodeEntry = mapEntry.get(); - dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey()); - String depTasks = dependItem.get("depTasks").asText(); - long taskCode = - "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L - : processCodeTaskNameCodeEntry.getValue().get(depTasks); - dependItem.put("depTaskCode", taskCode); - } - dependItem.remove("projectId"); - dependItem.remove("definitionId"); - dependItem.remove("depTasks"); - dependItemList.set(j, dependItem); - } - dependTask.put("dependItemList", dependItemList); - dependTaskList.set(i, dependTask); - } - dependence.put("dependTaskList", dependTaskList); - taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); - } - } - } - - private void handleProcessTaskRelation(Map> taskNamePreMap, - Map taskNameCodeMap, - ProcessDefinition processDefinition, - List processTaskRelationLogs) { - Date now = new Date(); - for (Map.Entry> entry : taskNamePreMap.entrySet()) { - List entryValue = entry.getValue(); - if (CollectionUtils.isNotEmpty(entryValue)) { - for (String preTaskName : entryValue) { - ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); - processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName)); - processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); - processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); - processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); - processTaskRelationLogs.add(processTaskRelationLog); - } - } else { - ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); - processTaskRelationLog.setPreTaskCode(0); - processTaskRelationLog.setPreTaskVersion(0); - processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); - processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); - processTaskRelationLogs.add(processTaskRelationLog); - } - } - } - - private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) { - ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); - processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); - processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); - processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); - processTaskRelationLog.setConditionType(ConditionType.NONE); - processTaskRelationLog.setConditionParams("{}"); - processTaskRelationLog.setOperator(1); - processTaskRelationLog.setOperateTime(now); - processTaskRelationLog.setCreateTime(now); - processTaskRelationLog.setUpdateTime(now); - return processTaskRelationLog; } } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java index b9d7d0edcf..006a7438df 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java @@ -21,5 +21,5 @@ public interface DolphinSchedulerUpgrader { void doUpgrade(); - String getCurrentVersion(); + DolphinSchedulerVersion getCurrentVersion(); } diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java new file mode 100644 index 0000000000..e28d04fe30 --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader; + +import java.util.Optional; + +public enum DolphinSchedulerVersion { + + V1_3_0("1.3.0"), + V1_3_2("1.3.2"), + V2_0_0("2.0.0"), + V3_2_0("3.2.0"), + ; + private final String versionName; + + DolphinSchedulerVersion(String versionName) { + this.versionName = versionName; + } + + public String getVersionName() { + return versionName; + } + + public static Optional getVersion(String versionName) { + for (DolphinSchedulerVersion version : DolphinSchedulerVersion.values()) { + if (version.getVersionName().equals(versionName)) { + return Optional.of(version); + } + } + return Optional.empty(); + } +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java new file mode 100644 index 0000000000..27871277bf --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader.v130; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion; + +import java.sql.Connection; +import java.util.HashMap; +import java.util.Map; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@Slf4j +@Component +public class V130DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { + + @Autowired + private DataSource dataSource; + + @Override + public void doUpgrade() { + updateProcessDefinitionJsonWorkerGroup(); + } + + private void updateProcessDefinitionJsonWorkerGroup() { + WorkerGroupDao workerGroupDao = new WorkerGroupDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + Map replaceProcessDefinitionMap = new HashMap<>(); + try (Connection connection = dataSource.getConnection()) { + Map oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(connection); + Map processDefinitionJsonMap = + processDefinitionDao.queryAllProcessDefinition(connection); + + for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); + + for (int i = 0; i < tasks.size(); i++) { + ObjectNode task = (ObjectNode) tasks.path(i); + ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId"); + int workerGroupId = -1; + if (workerGroupNode != null && workerGroupNode.canConvertToInt()) { + workerGroupId = workerGroupNode.asInt(-1); + } + if (workerGroupId == -1) { + task.put("workerGroup", "default"); + } else { + task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId)); + } + } + + jsonObject.remove("task"); + + jsonObject.put("tasks", tasks); + + replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); + } + if (replaceProcessDefinitionMap.size() > 0) { + processDefinitionDao.updateProcessDefinitionJson(connection, replaceProcessDefinitionMap); + } + } catch (Exception e) { + log.error("update process definition json workergroup error", e); + } + } + + @Override + public DolphinSchedulerVersion getCurrentVersion() { + return DolphinSchedulerVersion.V1_3_0; + } +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java new file mode 100644 index 0000000000..abb7045e34 --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader.v132; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao; +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion; + +import org.apache.commons.collections4.CollectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@Slf4j +@Component +public class V132DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { + + @Autowired + private DataSource dataSource; + + @Override + public void doUpgrade() { + updateProcessDefinitionJsonResourceList(); + } + + private void updateProcessDefinitionJsonResourceList() { + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + Map replaceProcessDefinitionMap = new HashMap<>(); + try (Connection connection = dataSource.getConnection()) { + Map resourcesMap = listAllResources(connection); + Map processDefinitionJsonMap = + processDefinitionDao.queryAllProcessDefinition(connection); + + for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); + + for (int i = 0; i < tasks.size(); i++) { + ObjectNode task = (ObjectNode) tasks.get(i); + ObjectNode param = (ObjectNode) task.get("params"); + if (param != null) { + + List resourceList = + JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); + ResourceInfo mainJar = + JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class); + if (mainJar != null && mainJar.getId() == null) { + String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes() + : String.format("/%s", mainJar.getRes()); + if (resourcesMap.containsKey(fullName)) { + mainJar.setId(resourcesMap.get(fullName)); + param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar))); + } + } + + if (CollectionUtils.isNotEmpty(resourceList)) { + List newResourceList = resourceList.stream().map(resInfo -> { + String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() + : String.format("/%s", resInfo.getRes()); + if (resInfo.getId() == null && resourcesMap.containsKey(fullName)) { + resInfo.setId(resourcesMap.get(fullName)); + } + return resInfo; + }).collect(Collectors.toList()); + param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList))); + } + } + task.put("params", param); + + } + + jsonObject.remove("tasks"); + + jsonObject.put("tasks", tasks); + + replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString()); + } + if (replaceProcessDefinitionMap.size() > 0) { + processDefinitionDao.updateProcessDefinitionJson(connection, replaceProcessDefinitionMap); + } + } catch (Exception e) { + log.error("update process definition json resource list error", e); + } + + } + + /** + * list all resources + * + * @param conn connection + * @return map that key is full_name and value is id + */ + private Map listAllResources(Connection conn) { + Map resourceMap = new HashMap<>(); + + String sql = "SELECT id,full_name FROM t_ds_resources"; + try ( + PreparedStatement pstmt = conn.prepareStatement(sql); + ResultSet rs = pstmt.executeQuery()) { + + while (rs.next()) { + Integer id = rs.getInt(1); + String fullName = rs.getString(2); + resourceMap.put(fullName, id); + } + + } catch (Exception e) { + log.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } + + return resourceMap; + } + + @Override + public DolphinSchedulerVersion getCurrentVersion() { + return DolphinSchedulerVersion.V1_3_2; + } +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java new file mode 100644 index 0000000000..c986119d8c --- /dev/null +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.tools.datasource.upgrader.v200; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ConditionType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; +import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao; +import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao; +import org.apache.dolphinscheduler.dao.upgrade.ProjectDao; +import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao; +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; +import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion; + +import org.apache.commons.collections4.CollectionUtils; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import javax.sql.DataSource; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; + +@Slf4j +@Component +public class V200DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { + + @Autowired + private DataSource dataSource; + + @Lazy() + @Autowired + private UpgradeDao upgradeDao; + + @Override + public void doUpgrade() { + processDefinitionJsonSplit(); + upgradeDao.upgradeDolphinSchedulerDDL("2.0.0_schema", "dolphinscheduler_ddl_post.sql"); + } + + private void processDefinitionJsonSplit() { + ProjectDao projectDao = new ProjectDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + ScheduleDao scheduleDao = new ScheduleDao(); + JsonSplitDao jsonSplitDao = new JsonSplitDao(); + try (Connection connection = dataSource.getConnection()) { + // execute project + Map projectIdCodeMap = projectDao.queryAllProject(connection); + projectDao.updateProjectCode(connection, projectIdCodeMap); + + // execute process definition code + List processDefinitions = + processDefinitionDao.queryProcessDefinition(connection); + processDefinitionDao.updateProcessDefinitionCode(connection, processDefinitions, + projectIdCodeMap); + + // execute schedule + Map allSchedule = scheduleDao.queryAllSchedule(connection); + Map processIdCodeMap = processDefinitions.stream() + .collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode)); + scheduleDao.updateScheduleCode(connection, allSchedule, processIdCodeMap); + + // json split + Map processDefinitionJsonMap = + processDefinitionDao.queryAllProcessDefinition(connection); + List processDefinitionLogs = new ArrayList<>(); + List processTaskRelationLogs = new ArrayList<>(); + List taskDefinitionLogs = new ArrayList<>(); + Map>> processTaskMap = new HashMap<>(); + splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, + processTaskRelationLogs, taskDefinitionLogs, processTaskMap); + convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap); + + // execute json split + jsonSplitDao.executeJsonSplitProcessDefinition(connection, processDefinitionLogs); + jsonSplitDao.executeJsonSplitProcessTaskRelation(connection, processTaskRelationLogs); + jsonSplitDao.executeJsonSplitTaskDefinition(connection, taskDefinitionLogs); + } catch (Exception e) { + log.error("json split error", e); + } + } + + private void splitProcessDefinitionJson(List processDefinitions, + Map processDefinitionJsonMap, + List processDefinitionLogs, + List processTaskRelationLogs, + List taskDefinitionLogs, + Map>> processTaskMap) throws Exception { + Map processDefinitionMap = processDefinitions.stream() + .collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition)); + Date now = new Date(); + for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { + if (entry.getValue() == null) { + throw new Exception("processDefinitionJson is null"); + } + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey()); + if (processDefinition != null) { + processDefinition + .setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt()); + processDefinition.setTimeout(jsonObject.get("timeout").asInt()); + processDefinition.setGlobalParams(jsonObject.get("globalParams").toString()); + } else { + throw new Exception("It can't find processDefinition, please check !"); + } + Map taskIdCodeMap = new HashMap<>(); + Map> taskNamePreMap = new HashMap<>(); + Map taskNameCodeMap = new HashMap<>(); + Map> processCodeTaskNameCodeMap = new HashMap<>(); + List taskDefinitionLogList = new ArrayList<>(); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); + for (int i = 0; i < tasks.size(); i++) { + ObjectNode task = (ObjectNode) tasks.path(i); + ObjectNode param = (ObjectNode) task.get("params"); + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + String taskType = task.get("type").asText(); + if (param != null) { + JsonNode resourceJsonNode = param.get("resourceList"); + if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) { + List resourceList = + JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); + List resourceIds = + resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList()); + taskDefinitionLog.setResourceIds(Joiner.on(Constants.COMMA).join(resourceIds)); + } else { + taskDefinitionLog.setResourceIds(""); + } + if (TASK_TYPE_SUB_PROCESS.equals(taskType)) { + JsonNode jsonNodeDefinitionId = param.get("processDefinitionId"); + if (jsonNodeDefinitionId != null) { + param.put("processDefinitionCode", + processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode()); + param.remove("processDefinitionId"); + } + } + param.put("conditionResult", task.get("conditionResult")); + param.put("dependence", task.get("dependence")); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param)); + } + TaskTimeoutParameter timeout = + JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); + if (timeout != null) { + taskDefinitionLog.setTimeout(timeout.getInterval()); + taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); + taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy()); + } + String desc = task.get("description") != null ? task.get("description").asText() + : task.get("desc") != null ? task.get("desc").asText() : ""; + taskDefinitionLog.setDescription(desc); + taskDefinitionLog.setFlag( + Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO); + taskDefinitionLog.setTaskType(taskType); + taskDefinitionLog.setFailRetryInterval( + TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 : task.get("retryInterval").asInt()); + taskDefinitionLog.setFailRetryTimes( + TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 : task.get("maxRetryTimes").asInt()); + taskDefinitionLog.setTaskPriority(JSONUtils + .parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); + String name = task.get("name").asText(); + taskDefinitionLog.setName(name); + taskDefinitionLog + .setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText()); + long taskCode = CodeGenerateUtils.getInstance().genCode(); + taskDefinitionLog.setCode(taskCode); + taskDefinitionLog.setVersion(Constants.VERSION_FIRST); + taskDefinitionLog.setProjectCode(processDefinition.getProjectCode()); + taskDefinitionLog.setUserId(processDefinition.getUserId()); + taskDefinitionLog.setEnvironmentCode(-1); + taskDefinitionLog.setDelayTime(0); + taskDefinitionLog.setOperator(1); + taskDefinitionLog.setOperateTime(now); + taskDefinitionLog.setCreateTime(now); + taskDefinitionLog.setUpdateTime(now); + taskDefinitionLogList.add(taskDefinitionLog); + taskIdCodeMap.put(task.get("id").asText(), taskCode); + List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); + taskNamePreMap.put(name, preTasks); + taskNameCodeMap.put(name, taskCode); + } + convertConditions(taskDefinitionLogList, taskNameCodeMap); + taskDefinitionLogs.addAll(taskDefinitionLogList); + processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap)); + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); + processDefinitionLog.setOperator(1); + processDefinitionLog.setOperateTime(now); + processDefinitionLog.setUpdateTime(now); + processDefinitionLogs.add(processDefinitionLog); + handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs); + processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap); + processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap); + } + } + + private void convertDependence(List taskDefinitionLogs, + Map projectIdCodeMap, + Map>> processTaskMap) { + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) { + ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); + ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); + ArrayNode dependTaskList = + JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); + for (int i = 0; i < dependTaskList.size(); i++) { + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); + ArrayNode dependItemList = + JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); + for (int j = 0; j < dependItemList.size(); j++) { + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); + dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt())); + int definitionId = dependItem.get("definitionId").asInt(); + Map> processCodeTaskNameCodeMap = processTaskMap.get(definitionId); + if (processCodeTaskNameCodeMap == null) { + log.warn( + "We can't find processDefinition [{}], please check it is not exist, remove this dependence", + definitionId); + dependItemList.remove(j); + continue; + } + Optional>> mapEntry = + processCodeTaskNameCodeMap.entrySet().stream().findFirst(); + if (mapEntry.isPresent()) { + Map.Entry> processCodeTaskNameCodeEntry = mapEntry.get(); + dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey()); + String depTasks = dependItem.get("depTasks").asText(); + long taskCode = + "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L + : processCodeTaskNameCodeEntry.getValue().get(depTasks); + dependItem.put("depTaskCode", taskCode); + } + dependItem.remove("projectId"); + dependItem.remove("definitionId"); + dependItem.remove("depTasks"); + dependItemList.set(j, dependItem); + } + dependTask.put("dependItemList", dependItemList); + dependTaskList.set(i, dependTask); + } + dependence.put("dependTaskList", dependTaskList); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); + } + } + } + + private void convertConditions(List taskDefinitionLogList, + Map taskNameCodeMap) throws Exception { + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) { + if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) { + ObjectMapper objectMapper = new ObjectMapper(); + ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams()); + // reset conditionResult + ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult"); + List successNode = + JSONUtils.toList(conditionResult.get("successNode").toString(), String.class); + List nodeCode = new ArrayList<>(); + successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); + conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); + List failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class); + nodeCode.clear(); + failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node))); + conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode))); + // reset dependItemList + ObjectNode dependence = (ObjectNode) taskParams.get("dependence"); + ArrayNode dependTaskList = + JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); + for (int i = 0; i < dependTaskList.size(); i++) { + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); + ArrayNode dependItemList = + JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); + for (int j = 0; j < dependItemList.size(); j++) { + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); + JsonNode depTasks = dependItem.get("depTasks"); + dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText())); + dependItem.remove("depTasks"); + dependItemList.set(j, dependItem); + } + dependTask.put("dependItemList", dependItemList); + dependTaskList.set(i, dependTask); + } + dependence.put("dependTaskList", dependTaskList); + taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams)); + } + } + } + + private String convertLocations(String locations, Map taskIdCodeMap) { + if (Strings.isNullOrEmpty(locations)) { + return locations; + } + Map locationsMap = + JSONUtils.parseObject(locations, new TypeReference>() { + }); + if (locationsMap == null) { + return locations; + } + ArrayNode jsonNodes = JSONUtils.createArrayNode(); + for (Map.Entry entry : locationsMap.entrySet()) { + ObjectNode nodes = JSONUtils.createObjectNode(); + nodes.put("taskCode", taskIdCodeMap.get(entry.getKey())); + ObjectNode oldNodes = entry.getValue(); + nodes.put("x", oldNodes.get("x").asInt()); + nodes.put("y", oldNodes.get("y").asInt()); + jsonNodes.add(nodes); + } + return jsonNodes.toString(); + } + + private void handleProcessTaskRelation(Map> taskNamePreMap, + Map taskNameCodeMap, + ProcessDefinition processDefinition, + List processTaskRelationLogs) { + Date now = new Date(); + for (Map.Entry> entry : taskNamePreMap.entrySet()) { + List entryValue = entry.getValue(); + if (CollectionUtils.isNotEmpty(entryValue)) { + for (String preTaskName : entryValue) { + ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); + processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName)); + processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLogs.add(processTaskRelationLog); + } + } else { + ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); + processTaskRelationLog.setPreTaskCode(0); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLogs.add(processTaskRelationLog); + } + } + } + + private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); + processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); + processTaskRelationLog.setOperator(1); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + return processTaskRelationLog; + } + + @Override + public DolphinSchedulerVersion getCurrentVersion() { + return DolphinSchedulerVersion.V2_0_0; + } +} diff --git a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java index 293ec2a61a..32e6b011d6 100644 --- a/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java +++ b/dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader; +import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion; import org.apache.commons.collections4.CollectionUtils; @@ -139,7 +140,7 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader { } @Override - public String getCurrentVersion() { - return "3.2.0"; + public DolphinSchedulerVersion getCurrentVersion() { + return DolphinSchedulerVersion.V3_2_0; } }