
Split the upgrade code (#13436)

1. Split the upgrade code into different versions
2. Log the DML/DDL SQL in its original, readable format
3. Fix the DDL of 3.0.0 and 3.1.0
4. Fix regular-expression injection in the resource search
Wenjun Ruan committed 2 years ago (commit 713fdbc444, branch 3.2.0-release)
Changed files (21):
  1. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java (2 changes)
  2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java (18 changes)
  3. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (7 changes)
  4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (25 changes)
  5. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java (13 changes)
  6. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (14 changes)
  7. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java (14 changes)
  8. dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql (2 changes)
  9. dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql (59 changes)
  10. dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql (21 changes)
  11. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java (17 changes)
  12. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java (22 changes)
  13. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java (39 changes)
  14. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java (71 changes)
  15. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java (607 changes)
  16. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java (2 changes)
  17. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java (47 changes)
  18. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java (97 changes)
  19. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java (151 changes)
  20. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java (400 changes)
  21. dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java (5 changes)

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java (2 changes)

@@ -763,7 +763,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String trimmedSearchVal = searchVal != null ? searchVal.trim() : "";
// filter based on trimmed searchVal
List<StorageEntity> filteredResourceList = resourcesList.stream()
.filter(x -> x.getFileName().matches("(.*)" + trimmedSearchVal + "(.*)")).collect(Collectors.toList());
.filter(x -> x.getFileName().contains(trimmedSearchVal)).collect(Collectors.toList());
// inefficient pagination
List<StorageEntity> slicedResourcesList = filteredResourceList.stream().skip((long) (pageNo - 1) * pageSize)
.limit(pageSize).collect(Collectors.toList());
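The change above is the resource-search hardening from point 4 of the commit message: the user's search term used to be concatenated into a regular expression, so regex meta-characters were interpreted (or rejected) by the engine. A small, self-contained sketch of the failure mode and two safe alternatives; the sample strings are invented for illustration:

    import java.util.regex.Pattern;
    import java.util.regex.PatternSyntaxException;

    public class SearchFilterSketch {
        public static void main(String[] args) {
            String searchVal = "report(";           // user input with an unbalanced '('
            String fileName = "report(final).sql";

            // Old approach: the raw input is compiled as part of a regex, so this
            // throws PatternSyntaxException; well-formed input such as ".*" would
            // instead silently match every file.
            try {
                fileName.matches("(.*)" + searchVal + "(.*)");
            } catch (PatternSyntaxException e) {
                System.out.println("regex injection: " + e.getMessage());
            }

            // New approach, as in the diff: treat the input as a literal substring.
            System.out.println(fileName.contains(searchVal));  // true

            // Alternative if regex matching were ever required: quote the input first.
            System.out.println(Pattern.compile(Pattern.quote(searchVal)).matcher(fileName).find());  // true
        }
    }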

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ScriptRunner.java (18 changes)

@@ -24,6 +24,8 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,17 +93,17 @@ public class ScriptRunner {
* @throws IOException if there is an error reading from the Reader
*/
private void runScript(Connection conn, Reader reader) throws IOException, SQLException {
StringBuffer command = null;
List<String> command = null;
try {
LineNumberReader lineReader = new LineNumberReader(reader);
String line;
while ((line = lineReader.readLine()) != null) {
if (command == null) {
command = new StringBuffer();
command = new ArrayList<>();
}
String trimmedLine = line.trim();
if (trimmedLine.startsWith("--")) {
logger.info(trimmedLine);
logger.info("\n{}", trimmedLine);
} else if (trimmedLine.length() < 1 || trimmedLine.startsWith("//")) {
// Do nothing
} else if (trimmedLine.startsWith("delimiter")) {
@@ -110,12 +112,11 @@ public class ScriptRunner {
} else if (!fullLineDelimiter && trimmedLine.endsWith(getDelimiter())
|| fullLineDelimiter && trimmedLine.equals(getDelimiter())) {
command.append(line, 0, line.lastIndexOf(getDelimiter()));
command.append(" ");
logger.info("sql: {}", command);
command.add(line.substring(0, line.lastIndexOf(getDelimiter())));
logger.info("\n{}", String.join("\n", command));
try (Statement statement = conn.createStatement()) {
statement.execute(command.toString());
statement.execute(String.join(" ", command));
try (ResultSet rs = statement.getResultSet()) {
if (stopOnError && rs != null) {
ResultSetMetaData md = rs.getMetaData();
@@ -142,8 +143,7 @@ public class ScriptRunner {
command = null;
Thread.yield();
} else {
command.append(line);
command.append(" ");
command.add(line);
}
}
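This rewrite is what enables point 2 of the commit message: a StringBuffer flattened the statement's line breaks away before logging, while a List<String> keeps each source line, so the statement can be logged exactly as written and still be executed as one string. A minimal sketch of the two joins, assuming the trailing delimiter has already been stripped as in the diff:

    import java.util.ArrayList;
    import java.util.List;

    public class CommandBufferSketch {
        public static void main(String[] args) {
            // Lines accumulate one readLine() at a time in the real ScriptRunner.
            List<String> command = new ArrayList<>();
            command.add("CREATE TABLE t_demo (");
            command.add("    id INT");
            command.add(")");

            // Logged in its original, multi-line shape:
            System.out.println("\n" + String.join("\n", command));

            // Executed as a single flat statement, joined with spaces:
            String sql = String.join(" ", command);
            System.out.println(sql);  // CREATE TABLE t_demo (     id INT )
        }
    }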

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (7 changes)

@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
@@ -92,8 +91,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
@@ -160,8 +157,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
@@ -250,8 +245,6 @@ public class JsonSplitDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (25 changes)

@@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import java.sql.Connection;
@@ -50,11 +49,9 @@ public class ProcessDefinitionDao {
Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
String sql = "SELECT id,process_definition_json FROM t_ds_process_definition";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
@@ -65,8 +62,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitionJsonMap;
@@ -91,8 +86,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
@@ -100,11 +93,9 @@ public class ProcessDefinitionDao {
List<ProcessDefinition> processDefinitions = new ArrayList<>();
String sql =
"SELECT id,code,project_code,user_id,locations,name,description,release_state,flag,create_time FROM t_ds_process_definition";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setId(rs.getInt(1));
@@ -127,8 +118,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return processDefinitions;
}
@@ -164,8 +153,6 @@ public class ProcessDefinitionDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
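The DAO diffs in this commit all share the same two-part cleanup, visible here and repeated below for ProjectDao, ScheduleDao and WorkerGroupDao: the hand-managed ResultSet/PreparedStatement plus the ConnectionUtils.releaseResource(...) finally blocks become try-with-resources, and the Connection itself is no longer closed by the DAO, so its lifetime belongs to the caller. A generic sketch of the resulting pattern; the table and columns are illustrative:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.HashMap;
    import java.util.Map;

    public class QuerySketch {

        // The caller opens and closes the Connection; this method only borrows it.
        Map<Integer, Long> queryIdCodeMap(Connection conn) {
            Map<Integer, Long> result = new HashMap<>();
            String sql = "SELECT id, code FROM t_ds_project";
            try (
                    PreparedStatement pstmt = conn.prepareStatement(sql);
                    ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    result.put(rs.getInt(1), rs.getLong(2));
                }
            } catch (Exception e) {
                // statement and result set are closed automatically, even on this path
                throw new RuntimeException("sql: " + sql, e);
            }
            return result;
        }
    }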

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java (13 changes)

@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -42,11 +41,9 @@ public class ProjectDao {
public Map<Integer, Long> queryAllProject(Connection conn) {
Map<Integer, Long> projectMap = new HashMap<>();
String sql = "SELECT id,code FROM t_ds_project";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
long code = rs.getLong(2);
@@ -58,8 +55,6 @@ public class ProjectDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return projectMap;
}
@@ -83,8 +78,6 @@ public class ProjectDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (14 changes)

@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -42,11 +40,9 @@ public class ScheduleDao {
public Map<Integer, Long> queryAllSchedule(Connection conn) {
Map<Integer, Long> scheduleMap = new HashMap<>();
String sql = "SELECT id,process_definition_code FROM t_ds_schedules";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
long processDefinitionCode = rs.getLong(2);
@@ -55,8 +51,6 @@ public class ScheduleDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return scheduleMap;
}
@@ -92,8 +86,6 @@ public class ScheduleDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/WorkerGroupDao.java (14 changes)

@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -40,12 +38,10 @@ public class WorkerGroupDao {
public Map<Integer, String> queryAllOldWorkerGroup(Connection conn) {
Map<Integer, String> workerGroupMap = new HashMap<>();
String sql = String.format("select id,name from t_ds_worker_group");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
String sql = "select id,name from t_ds_worker_group";
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
int id = rs.getInt(1);
@@ -56,8 +52,6 @@ public class WorkerGroupDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return workerGroupMap;

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.0.0_schema/mysql/dolphinscheduler_ddl.sql (2 changes)

@@ -469,7 +469,7 @@ BEGIN
ALTER TABLE `t_ds_alert` ADD COLUMN `project_code` bigint DEFAULT NULL COMMENT 'project_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_definition_code` bigint DEFAULT NULL COMMENT 'process_definition_code';
ALTER TABLE `t_ds_alert` ADD COLUMN `process_instance_id` int DEFAULT NULL COMMENT 'process_instance_id';
ALTER TABLE `t_ds_alert` MODIFY COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
ALTER TABLE `t_ds_alert` ADD COLUMN `alert_type` int DEFAULT NULL COMMENT 'alert_type';
END IF;
END;
d//
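This one-line change is the 3.0.0 DDL fix from point 3 of the commit message. The condition of the surrounding IF sits outside the hunk, but the neighbouring statements all ADD columns, so the branch presumably runs only when the columns are missing; on that branch MODIFY COLUMN fails with an unknown-column error and aborts the upgrade, which is why the statement becomes ADD COLUMN. The same guard can be expressed from Java through JDBC metadata; a hypothetical sketch (this helper is not part of the commit):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GuardedDdlSketch {

        // Add the column only when it is missing, mirroring the procedure's guard.
        void addAlertTypeIfMissing(Connection conn) throws Exception {
            try (ResultSet rs = conn.getMetaData().getColumns(
                    conn.getCatalog(), conn.getSchema(), "t_ds_alert", "alert_type")) {
                if (!rs.next()) { // column absent: ADD is valid, MODIFY would fail here
                    try (Statement stmt = conn.createStatement()) {
                        stmt.execute("ALTER TABLE t_ds_alert ADD COLUMN alert_type int DEFAULT NULL");
                    }
                }
            }
        }
    }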

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.0_schema/mysql/dolphinscheduler_ddl.sql (59 changes)

@@ -392,41 +392,46 @@ CALL modify_t_ds_task_group_col_description;
DROP PROCEDURE modify_t_ds_task_group_col_description;
-- alter table `t_ds_worker_group` add `other_params_json` text;
-- alter table `t_ds_process_instance` add `state_history` text;
drop procedure if exists add_column_safety;
drop procedure if exists add_t_ds_task_group_other_params_json;
delimiter d//
create procedure add_column_safety(target_table_name varchar(256), target_column varchar(256),
target_column_type varchar(256), sths_else varchar(256))
begin
declare target_database varchar(256);
select database() into target_database;
IF EXISTS(SELECT *
FROM information_schema.COLUMNS
WHERE COLUMN_NAME = target_column
AND TABLE_NAME = target_table_name
)
CREATE PROCEDURE add_t_ds_task_group_other_params_json()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='other_params_json')
THEN
set @statement =
concat('alter table ', target_table_name, ' change column ', target_column, ' ', target_column, ' ',
target_column_type, ' ',
sths_else);
PREPARE STMT_c FROM @statement;
EXECUTE STMT_c;
alter table `t_ds_worker_group` add column `other_params_json` text DEFAULT NULL COMMENT "other params json";
ELSE
set @statement =
concat('alter table ', target_table_name, ' add column ', target_column, ' ', target_column_type, ' ',
sths_else);
PREPARE STMT_a FROM @statement;
EXECUTE STMT_a;
alter table `t_ds_worker_group` modify column `other_params_json` text DEFAULT NULL COMMENT "other params json";
END IF;
end;
END;
d//
delimiter ;
call add_column_safety('t_ds_worker_group','other_params_json', 'text' , "DEFAULT NULL COMMENT 'other params json'");
call add_column_safety('t_ds_process_instance','state_history', 'text' , "DEFAULT NULL COMMENT 'state history desc' AFTER `state`");
call add_t_ds_task_group_other_params_json();
drop procedure if exists add_t_ds_task_group_other_params_json;
-- alter table `t_ds_process_instance` add `state_history` text;
drop procedure if exists add_t_ds_process_instance_state_history;
delimiter d//
CREATE PROCEDURE add_t_ds_process_instance_state_history()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='state_history')
THEN
alter table `t_ds_process_instance` add column `state_history` text DEFAULT NULL COMMENT "other params json";
ELSE
alter table `t_ds_process_instance` modify column `state_history` text DEFAULT NULL COMMENT "other params json";
END IF;
END;
d//
delimiter ;
call add_t_ds_process_instance_state_history();
drop procedure if exists add_t_ds_process_instance_state_history;
drop procedure if exists add_column_safety;
alter table t_ds_process_instance alter column process_instance_priority set default 2;
alter table t_ds_schedules alter column process_instance_priority set default 2;

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql (21 changes)

@@ -65,5 +65,22 @@ d//
delimiter ;
-- ALTER TABLE t_ds_worker_group ADD COLUMN description varchar(255) DEFAULT NULL COMMENT 'ds worker group description';
call add_column_safety('t_ds_worker_group','description', 'varchar(255)' , "DEFAULT NULL COMMENT 'ds worker group description'");
drop procedure if exists add_column_safety;
drop procedure if exists modify_t_ds_worker_group_description;
delimiter d//
CREATE PROCEDURE modify_t_ds_worker_group_description()
BEGIN
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='description')
THEN
alter table `t_ds_worker_group` add column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description";
ELSE
alter table `t_ds_worker_group` modify column `description` varchar(255) DEFAULT NULL COMMENT "ds worker group description";
END IF;
END;
d//
delimiter ;
call modify_t_ds_worker_group_description();
drop procedure if exists modify_t_ds_worker_group_description;

dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/DolphinSchedulerManager.java (17 changes)

@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
@@ -45,7 +46,7 @@ public class DolphinSchedulerManager {
private final UpgradeDao upgradeDao;
private Map<String, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();
private Map<DolphinSchedulerVersion, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();
public DolphinSchedulerManager(DataSource dataSource, List<UpgradeDao> daos,
List<DolphinSchedulerUpgrader> dolphinSchedulerUpgraders) throws Exception {
@@ -121,21 +122,11 @@ public class DolphinSchedulerManager {
logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion);
logger.info("Begin upgrading DolphinScheduler's table structure");
upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
} else if ("2.0.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
}
DolphinSchedulerUpgrader dolphinSchedulerUpgrader = upgraderMap.get(schemaVersion);
if (dolphinSchedulerUpgrader != null) {
dolphinSchedulerUpgrader.doUpgrade();
}
DolphinSchedulerVersion.getVersion(schemaVersion).ifPresent(v -> upgraderMap.get(v).doUpgrade());
version = schemaVersion;
}
}
// todo: do we need to do this in all version > 2.0.6?
if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion)
&& SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceFileSize();
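The hard-coded version checks are replaced by a lookup keyed on the new DolphinSchedulerVersion enum, so each migration lives in its own DolphinSchedulerUpgrader implementation. A sketch of the dispatch, consistent with the diff but simplified; it uses map(...) so that a version with no registered upgrader is skipped rather than failing:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
    import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;

    public class UpgradeDispatchSketch {

        private final Map<DolphinSchedulerVersion, DolphinSchedulerUpgrader> upgraderMap = new HashMap<>();

        UpgradeDispatchSketch(List<DolphinSchedulerUpgrader> upgraders) {
            // Each upgrader announces the version it handles via getCurrentVersion().
            upgraders.forEach(u -> upgraderMap.put(u.getCurrentVersion(), u));
        }

        void runVersionHook(String schemaVersion) {
            // Unknown versions resolve to Optional.empty() and simply fall through.
            DolphinSchedulerVersion.getVersion(schemaVersion)
                    .map(upgraderMap::get)
                    .ifPresent(DolphinSchedulerUpgrader::doUpgrade);
        }
    }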

dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/MySQLUpgradeDao.java (22 changes)

@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
@@ -56,17 +55,13 @@ public class MySQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsTable(String tableName) {
ResultSet rs = null;
Connection conn = null;
try {
conn = dataSource.getConnection();
rs = conn.getMetaData().getTables(conn.getCatalog(), conn.getSchema(), tableName, null);
try (
Connection conn = dataSource.getConnection();
ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), conn.getSchema(), tableName, null)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(rs, conn);
}
}
@@ -79,19 +74,16 @@ public class MySQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsColumn(String tableName, String columnName) {
Connection conn = null;
try {
conn = dataSource.getConnection();
ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(), conn.getSchema(), tableName, columnName);
try (
Connection conn = dataSource.getConnection();
ResultSet rs =
conn.getMetaData().getColumns(conn.getCatalog(), conn.getSchema(), tableName, columnName)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}

dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/PostgreSQLUpgradeDao.java (39 changes)

@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
@@ -51,13 +50,10 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
}
public String getSchema() {
Connection conn = null;
PreparedStatement pstmt = null;
ResultSet resultSet = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement("select current_schema()");
resultSet = pstmt.executeQuery();
try (
Connection conn = dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement("select current_schema()");
ResultSet resultSet = pstmt.executeQuery()) {
while (resultSet.next()) {
if (resultSet.isFirst()) {
return resultSet.getString(1);
@@ -66,8 +62,6 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(resultSet, pstmt, conn);
}
return "";
}
@@ -80,21 +74,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsTable(String tableName) {
Connection conn = null;
ResultSet rs = null;
try {
conn = dataSource.getConnection();
rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null);
try (
Connection conn = dataSource.getConnection();
ResultSet rs = conn.getMetaData().getTables(conn.getCatalog(), getSchema(), tableName, null)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(rs, conn);
}
}
/**
@@ -106,20 +93,14 @@ public class PostgreSQLUpgradeDao extends UpgradeDao {
*/
@Override
public boolean isExistsColumn(String tableName, String columnName) {
Connection conn = null;
ResultSet rs = null;
try {
conn = dataSource.getConnection();
rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName);
try (
Connection conn = dataSource.getConnection();
ResultSet rs = conn.getMetaData().getColumns(conn.getCatalog(), getSchema(), tableName, columnName)) {
return rs.next();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(rs, conn);
}
}
}

dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/ResourceDao.java (71 changes)

@@ -17,18 +17,14 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,38 +39,6 @@ public class ResourceDao {
public static final Logger logger = LoggerFactory.getLogger(ResourceDao.class);
/**
* list all resources
*
* @param conn connection
* @return map that key is full_name and value is id
*/
Map<String, Integer> listAllResources(Connection conn) {
Map<String, Integer> resourceMap = new HashMap<>();
String sql = String.format("SELECT id,full_name FROM t_ds_resources");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
Integer id = rs.getInt(1);
String fullName = rs.getString(2);
resourceMap.put(fullName, id);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return resourceMap;
}
/**
* list all resources by the type
*
@@ -86,15 +50,13 @@ public class ResourceDao {
String sql =
String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type);
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String fullName = rs.getString("full_name");
Boolean isDirectory = rs.getBoolean("is_directory");
boolean isDirectory = rs.getBoolean("is_directory");
long fileSize = rs.getLong("size");
if (StringUtils.isNotBlank(fullName) && !isDirectory) {
@@ -111,16 +73,6 @@ public class ResourceDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
if (Objects.nonNull(pstmt)) {
try {
if (!pstmt.isClosed()) {
pstmt.close();
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
return resourceSizeMap;
}
@@ -134,9 +86,7 @@ public class ResourceDao {
Map<String, Long> resourceSizeMap = listAllResourcesByFileType(conn, type);
String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true";
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
for (Map.Entry<String, Long> entry : resourceSizeMap.entrySet()) {
pstmt.setLong(1, entry.getValue());
pstmt.setInt(2, type);
@ -147,17 +97,6 @@ public class ResourceDao {
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
if (Objects.nonNull(pstmt)) {
try {
if (!pstmt.isClosed()) {
pstmt.close();
}
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
ConnectionUtils.releaseResource(conn);
}
}
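Beyond the removed listAllResources helper, the update path now opens a single PreparedStatement in try-with-resources and reuses it for every folder row, instead of the hand-rolled isClosed()/close() bookkeeping; the Connection again stays open for the caller, which lets UpgradeDao run the file and UDF passes on one connection. A condensed sketch of the reuse pattern (the map argument is illustrative):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.util.Map;

    public class FolderSizeUpdateSketch {

        void updateFolderSizes(Connection conn, Map<String, Long> sizeByFullName) {
            String sql = "UPDATE t_ds_resources SET size=? WHERE type=? AND full_name=? AND is_directory = true";
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                for (Map.Entry<String, Long> entry : sizeByFullName.entrySet()) {
                    pstmt.setLong(1, entry.getValue());
                    pstmt.setInt(2, 0);                 // 0 = file resources, as in the diff
                    pstmt.setString(3, entry.getKey());
                    pstmt.executeUpdate();              // one statement object, many executions
                }
            } catch (Exception e) {
                throw new RuntimeException("sql: " + sql, e);
            }
        }
    }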

dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/dao/UpgradeDao.java (607 changes)

@@ -17,50 +17,17 @@
package org.apache.dolphinscheduler.tools.datasource.dao;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao;
import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.upgrade.ProjectDao;
import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao;
import org.apache.dolphinscheduler.dao.upgrade.SchemaUtils;
import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.CollectionUtils;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
@@ -69,14 +36,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
public abstract class UpgradeDao {
public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
@@ -100,6 +59,7 @@ public abstract class UpgradeDao {
/**
* run init sql to init db schema
*
* @param dbType db type
*/
private void runInitSql(DbType dbType) {
@@ -112,8 +72,8 @@ public abstract class UpgradeDao {
initScriptRunner.runScript(initSqlReader);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
logger.error("Execute init sql file: {} error", sqlFile, e);
throw new RuntimeException(String.format("Execute init sql file: %s error", sqlFile), e);
}
}
@@ -123,56 +83,24 @@ public abstract class UpgradeDao {
public String getCurrentVersion(String versionName) {
String sql = String.format("select version from %s", versionName);
Connection conn = null;
ResultSet rs = null;
PreparedStatement pstmt = null;
String version = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
try (
Connection conn = dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
if (rs.next()) {
version = rs.getString(1);
}
return version;
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
logger.error("Get current version from database error, sql: {}", sql, e);
throw new RuntimeException("Get current version from database error, sql: " + sql, e);
}
}
public void upgradeDolphinScheduler(String schemaDir) {
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
upgradeDolphinSchedulerDML(schemaDir);
}
/**
* upgrade DolphinScheduler worker group
* ds-1.3.0 modify the worker group for process definition json
*/
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
/**
* upgrade DolphinScheduler resource list
* ds-1.3.2 modify the resource list for process definition json
*/
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
/**
* upgrade DolphinScheduler to 2.0.0
*/
public void upgradeDolphinSchedulerTo200(String schemaDir) {
processDefinitionJsonSplit();
upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
upgradeDolphinSchedulerDML(schemaDir, "dolphinscheduler_dml.sql");
}
/**
@@ -180,183 +108,51 @@ public abstract class UpgradeDao {
*/
public void upgradeDolphinSchedulerResourceFileSize() {
ResourceDao resourceDao = new ResourceDao();
try {
try (Connection conn = dataSource.getConnection()) {
// update the size of the folder that is the type of file.
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0);
resourceDao.updateResourceFolderSizeByFileType(conn, 0);
// update the size of the folder that is the type of udf.
resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1);
resourceDao.updateResourceFolderSizeByFileType(conn, 1);
} catch (Exception ex) {
logger.error("Failed to upgrade because of failing to update the folder's size of resource files.");
}
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
protected void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode workerGroupNode = (ObjectNode) task.path("workerGroupId");
int workerGroupId = -1;
if (workerGroupNode != null && workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("task");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json workergroup error", e);
}
}
protected void updateProcessDefinitionJsonResourceList() {
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String, Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
List<ResourceInfo> resourceList =
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar =
JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == null) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes()
: String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes()
: String.format("/%s", resInfo.getRes());
if (resInfo.getId() == null && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
task.put("params", param);
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),
replaceProcessDefinitionMap);
}
} catch (Exception e) {
logger.error("update process definition json resource list error", e);
}
}
private void upgradeDolphinSchedulerDML(String schemaDir) {
private void upgradeDolphinSchedulerDML(String schemaDir, String scriptFile) {
String schemaVersion = schemaDir.split("_")[0];
Resource sqlFilePath = new ClassPathResource(String.format("sql/upgrade/%s/%s/dolphinscheduler_dml.sql",
schemaDir, getDbType().name().toLowerCase()));
logger.info("sqlSQLFilePath: {}", sqlFilePath);
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
Resource sqlFilePath = new ClassPathResource(
String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
// Execute the upgraded dolphinscheduler dml
ScriptRunner scriptRunner = new ScriptRunner(conn, false, true);
try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) {
scriptRunner.runScript(sqlReader);
String upgradeSQL;
if (isExistsTable(T_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
} else if (isExistsTable(T_NEW_VERSION_NAME)) {
// Change version in the version table to the new version
String upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
pstmt = conn.prepareStatement(upgradeSQL);
upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
} else {
throw new RuntimeException("The version table does not exist");
}
try (PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) {
pstmt.setString(1, schemaVersion);
pstmt.executeUpdate();
}
conn.commit();
}
logger.info("Success execute the dml file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile);
} catch (FileNotFoundException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile, e);
throw new RuntimeException("sql file not found ", e);
} catch (IOException e) {
try {
conn.rollback();
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} catch (Exception e) {
try {
if (null != conn) {
conn.rollback();
}
} catch (SQLException e1) {
logger.error(e1.getMessage(), e1);
}
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
logger.error("Execute ddl file failed, meet an unknown exception, schemaDir: {}, ddlScript: {}", schemaDir,
scriptFile, e);
throw new RuntimeException("Execute ddl file failed, meet an unknown exception", e);
}
}
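Untangling the interleaved old and new lines above: the rewritten DML path picks the UPDATE statement once, depending on which version table exists, runs it through try-with-resources, and commits it together with the script's work since autocommit is off. A condensed sketch of just that selection; the table-name constants and the isExistsTable stub stand in for fields and a helper of UpgradeDao and are assumptions here:

    import java.sql.Connection;
    import java.sql.PreparedStatement;

    public class VersionBumpSketch {

        // Illustrative stand-ins for UpgradeDao's fields and helper.
        static final String T_VERSION_NAME = "t_escheduler_version";
        static final String T_NEW_VERSION_NAME = "t_ds_version";

        boolean isExistsTable(String table) {
            return T_NEW_VERSION_NAME.equals(table); // stub: pretend only the new table exists
        }

        void bumpVersion(Connection conn, String schemaVersion) throws Exception {
            String upgradeSQL;
            if (isExistsTable(T_VERSION_NAME)) {
                upgradeSQL = String.format("update %s set version = ?", T_VERSION_NAME);
            } else if (isExistsTable(T_NEW_VERSION_NAME)) {
                upgradeSQL = String.format("update %s set version = ?", T_NEW_VERSION_NAME);
            } else {
                throw new RuntimeException("The version table does not exist");
            }
            try (PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) {
                pstmt.setString(1, schemaVersion);
                pstmt.executeUpdate();
            }
            conn.commit(); // the DML script and the version bump land together
        }
    }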
/**
@@ -364,30 +160,24 @@ public abstract class UpgradeDao {
*
* @param schemaDir schemaDir
*/
private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
public void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
Resource sqlFilePath = new ClassPathResource(
String.format("sql/upgrade/%s/%s/%s", schemaDir, getDbType().name().toLowerCase(), scriptFile));
Connection conn = null;
PreparedStatement pstmt = null;
try {
conn = dataSource.getConnection();
String dbName = conn.getCatalog();
logger.info(dbName);
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(true);
// Execute the dolphinscheduler ddl.sql for the upgrade
ScriptRunner scriptRunner = new ScriptRunner(conn, true, true);
try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) {
scriptRunner.runScript(sqlReader);
}
logger.info("Success execute the ddl file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile);
} catch (FileNotFoundException e) {
logger.error(e.getMessage(), e);
logger.error("Cannot find the DDL file, schemaDir: {}, ddlScript: {}", schemaDir, scriptFile, e);
throw new RuntimeException("sql file not found ", e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage(), e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
logger.error("Execute ddl file failed, meet an unknown exception, schemaDir: {}, ddlScript: {}", schemaDir,
scriptFile, e);
throw new RuntimeException("Execute ddl file failed, meet an unknown exception", e);
}
}
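Unlike the DML path, the DDL path runs with autocommit on (MySQL DDL statements commit implicitly anyway) and lets ScriptRunner stop on the first error. A sketch of the whole call, assembled from the diff; the mysql path segment is hard-coded here where the real code derives it from getDbType():

    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.sql.Connection;

    import javax.sql.DataSource;

    import org.apache.dolphinscheduler.common.utils.ScriptRunner;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.Resource;

    public class DdlRunnerSketch {

        void runDdl(DataSource dataSource, String schemaDir, String scriptFile) throws Exception {
            Resource sqlFilePath = new ClassPathResource(
                    String.format("sql/upgrade/%s/mysql/%s", schemaDir, scriptFile));
            try (Connection conn = dataSource.getConnection()) {
                conn.setAutoCommit(true); // DDL: no surrounding transaction
                ScriptRunner scriptRunner = new ScriptRunner(conn, true, true); // flags as in the diff
                try (Reader sqlReader = new InputStreamReader(sqlFilePath.getInputStream())) {
                    scriptRunner.runScript(sqlReader);
                }
            }
        }
    }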
@@ -403,330 +193,15 @@ public abstract class UpgradeDao {
versionName = "t_ds_version";
}
String upgradeSQL = String.format("update %s set version = ?", versionName);
PreparedStatement pstmt = null;
Connection conn = null;
try {
conn = dataSource.getConnection();
pstmt = conn.prepareStatement(upgradeSQL);
try (
Connection conn = dataSource.getConnection();
PreparedStatement pstmt = conn.prepareStatement(upgradeSQL)) {
pstmt.setString(1, version);
pstmt.executeUpdate();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(pstmt, conn);
logger.error("Update version error, sql: {}", upgradeSQL, e);
throw new RuntimeException("Upgrade version error, sql: " + upgradeSQL, e);
}
}
/**
* upgrade DolphinScheduler to 2.0.0, json split
*/
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try {
// execute project
Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions =
processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions,
projectIdCodeMap);
// execute schedule
Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection());
Map<Integer, Long> processIdCodeMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new HashMap<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs,
processTaskRelationLogs, taskDefinitionLogs, processTaskMap);
convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs);
} catch (Exception e) {
logger.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions,
Map<Integer, String> processDefinitionJsonMap,
List<ProcessDefinitionLog> processDefinitionLogs,
List<ProcessTaskRelationLog> processTaskRelationLogs,
List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition
.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new HashMap<>();
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
String taskType = task.get("type").asText();
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
List<ResourceInfo> resourceList =
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List<Integer> resourceIds =
resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(Joiner.on(Constants.COMMA).join(resourceIds));
} else {
taskDefinitionLog.setResourceIds("");
}
if (TASK_TYPE_SUB_PROCESS.equals(taskType)) {
JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");
if (jsonNodeDefinitionId != null) {
param.put("processDefinitionCode",
processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
param.remove("processDefinitionId");
}
}
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout =
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
String desc = task.get("description") != null ? task.get("description").asText()
: task.get("desc") != null ? task.get("desc").asText() : "";
taskDefinitionLog.setDescription(desc);
taskDefinitionLog.setFlag(
Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(taskType);
taskDefinitionLog.setFailRetryInterval(
TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 : task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(
TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 : task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils
.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog
.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText());
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
convertConditions(taskDefinitionLogList, taskNameCodeMap);
taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap);
processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
}
}
public void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList,
Map<String, Long> taskNameCodeMap) throws Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
// reset conditionResult
ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
List<String> successNode =
JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
List<Long> nodeCode = new ArrayList<>();
successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
List<String> failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
nodeCode.clear();
failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
// reset dependItemList
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
JsonNode depTasks = dependItem.get("depTasks");
dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) {
if (Strings.isNullOrEmpty(locations)) {
return locations;
}
Map<String, ObjectNode> locationsMap =
JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
});
if (locationsMap == null) {
return locations;
logger.error("Update version error, sql: {}", upgradeSQL, e);
throw new RuntimeException("Upgrade version error, sql: " + upgradeSQL, e);
}
ArrayNode jsonNodes = JSONUtils.createArrayNode();
for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
ObjectNode nodes = JSONUtils.createObjectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
ObjectNode oldNodes = entry.getValue();
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
}
return jsonNodes.toString();
}
public void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) {
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt()));
int definitionId = dependItem.get("definitionId").asInt();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
if (processCodeTaskNameCodeMap == null) {
logger.warn(
"We can't find processDefinition [{}], please check it is not exist, remove this dependence",
definitionId);
dependItemList.remove(j);
continue;
}
Optional<Map.Entry<Long, Map<String, Long>>> mapEntry =
processCodeTaskNameCodeMap.entrySet().stream().findFirst();
if (mapEntry.isPresent()) {
Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get();
dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
String depTasks = dependItem.get("depTasks").asText();
long taskCode =
"ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L
: processCodeTaskNameCodeEntry.getValue().get(depTasks);
dependItem.put("depTaskCode", taskCode);
}
dependItem.remove("projectId");
dependItem.remove("definitionId");
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
Date now = new Date();
for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) {
List<String> entryValue = entry.getValue();
if (CollectionUtils.isNotEmpty(entryValue)) {
for (String preTaskName : entryValue) {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
}
}
private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setOperator(1);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
}
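The dependence migration above is easier to follow on concrete data. A minimal before/after sketch; every code below is a made-up stand-in for values produced by the id-to-code migration (the real ones come from projectIdCodeMap and the first entry of processTaskMap):

public class DependItemMigrationExample {

    public static void main(String[] args) {
        // 1.x shape: dependItem references project/definition by numeric id and task by name
        String before = "{\"projectId\":1,\"definitionId\":10,\"depTasks\":\"taskA\",\"cycle\":\"day\"}";

        // 2.0 shape: ids replaced by generated codes; a depTasks of "ALL" becomes depTaskCode 0
        String after = "{\"projectCode\":1234501,\"definitionCode\":1234502,\"depTaskCode\":1234503,\"cycle\":\"day\"}";

        System.out.println(before + "\n-> " + after);
    }
}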

2
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerUpgrader.java

@@ -21,5 +21,5 @@ public interface DolphinSchedulerUpgrader {
 
     void doUpgrade();
 
-    String getCurrentVersion();
+    DolphinSchedulerVersion getCurrentVersion();
 }
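Returning the DolphinSchedulerVersion enum instead of a raw String lets callers key upgraders by a closed set of versions. A hypothetical sketch of such a registry (the actual DolphinSchedulerManager wiring lives elsewhere in this commit; the class and method names below are illustrative):

import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class UpgraderRegistrySketch {

    private final Map<DolphinSchedulerVersion, DolphinSchedulerUpgrader> upgraders =
            new EnumMap<>(DolphinSchedulerVersion.class);

    public UpgraderRegistrySketch(List<DolphinSchedulerUpgrader> all) {
        // Keying by the enum means a misspelled version such as "3.20" cannot
        // even be registered; with raw Strings it would fail only at runtime.
        all.forEach(upgrader -> upgraders.put(upgrader.getCurrentVersion(), upgrader));
    }

    public void runUpgrade(DolphinSchedulerVersion version) {
        DolphinSchedulerUpgrader upgrader = upgraders.get(version);
        if (upgrader != null) {
            upgrader.doUpgrade();
        }
    }
}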

47
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/DolphinSchedulerVersion.java

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader;
import java.util.Optional;
public enum DolphinSchedulerVersion {
V1_3_0("1.3.0"),
V1_3_2("1.3.2"),
V2_0_0("2.0.0"),
V3_2_0("3.2.0"),
;
private final String versionName;
DolphinSchedulerVersion(String versionName) {
this.versionName = versionName;
}
public String getVersionName() {
return versionName;
}
public static Optional<DolphinSchedulerVersion> getVersion(String versionName) {
for (DolphinSchedulerVersion version : DolphinSchedulerVersion.values()) {
if (version.getVersionName().equals(versionName)) {
return Optional.of(version);
}
}
return Optional.empty();
}
}
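A quick usage note, not part of the commit: getVersion returns an Optional, so an unknown version name is a caller decision rather than an exception.

public class VersionLookupExample {

    public static void main(String[] args) {
        // Known names resolve to the matching constant
        DolphinSchedulerVersion.getVersion("2.0.0")
                .ifPresent(v -> System.out.println(v.getVersionName())); // prints 2.0.0

        // Unknown names yield Optional.empty() instead of throwing
        System.out.println(DolphinSchedulerVersion.getVersion("9.9.9").isPresent()); // false
    }
}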

97
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v130/V130DolphinSchedulerUpgrader.java

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader.v130;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.upgrade.WorkerGroupDao;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
@Component
public class V130DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
@Autowired
private DataSource dataSource;
@Override
public void doUpgrade() {
updateProcessDefinitionJsonWorkerGroup();
}
private void updateProcessDefinitionJsonWorkerGroup() {
WorkerGroupDao workerGroupDao = new WorkerGroupDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try (Connection connection = dataSource.getConnection()) {
Map<Integer, String> oldWorkerGroupMap = workerGroupDao.queryAllOldWorkerGroup(connection);
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(connection);
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
// workerGroupId holds a numeric value, so read it as a plain JsonNode;
// casting a numeric node to ObjectNode would throw ClassCastException
JsonNode workerGroupNode = task.path("workerGroupId");
int workerGroupId = -1;
if (workerGroupNode.canConvertToInt()) {
workerGroupId = workerGroupNode.asInt(-1);
}
if (workerGroupId == -1) {
task.put("workerGroup", "default");
} else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
}
}
jsonObject.remove("task");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(connection, replaceProcessDefinitionMap);
}
} catch (Exception e) {
log.error("update process definition json workergroup error", e);
}
}
@Override
public DolphinSchedulerVersion getCurrentVersion() {
return DolphinSchedulerVersion.V1_3_0;
}
}
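To see what this upgrader does to a stored definition, here is a self-contained before/after sketch using plain Jackson rather than the project's JSONUtils; the sample JSON and the "groupA" name standing in for the t_ds_worker_group lookup are made up:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class WorkerGroupRewriteDemo {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Old 1.3.0 definition: tasks point at worker groups by numeric id
        ObjectNode definition = (ObjectNode) mapper.readTree(
                "{\"tasks\":[{\"name\":\"t1\",\"workerGroupId\":1},"
                        + "{\"name\":\"t2\",\"workerGroupId\":-1}]}");

        ArrayNode tasks = (ArrayNode) definition.get("tasks");
        for (int i = 0; i < tasks.size(); i++) {
            ObjectNode task = (ObjectNode) tasks.get(i);
            int workerGroupId = task.path("workerGroupId").asInt(-1);
            // -1 (or a missing id) falls back to "default", mirroring the upgrader
            task.put("workerGroup", workerGroupId == -1 ? "default" : "groupA");
        }
        System.out.println(definition);
        // {"tasks":[{"name":"t1","workerGroupId":1,"workerGroup":"groupA"},
        //           {"name":"t2","workerGroupId":-1,"workerGroup":"default"}]}
    }
}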

151
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v132/V132DolphinSchedulerUpgrader.java

@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader.v132;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
@Component
public class V132DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
@Autowired
private DataSource dataSource;
@Override
public void doUpgrade() {
updateProcessDefinitionJsonResourceList();
}
private void updateProcessDefinitionJsonResourceList() {
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer, String> replaceProcessDefinitionMap = new HashMap<>();
try (Connection connection = dataSource.getConnection()) {
Map<String, Integer> resourcesMap = listAllResources(connection);
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(connection);
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.get(i);
ObjectNode param = (ObjectNode) task.get("params");
if (param != null) {
List<ResourceInfo> resourceList =
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
ResourceInfo mainJar =
JSONUtils.parseObject(param.get("mainJar").toString(), ResourceInfo.class);
if (mainJar != null && mainJar.getId() == null) {
String fullName = mainJar.getRes().startsWith("/") ? mainJar.getRes()
: String.format("/%s", mainJar.getRes());
if (resourcesMap.containsKey(fullName)) {
mainJar.setId(resourcesMap.get(fullName));
param.put("mainJar", JSONUtils.parseObject(JSONUtils.toJsonString(mainJar)));
}
}
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes()
: String.format("/%s", resInfo.getRes());
if (resInfo.getId() == null && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList", JSONUtils.parseObject(JSONUtils.toJsonString(newResourceList)));
}
}
task.put("params", param);
}
jsonObject.remove("tasks");
jsonObject.put("tasks", tasks);
replaceProcessDefinitionMap.put(entry.getKey(), jsonObject.toString());
}
if (replaceProcessDefinitionMap.size() > 0) {
processDefinitionDao.updateProcessDefinitionJson(connection, replaceProcessDefinitionMap);
}
} catch (Exception e) {
log.error("update process definition json resource list error", e);
}
}
/**
* list all resources
*
* @param conn connection
* @return map whose keys are resource full_names and whose values are resource ids
*/
private Map<String, Integer> listAllResources(Connection conn) {
Map<String, Integer> resourceMap = new HashMap<>();
String sql = "SELECT id,full_name FROM t_ds_resources";
try (
PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
Integer id = rs.getInt(1);
String fullName = rs.getString(2);
resourceMap.put(fullName, id);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
}
return resourceMap;
}
@Override
public DolphinSchedulerVersion getCurrentVersion() {
return DolphinSchedulerVersion.V1_3_2;
}
}
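The heart of this upgrader is the leading-slash normalization before the lookup. A tiny sketch with made-up data standing in for the t_ds_resources rows returned by listAllResources:

import java.util.HashMap;
import java.util.Map;

public class ResourceIdBackfillDemo {

    public static void main(String[] args) {
        // Stand-in for listAllResources: full_name -> id
        Map<String, Integer> resourcesMap = new HashMap<>();
        resourcesMap.put("/jars/etl.jar", 42);

        // Old definitions may store the path without the leading slash
        String res = "jars/etl.jar";
        String fullName = res.startsWith("/") ? res : "/" + res;

        // The id found here is what gets written back onto the ResourceInfo
        System.out.println(fullName + " -> id " + resourcesMap.get(fullName)); // /jars/etl.jar -> id 42
    }
}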

400
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v200/V200DolphinSchedulerUpgrader.java

@@ -0,0 +1,400 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.tools.datasource.upgrader.v200;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.upgrade.JsonSplitDao;
import org.apache.dolphinscheduler.dao.upgrade.ProcessDefinitionDao;
import org.apache.dolphinscheduler.dao.upgrade.ProjectDao;
import org.apache.dolphinscheduler.dao.upgrade.ScheduleDao;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.tools.datasource.dao.UpgradeDao;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
import org.apache.commons.collections4.CollectionUtils;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
@Slf4j
@Component
public class V200DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
@Autowired
private DataSource dataSource;
@Lazy
@Autowired
private UpgradeDao upgradeDao;
@Override
public void doUpgrade() {
processDefinitionJsonSplit();
upgradeDao.upgradeDolphinSchedulerDDL("2.0.0_schema", "dolphinscheduler_ddl_post.sql");
}
private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try (Connection connection = dataSource.getConnection()) {
// execute project
Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(connection);
projectDao.updateProjectCode(connection, projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions =
processDefinitionDao.queryProcessDefinition(connection);
processDefinitionDao.updateProcessDefinitionCode(connection, processDefinitions,
projectIdCodeMap);
// execute schedule
Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(connection);
Map<Integer, Long> processIdCodeMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(connection, allSchedule, processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap =
processDefinitionDao.queryAllProcessDefinition(connection);
List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap = new HashMap<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs,
processTaskRelationLogs, taskDefinitionLogs, processTaskMap);
convertDependence(taskDefinitionLogs, projectIdCodeMap, processTaskMap);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(connection, processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(connection, processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(connection, taskDefinitionLogs);
} catch (Exception e) {
log.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions,
Map<Integer, String> processDefinitionJsonMap,
List<ProcessDefinitionLog> processDefinitionLogs,
List<ProcessTaskRelationLog> processTaskRelationLogs,
List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition
.setTenantId(jsonObject.get("tenantId") == null ? -1 : jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = new HashMap<>();
List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
String taskType = task.get("type").asText();
if (param != null) {
JsonNode resourceJsonNode = param.get("resourceList");
if (resourceJsonNode != null && !resourceJsonNode.isEmpty()) {
List<ResourceInfo> resourceList =
JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
List<Integer> resourceIds =
resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(Joiner.on(Constants.COMMA).join(resourceIds));
} else {
taskDefinitionLog.setResourceIds("");
}
if (TASK_TYPE_SUB_PROCESS.equals(taskType)) {
JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");
if (jsonNodeDefinitionId != null) {
param.put("processDefinitionCode",
processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());
param.remove("processDefinitionId");
}
}
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(param));
}
TaskTimeoutParameter timeout =
JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
String desc = task.get("description") != null ? task.get("description").asText()
: task.get("desc") != null ? task.get("desc").asText() : "";
taskDefinitionLog.setDescription(desc);
taskDefinitionLog.setFlag(
Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").asText()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(taskType);
taskDefinitionLog.setFailRetryInterval(
TASK_TYPE_SUB_PROCESS.equals(taskType) ? 1 : task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(
TASK_TYPE_SUB_PROCESS.equals(taskType) ? 0 : task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils
.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").asText();
taskDefinitionLog.setName(name);
taskDefinitionLog
.setWorkerGroup(task.get("workerGroup") == null ? "default" : task.get("workerGroup").asText());
long taskCode = CodeGenerateUtils.getInstance().genCode();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogList.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").asText(), taskCode);
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
convertConditions(taskDefinitionLogList, taskNameCodeMap);
taskDefinitionLogs.addAll(taskDefinitionLogList);
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
processCodeTaskNameCodeMap.put(processDefinition.getCode(), taskNameCodeMap);
processTaskMap.put(entry.getKey(), processCodeTaskNameCodeMap);
}
}
private void convertDependence(List<TaskDefinitionLog> taskDefinitionLogs,
Map<Integer, Long> projectIdCodeMap,
Map<Integer, Map<Long, Map<String, Long>>> processTaskMap) {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
if (TASK_TYPE_DEPENDENT.equals(taskDefinitionLog.getTaskType())) {
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
dependItem.put("projectCode", projectIdCodeMap.get(dependItem.get("projectId").asInt()));
int definitionId = dependItem.get("definitionId").asInt();
Map<Long, Map<String, Long>> processCodeTaskNameCodeMap = processTaskMap.get(definitionId);
if (processCodeTaskNameCodeMap == null) {
log.warn(
"Cannot find processDefinition [{}]; it may no longer exist, removing this dependence",
definitionId);
dependItemList.remove(j);
// removal shifts the remaining items left; step the index back so none are skipped
j--;
continue;
}
Optional<Map.Entry<Long, Map<String, Long>>> mapEntry =
processCodeTaskNameCodeMap.entrySet().stream().findFirst();
if (mapEntry.isPresent()) {
Map.Entry<Long, Map<String, Long>> processCodeTaskNameCodeEntry = mapEntry.get();
dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());
String depTasks = dependItem.get("depTasks").asText();
long taskCode =
"ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L
: processCodeTaskNameCodeEntry.getValue().get(depTasks);
dependItem.put("depTaskCode", taskCode);
}
dependItem.remove("projectId");
dependItem.remove("definitionId");
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private void convertConditions(List<TaskDefinitionLog> taskDefinitionLogList,
Map<String, Long> taskNameCodeMap) throws Exception {
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogList) {
if (TASK_TYPE_CONDITIONS.equals(taskDefinitionLog.getTaskType())) {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode taskParams = JSONUtils.parseObject(taskDefinitionLog.getTaskParams());
// reset conditionResult
ObjectNode conditionResult = (ObjectNode) taskParams.get("conditionResult");
List<String> successNode =
JSONUtils.toList(conditionResult.get("successNode").toString(), String.class);
List<Long> nodeCode = new ArrayList<>();
successNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("successNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
List<String> failedNode = JSONUtils.toList(conditionResult.get("failedNode").toString(), String.class);
nodeCode.clear();
failedNode.forEach(node -> nodeCode.add(taskNameCodeMap.get(node)));
conditionResult.set("failedNode", objectMapper.readTree(objectMapper.writeValueAsString(nodeCode)));
// reset dependItemList
ObjectNode dependence = (ObjectNode) taskParams.get("dependence");
ArrayNode dependTaskList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
for (int i = 0; i < dependTaskList.size(); i++) {
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
ArrayNode dependItemList =
JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
for (int j = 0; j < dependItemList.size(); j++) {
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
JsonNode depTasks = dependItem.get("depTasks");
dependItem.put("depTaskCode", taskNameCodeMap.get(depTasks.asText()));
dependItem.remove("depTasks");
dependItemList.set(j, dependItem);
}
dependTask.put("dependItemList", dependItemList);
dependTaskList.set(i, dependTask);
}
dependence.put("dependTaskList", dependTaskList);
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(taskParams));
}
}
}
private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) {
if (Strings.isNullOrEmpty(locations)) {
return locations;
}
Map<String, ObjectNode> locationsMap =
JSONUtils.parseObject(locations, new TypeReference<Map<String, ObjectNode>>() {
});
if (locationsMap == null) {
return locations;
}
ArrayNode jsonNodes = JSONUtils.createArrayNode();
for (Map.Entry<String, ObjectNode> entry : locationsMap.entrySet()) {
ObjectNode nodes = JSONUtils.createObjectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
ObjectNode oldNodes = entry.getValue();
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
}
return jsonNodes.toString();
}
private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
Date now = new Date();
for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) {
List<String> entryValue = entry.getValue();
if (CollectionUtils.isNotEmpty(entryValue)) {
for (String preTaskName : entryValue) {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
}
}
private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setOperator(1);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
@Override
public DolphinSchedulerVersion getCurrentVersion() {
return DolphinSchedulerVersion.V2_0_0;
}
}
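Of the steps above, convertLocations is the least obvious: it turns the 1.x map keyed by task id into the 2.0 array keyed by task code. A self-contained illustration with sample data (the task code is a made-up stand-in for CodeGenerateUtils output):

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ConvertLocationsDemo {

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Old 1.x layout: task id -> coordinates
        JsonNode oldLocations = mapper.readTree("{\"1\":{\"x\":150,\"y\":220}}");
        Map<String, Long> taskIdCodeMap = new HashMap<>();
        taskIdCodeMap.put("1", 123456789L);

        ArrayNode newLocations = mapper.createArrayNode();
        oldLocations.fields().forEachRemaining(entry -> {
            ObjectNode node = mapper.createObjectNode();
            node.put("taskCode", taskIdCodeMap.get(entry.getKey()));
            node.put("x", entry.getValue().get("x").asInt());
            node.put("y", entry.getValue().get("y").asInt());
            newLocations.add(node);
        });
        System.out.println(newLocations); // [{"taskCode":123456789,"x":150,"y":220}]
    }
}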

5
dolphinscheduler-tools/src/main/java/org/apache/dolphinscheduler/tools/datasource/upgrader/v320/V320DolphinSchedulerUpgrader.java

@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
 import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerUpgrader;
+import org.apache.dolphinscheduler.tools.datasource.upgrader.DolphinSchedulerVersion;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -139,7 +140,7 @@ public class V320DolphinSchedulerUpgrader implements DolphinSchedulerUpgrader {
     }
 
     @Override
-    public String getCurrentVersion() {
-        return "3.2.0";
+    public DolphinSchedulerVersion getCurrentVersion() {
+        return DolphinSchedulerVersion.V3_2_0;
     }
 }
