
[Upgrade][Install] add upgrade 2.0 code (#6672)

* Optimizing SQL scripts

* add upgrade 2.0 ddl

* add upgrade 2.0.0 code

* fix valid license header

* fix valid license header

* fix valid license header

* fix ut

* fix code style

* fix code style
Branch: 3.0.0/version-upgrade
Author: JinYong Li (committed by GitHub)
Commit: 66fbcae9dd
8 changed files:

  1. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java (2 lines changed)
  2. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (new file, 248 lines)
  3. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (79 lines changed)
  4. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java (new file, 90 lines)
  5. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (new file, 98 lines)
  6. dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java (228 lines changed)
  7. sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql (2 lines changed)
  8. sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql (1 line changed)

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java (2 lines changed)

@@ -119,7 +119,7 @@ public class DolphinSchedulerManager {
            } else if ("1.3.2".equals(schemaVersion)) {
                upgradeDao.upgradeDolphinSchedulerResourceList();
            } else if ("2.0.0".equals(schemaVersion)) {
-               upgradeDao.upgradeDolphinSchedulerJsonSplit();
+               upgradeDao.upgradeDolphinSchedulerTo200(schemaDir);
            }
            version = schemaVersion;
        }
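For orientation, the changed branch sits inside DolphinSchedulerManager's schema walk: every upgrade directory is applied in ascending version order, and version-specific Java migrations run immediately after that version's SQL scripts. A rough sketch of the enclosing loop, reconstructed rather than quoted verbatim (SchemaUtils.isAGreatVersion is the project's own version comparator):

    // Illustrative sketch of the enclosing version walk (not the verbatim source).
    for (String schemaDir : schemaList) {                        // e.g. "2.0.0_schema"
        String schemaVersion = schemaDir.split("_")[0];
        if (SchemaUtils.isAGreatVersion(schemaVersion, version)) {
            upgradeDao.upgradeDolphinScheduler(schemaDir);       // run DDL + DML scripts
            if ("1.3.2".equals(schemaVersion)) {
                upgradeDao.upgradeDolphinSchedulerResourceList();
            } else if ("2.0.0".equals(schemaVersion)) {
                upgradeDao.upgradeDolphinSchedulerTo200(schemaDir); // JSON split + post-DDL
            }
            version = schemaVersion;
        }
    }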

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java (new file, 248 lines)

@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JsonSplitDao {
public static final Logger logger = LoggerFactory.getLogger(JsonSplitDao.class);
/**
* executeJsonSplitProcessDefinition
*
* @param conn jdbc connection
* @param processDefinitionLogs processDefinitionLogs
*/
public void executeJsonSplitProcessDefinition(Connection conn, List<ProcessDefinitionLog> processDefinitionLogs) {
String updateSql = "UPDATE t_ds_process_definition SET global_params=?,timeout=?,tenant_id=?,locations=?,update_time=? where id=?";
String insertLogSql = "insert into t_ds_process_definition_log (code,name,version,description,project_code,release_state,user_id,"
+ "global_params,flag,locations,timeout,tenant_id,operator,operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
PreparedStatement processUpdate = conn.prepareStatement(updateSql);
PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
int i = 0;
for (ProcessDefinitionLog processDefinitionLog : processDefinitionLogs) {
processUpdate.setString(1, processDefinitionLog.getGlobalParams());
processUpdate.setInt(2, processDefinitionLog.getTimeout());
processUpdate.setInt(3, processDefinitionLog.getTenantId());
processUpdate.setString(4, processDefinitionLog.getLocations());
processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime());
processUpdate.setInt(6, processDefinitionLog.getId());
processUpdate.addBatch();
insertLog.setLong(1, processDefinitionLog.getCode());
insertLog.setString(2, processDefinitionLog.getName());
insertLog.setInt(3, processDefinitionLog.getVersion());
insertLog.setString(4, processDefinitionLog.getDescription());
insertLog.setLong(5, processDefinitionLog.getProjectCode());
insertLog.setInt(6, processDefinitionLog.getReleaseState().getCode());
insertLog.setInt(7, processDefinitionLog.getUserId());
insertLog.setString(8, processDefinitionLog.getGlobalParams());
insertLog.setInt(9, processDefinitionLog.getFlag().getCode());
insertLog.setString(10, processDefinitionLog.getLocations());
insertLog.setInt(11, processDefinitionLog.getTimeout());
insertLog.setInt(12, processDefinitionLog.getTenantId());
insertLog.setInt(13, processDefinitionLog.getOperator());
insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime());
insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime());
insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime());
insertLog.addBatch();
i++;
if (i % 1000 == 0) {
processUpdate.executeBatch();
processUpdate.clearBatch();
insertLog.executeBatch();
insertLog.clearBatch();
}
}
processUpdate.executeBatch();
insertLog.executeBatch();
processUpdate.close();
insertLog.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
/**
* executeJsonSplitProcessTaskRelation
*
* @param conn jdbc connection
* @param processTaskRelationLogs processTaskRelationLogs
*/
public void executeJsonSplitProcessTaskRelation(Connection conn, List<ProcessTaskRelationLog> processTaskRelationLogs) {
String insertSql = "insert into t_ds_process_task_relation (project_code,process_definition_code,process_definition_version,pre_task_code,pre_task_version,"
+ "post_task_code,post_task_version,condition_type,condition_params,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?)";
String insertLogSql = "insert into t_ds_process_task_relation_log (project_code,process_definition_code,process_definition_version,pre_task_code,"
+ "pre_task_version,post_task_code,post_task_version,condition_type,condition_params,operator,operate_time,create_time,update_time) "
+ "values (?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
PreparedStatement insert = conn.prepareStatement(insertSql);
PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
int i = 0;
for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) {
insert.setLong(1, processTaskRelationLog.getProjectCode());
insert.setLong(2, processTaskRelationLog.getProcessDefinitionCode());
insert.setInt(3, processTaskRelationLog.getProcessDefinitionVersion());
insert.setLong(4, processTaskRelationLog.getPreTaskCode());
insert.setInt(5, processTaskRelationLog.getPreTaskVersion());
insert.setLong(6, processTaskRelationLog.getPostTaskCode());
insert.setInt(7, processTaskRelationLog.getPostTaskVersion());
insert.setInt(8, processTaskRelationLog.getConditionType().getCode());
insert.setString(9, processTaskRelationLog.getConditionParams());
insert.setDate(10, (Date) processTaskRelationLog.getCreateTime());
insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime());
insert.addBatch();
insertLog.setLong(1, processTaskRelationLog.getProjectCode());
insertLog.setLong(2, processTaskRelationLog.getProcessDefinitionCode());
insertLog.setInt(3, processTaskRelationLog.getProcessDefinitionVersion());
insertLog.setLong(4, processTaskRelationLog.getPreTaskCode());
insertLog.setInt(5, processTaskRelationLog.getPreTaskVersion());
insertLog.setLong(6, processTaskRelationLog.getPostTaskCode());
insertLog.setInt(7, processTaskRelationLog.getPostTaskVersion());
insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode());
insertLog.setString(9, processTaskRelationLog.getConditionParams());
insertLog.setInt(10, processTaskRelationLog.getOperator());
insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime());
insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime());
insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime());
insertLog.addBatch();
i++;
if (i % 1000 == 0) {
insert.executeBatch();
insert.clearBatch();
insertLog.executeBatch();
insertLog.clearBatch();
}
}
insert.executeBatch();
insertLog.executeBatch();
insert.close();
insertLog.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
/**
* executeJsonSplitTaskDefinition
*
* @param conn jdbc connection
* @param taskDefinitionLogs taskDefinitionLogs
*/
public void executeJsonSplitTaskDefinition(Connection conn, List<TaskDefinitionLog> taskDefinitionLogs) {
String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,"
+ "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority,"
+ "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator,"
+ "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
PreparedStatement insert = conn.prepareStatement(insertSql);
PreparedStatement insertLog = conn.prepareStatement(insertLogSql);
int i = 0;
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
insert.setLong(1, taskDefinitionLog.getCode());
insert.setString(2, taskDefinitionLog.getName());
insert.setInt(3, taskDefinitionLog.getVersion());
insert.setString(4, taskDefinitionLog.getDescription());
insert.setLong(5, taskDefinitionLog.getProjectCode());
insert.setInt(6, taskDefinitionLog.getUserId());
insert.setString(7, taskDefinitionLog.getTaskType());
insert.setString(8, taskDefinitionLog.getTaskParams());
insert.setInt(9, taskDefinitionLog.getFlag().getCode());
insert.setInt(10, taskDefinitionLog.getTaskPriority().getCode());
insert.setString(11, taskDefinitionLog.getWorkerGroup());
insert.setLong(12, taskDefinitionLog.getEnvironmentCode());
insert.setInt(13, taskDefinitionLog.getFailRetryTimes());
insert.setInt(14, taskDefinitionLog.getFailRetryInterval());
insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
insert.setInt(17, taskDefinitionLog.getTimeout());
insert.setInt(18, taskDefinitionLog.getDelayTime());
insert.setString(19, taskDefinitionLog.getResourceIds());
insert.setDate(20, (Date) taskDefinitionLog.getCreateTime());
insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime());
insert.addBatch();
insertLog.setLong(1, taskDefinitionLog.getCode());
insertLog.setString(2, taskDefinitionLog.getName());
insertLog.setInt(3, taskDefinitionLog.getVersion());
insertLog.setString(4, taskDefinitionLog.getDescription());
insertLog.setLong(5, taskDefinitionLog.getProjectCode());
insertLog.setInt(6, taskDefinitionLog.getUserId());
insertLog.setString(7, taskDefinitionLog.getTaskType());
insertLog.setString(8, taskDefinitionLog.getTaskParams());
insertLog.setInt(9, taskDefinitionLog.getFlag().getCode());
insertLog.setInt(10, taskDefinitionLog.getTaskPriority().getCode());
insertLog.setString(11, taskDefinitionLog.getWorkerGroup());
insertLog.setLong(12, taskDefinitionLog.getEnvironmentCode());
insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes());
insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval());
insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode());
insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode());
insertLog.setInt(17, taskDefinitionLog.getTimeout());
insertLog.setInt(18, taskDefinitionLog.getDelayTime());
insertLog.setString(19, taskDefinitionLog.getResourceIds());
insertLog.setInt(20, taskDefinitionLog.getOperator());
insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime());
insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime());
insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime());
insertLog.addBatch();
i++;
if (i % 1000 == 0) {
insert.executeBatch();
insert.clearBatch();
insertLog.executeBatch();
insertLog.clearBatch();
}
}
insert.executeBatch();
insertLog.executeBatch();
insert.close();
insertLog.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
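All three methods above share one JDBC idiom: accumulate addBatch() calls on a PreparedStatement and flush every 1,000 rows, so neither the driver's batch buffer nor the number of round trips grows unbounded. A minimal, self-contained sketch of that pattern, assuming a hypothetical t_example table:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    public class BatchSketch {
        // Flush-every-1000 batching, as used by the JsonSplitDao methods above.
        static void batchInsert(Connection conn, List<long[]> rows) throws SQLException {
            try (PreparedStatement ps = conn.prepareStatement(
                    "insert into t_example (code, version) values (?, ?)")) {
                int i = 0;
                for (long[] row : rows) {
                    ps.setLong(1, row[0]);
                    ps.setLong(2, row[1]);
                    ps.addBatch();
                    if (++i % 1000 == 0) { // flush a full batch, then reuse the statement
                        ps.executeBatch();
                        ps.clearBatch();
                    }
                }
                ps.executeBatch();         // flush the final partial batch
            }
        }
    }

Note that try-with-resources also closes the statement on the exception path, which the hand-rolled close() calls in the class above do not.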

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java (79 lines changed)

@@ -17,12 +17,19 @@
 package org.apache.dolphinscheduler.dao.upgrade;

+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.ReleaseState;
 import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;

 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 import org.slf4j.Logger;
@@ -43,7 +50,7 @@ public class ProcessDefinitionDao {
        Map<Integer, String> processDefinitionJsonMap = new HashMap<>();
-       String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition");
+       String sql = "SELECT id,process_definition_json FROM t_ds_process_definition";
        ResultSet rs = null;
        PreparedStatement pstmt = null;
        try {
@@ -66,7 +73,6 @@ public class ProcessDefinitionDao {
        return processDefinitionJsonMap;
    }

-
    /**
     * updateProcessDefinitionJson
     *
@@ -82,9 +88,78 @@ public class ProcessDefinitionDao {
                    pstmt.setInt(2, entry.getKey());
                    pstmt.executeUpdate();
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException("sql: " + sql, e);
        } finally {
            ConnectionUtils.releaseResource(conn);
        }
    }

+   public List<ProcessDefinition> queryProcessDefinition(Connection conn) {
+       List<ProcessDefinition> processDefinitions = new ArrayList<>();
+       String sql = "SELECT id,code,project_code,user_id,locations,name,description,release_state,flag,create_time FROM t_ds_process_definition";
+       ResultSet rs = null;
+       PreparedStatement pstmt = null;
+       try {
+           pstmt = conn.prepareStatement(sql);
+           rs = pstmt.executeQuery();
+           while (rs.next()) {
+               ProcessDefinition processDefinition = new ProcessDefinition();
+               processDefinition.setId(rs.getInt(1));
+               long code = rs.getLong(2);
+               if (code == 0L) {
+                   code = SnowFlakeUtils.getInstance().nextId();
+               }
+               processDefinition.setCode(code);
+               processDefinition.setVersion(Constants.VERSION_FIRST);
+               processDefinition.setProjectCode(rs.getLong(3));
+               processDefinition.setUserId(rs.getInt(4));
+               processDefinition.setLocations(rs.getString(5));
+               processDefinition.setName(rs.getString(6));
+               processDefinition.setDescription(rs.getString(7));
+               processDefinition.setReleaseState(ReleaseState.getEnum(rs.getInt(8)));
+               processDefinition.setFlag(rs.getInt(9) == 1 ? Flag.YES : Flag.NO);
+               processDefinition.setCreateTime(rs.getDate(10));
+               processDefinitions.add(processDefinition);
+           }
+       } catch (Exception e) {
+           logger.error(e.getMessage(), e);
+           throw new RuntimeException("sql: " + sql, e);
+       } finally {
+           ConnectionUtils.releaseResource(rs, pstmt, conn);
+       }
+       return processDefinitions;
+   }

+   /**
+    * updateProcessDefinitionCode
+    *
+    * @param conn jdbc connection
+    * @param processDefinitions processDefinitions
+    * @param projectIdCodeMap projectIdCodeMap
+    */
+   public void updateProcessDefinitionCode(Connection conn, List<ProcessDefinition> processDefinitions, Map<Integer, Long> projectIdCodeMap) {
+       String sql = "UPDATE t_ds_process_definition SET code=?, project_code=?, version=? where id=?";
+       try {
+           for (ProcessDefinition processDefinition : processDefinitions) {
+               try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
+                   pstmt.setLong(1, processDefinition.getCode());
+                   long projectCode = processDefinition.getProjectCode();
+                   if (String.valueOf(projectCode).length() <= 10) {
+                       Integer projectId = Integer.valueOf(String.valueOf(projectCode));
+                       if (projectIdCodeMap.containsKey(projectId)) {
+                           projectCode = projectIdCodeMap.get(projectId);
+                           processDefinition.setProjectCode(projectCode);
+                       }
+                   }
+                   pstmt.setLong(2, projectCode);
+                   pstmt.setInt(3, processDefinition.getVersion());
+                   pstmt.setInt(4, processDefinition.getId());
+                   pstmt.executeUpdate();
+               }
+           }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException("sql: " + sql, e);
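The length() <= 10 test in updateProcessDefinitionCode is how the migration distinguishes a legacy auto-increment id (an int, at most 10 decimal digits) from an already-assigned snowflake code (typically 18-19 digits). Isolated as a helper, the remap logic reads roughly like this (the method is hypothetical, not part of the commit):

    import java.util.Map;

    public class RemapSketch {
        // Swap a stored legacy project id for its generated snowflake code;
        // values too large to be an int id are assumed to be codes already.
        static long remapProjectCode(long stored, Map<Integer, Long> projectIdCodeMap) {
            if (stored <= Integer.MAX_VALUE) {          // small enough to be a legacy id
                Long code = projectIdCodeMap.get((int) stored);
                if (code != null) {
                    return code;
                }
            }
            return stored;                              // already a snowflake code
        }
    }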

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java (new file, 90 lines)

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProjectDao {
public static final Logger logger = LoggerFactory.getLogger(ProjectDao.class);
/**
* queryAllProject
*
* @param conn jdbc connection
* @return map of project id to project code
*/
public Map<Integer, Long> queryAllProject(Connection conn) {
Map<Integer, Long> projectMap = new HashMap<>();
String sql = "SELECT id,code FROM t_ds_project";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
Integer id = rs.getInt(1);
long code = rs.getLong(2);
if (code == 0L) {
code = SnowFlakeUtils.getInstance().nextId();
}
projectMap.put(id, code);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return projectMap;
}
/**
* updateProjectCode
*
* @param conn jdbc connection
* @param projectMap projectMap
*/
public void updateProjectCode(Connection conn, Map<Integer, Long> projectMap) {
String sql = "UPDATE t_ds_project SET code=? where id=?";
try {
for (Map.Entry<Integer, Long> entry : projectMap.entrySet()) {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setLong(1, entry.getValue());
pstmt.setInt(2, entry.getKey());
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
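Taken together, the two methods give the project backfill its shape: read every project, mint a snowflake code wherever the code column is still 0, then persist the whole map. The caller, shown later in this diff in UpgradeDao.processDefinitionJsonSplit, boils down to:

    ProjectDao projectDao = new ProjectDao();
    Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
    projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);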

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java (new file, 98 lines)

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ScheduleDao {
public static final Logger logger = LoggerFactory.getLogger(ScheduleDao.class);
/**
* queryAllSchedule
*
* @param conn jdbc connection
* @return map of schedule id to process definition code
*/
public Map<Integer, Long> queryAllSchedule(Connection conn) {
Map<Integer, Long> scheduleMap = new HashMap<>();
String sql = "SELECT id,process_definition_code FROM t_ds_schedules";
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
Integer id = rs.getInt(1);
long processDefinitionCode = rs.getLong(2);
scheduleMap.put(id, processDefinitionCode);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return scheduleMap;
}
/**
* update schedule
*
* @param conn jdbc connection
* @param scheduleMap scheduleMap
* @param processIdCodeMap processIdCodeMap
*/
public void updateScheduleCode(Connection conn, Map<Integer, Long> scheduleMap, Map<Integer, Long> processIdCodeMap) {
String sql = "UPDATE t_ds_schedules SET process_definition_code=?,timezone_id=?,environment_code=-1 where id=?";
try {
Clock clock = Clock.systemDefaultZone();
String timezoneId = clock.getZone().getId();
for (Map.Entry<Integer, Long> entry : scheduleMap.entrySet()) {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
long processDefinitionCode = entry.getValue();
if (String.valueOf(processDefinitionCode).length() <= 10) {
Integer processDefinitionId = Integer.valueOf(String.valueOf(processDefinitionCode));
if (processIdCodeMap.containsKey(processDefinitionId)) {
processDefinitionCode = processIdCodeMap.get(processDefinitionId);
}
}
pstmt.setLong(1, processDefinitionCode);
pstmt.setString(2, timezoneId);
pstmt.setInt(3, entry.getKey());
pstmt.executeUpdate();
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(conn);
}
}
}
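One detail worth noting in updateScheduleCode: the timezone_id written for every schedule is simply the JVM's default zone, obtained through java.time.Clock. The Clock indirection is a stylistic choice; both expressions below yield the same id on a given host:

    import java.time.Clock;
    import java.time.ZoneId;

    public class TimezoneSketch {
        public static void main(String[] args) {
            String viaClock = Clock.systemDefaultZone().getZone().getId(); // as in ScheduleDao
            String viaZone = ZoneId.systemDefault().getId();               // equivalent shorthand
            System.out.println(viaClock + " == " + viaZone);
        }
    }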

dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java (228 lines changed)

@@ -16,31 +16,55 @@
  */
 package org.apache.dolphinscheduler.dao.upgrade;

-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ConditionType;
 import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.SchemaUtils;
+import org.apache.dolphinscheduler.common.utils.ScriptRunner;
+import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
 import org.apache.dolphinscheduler.dao.AbstractBaseDao;
 import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;

 import org.apache.commons.lang.StringUtils;

-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.sql.DataSource;
-import java.io.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+
+import javax.sql.DataSource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;

 public abstract class UpgradeDao extends AbstractBaseDao {

     public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class);
@@ -266,9 +290,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
     * @param schemaDir schema dir
     */
    public void upgradeDolphinScheduler(String schemaDir) {
-       upgradeDolphinSchedulerDDL(schemaDir);
+       upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql");
        upgradeDolphinSchedulerDML(schemaDir);
    }
@@ -292,8 +314,9 @@ public abstract class UpgradeDao extends AbstractBaseDao {
    /**
     * upgrade DolphinScheduler to 2.0.0
     */
-   public void upgradeDolphinSchedulerJsonSplit() {
+   public void upgradeDolphinSchedulerTo200(String schemaDir) {
        processDefinitionJsonSplit();
+       upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql");
    }

    /**
@@ -481,11 +504,11 @@ public abstract class UpgradeDao extends AbstractBaseDao {
     *
     * @param schemaDir schemaDir
     */
-   private void upgradeDolphinSchedulerDDL(String schemaDir) {
+   private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) {
        if (StringUtils.isEmpty(rootDir)) {
            throw new RuntimeException("Environment variable user.dir not found");
        }
-       String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql", rootDir, schemaDir, getDbType().name().toLowerCase());
+       String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/{3}", rootDir, schemaDir, getDbType().name().toLowerCase(), scriptFile);
        Connection conn = null;
        PreparedStatement pstmt = null;
        try {
@@ -517,7 +540,6 @@ public abstract class UpgradeDao extends AbstractBaseDao {
        } finally {
            ConnectionUtils.releaseResource(pstmt, conn);
        }
-
    }
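With the scriptFile parameter in place, the same runner serves both the pre-split DDL and the new post-split script. For illustration, the path MessageFormat assembles for MySQL and the 2.0.0 schema (the rootDir value here is hypothetical; at runtime it comes from user.dir):

    import java.text.MessageFormat;

    public class DdlPathSketch {
        public static void main(String[] args) {
            String path = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/{3}",
                    "/opt/dolphinscheduler", "2.0.0_schema", "mysql", "dolphinscheduler_ddl_post.sql");
            // prints /opt/dolphinscheduler/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql
            System.out.println(path);
        }
    }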
@@ -550,7 +572,181 @@ public abstract class UpgradeDao extends AbstractBaseDao {
    }

-   public void processDefinitionJsonSplit() {
+   /**
+    * upgrade DolphinScheduler to 2.0.0, json split
+    */
+   private void processDefinitionJsonSplit() {
ProjectDao projectDao = new ProjectDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
ScheduleDao scheduleDao = new ScheduleDao();
JsonSplitDao jsonSplitDao = new JsonSplitDao();
try {
// execute project
Map<Integer, Long> projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection());
projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap);
// execute process definition code
List<ProcessDefinition> processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection());
processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap);
// execute schedule
Map<Integer, Long> allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection());
Map<Integer, Long> processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode));
scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap);
// json split
Map<Integer, String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
List<ProcessDefinitionLog> processDefinitionLogs = new ArrayList<>();
List<ProcessTaskRelationLog> processTaskRelationLogs = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogs = new ArrayList<>();
splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs);
// execute json split
jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs);
jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs);
jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs);
} catch (Exception e) {
logger.error("json split error", e);
}
}
private void splitProcessDefinitionJson(List<ProcessDefinition> processDefinitions,
Map<Integer, String> processDefinitionJsonMap,
List<ProcessDefinitionLog> processDefinitionLogs,
List<ProcessTaskRelationLog> processTaskRelationLogs,
List<TaskDefinitionLog> taskDefinitionLogs) throws Exception {
Map<Integer, ProcessDefinition> processDefinitionMap = processDefinitions.stream()
.collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition));
Date now = new Date();
for (Map.Entry<Integer, String> entry : processDefinitionJsonMap.entrySet()) {
if (entry.getValue() == null) {
throw new Exception("processDefinitionJson is null");
}
ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue());
ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey());
if (processDefinition != null) {
processDefinition.setTenantId(jsonObject.get("tenantId").asInt());
processDefinition.setTimeout(jsonObject.get("timeout").asInt());
processDefinition.setGlobalParams(jsonObject.get("globalParams").toString());
} else {
throw new Exception("It can't find processDefinition, please check !");
}
Map<String, Long> taskIdCodeMap = new HashMap<>();
Map<String, List<String>> taskNamePreMap = new HashMap<>();
Map<String, Long> taskNameCodeMap = new HashMap<>();
ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString());
for (int i = 0; i < tasks.size(); i++) {
ObjectNode task = (ObjectNode) tasks.path(i);
ObjectNode param = (ObjectNode) task.get("params");
TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog();
if (param != null) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class);
if (!resourceList.isEmpty()) {
List<Integer> resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList());
taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ","));
}
param.put("conditionResult", task.get("conditionResult"));
param.put("dependence", task.get("dependence"));
taskDefinitionLog.setTaskParams(param.toString());
}
TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class);
if (timeout != null) {
taskDefinitionLog.setTimeout(timeout.getInterval());
taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE);
taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy());
}
taskDefinitionLog.setDescription(task.get("description").toString());
taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString()) ? Flag.YES : Flag.NO);
taskDefinitionLog.setTaskType(task.get("type").toString());
taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt());
taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt());
taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class));
String name = task.get("name").toString();
taskDefinitionLog.setName(name);
taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString());
long taskCode = SnowFlakeUtils.getInstance().nextId();
taskDefinitionLog.setCode(taskCode);
taskDefinitionLog.setVersion(Constants.VERSION_FIRST);
taskDefinitionLog.setProjectCode(processDefinition.getProjectCode());
taskDefinitionLog.setUserId(processDefinition.getUserId());
taskDefinitionLog.setEnvironmentCode(-1);
taskDefinitionLog.setDelayTime(0);
taskDefinitionLog.setOperator(1);
taskDefinitionLog.setOperateTime(now);
taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now);
taskDefinitionLogs.add(taskDefinitionLog);
taskIdCodeMap.put(task.get("id").toString(), taskCode);
List<String> preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class);
taskNamePreMap.put(name, preTasks);
taskNameCodeMap.put(name, taskCode);
}
processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap));
ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition);
processDefinitionLog.setOperator(1);
processDefinitionLog.setOperateTime(now);
processDefinitionLog.setUpdateTime(now);
processDefinitionLogs.add(processDefinitionLog);
handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs);
}
}
private String convertLocations(String locations, Map<String, Long> taskIdCodeMap) {
if (StringUtils.isBlank(locations)) {
return locations;
}
Map<String, String> locationsMap = JSONUtils.toMap(locations);
JsonNodeFactory factory = new JsonNodeFactory(false);
ArrayNode jsonNodes = factory.arrayNode();
for (Map.Entry<String, String> entry : locationsMap.entrySet()) {
ObjectNode nodes = factory.objectNode();
nodes.put("taskCode", taskIdCodeMap.get(entry.getKey()));
ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue());
nodes.put("x", oldNodes.get("x").asInt());
nodes.put("y", oldNodes.get("y").asInt());
jsonNodes.add(nodes);
}
return jsonNodes.toString();
}
private void handleProcessTaskRelation(Map<String, List<String>> taskNamePreMap,
Map<String, Long> taskNameCodeMap,
ProcessDefinition processDefinition,
List<ProcessTaskRelationLog> processTaskRelationLogs) {
Date now = new Date();
for (Map.Entry<String, List<String>> entry : taskNamePreMap.entrySet()) {
List<String> entryValue = entry.getValue();
if (CollectionUtils.isNotEmpty(entryValue)) {
for (String preTaskName : entryValue) {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName));
processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
} else {
ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now);
processTaskRelationLog.setPreTaskCode(0);
processTaskRelationLog.setPreTaskVersion(0);
processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey()));
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLogs.add(processTaskRelationLog);
}
}
}
private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog();
processTaskRelationLog.setProjectCode(processDefinition.getProjectCode());
processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode());
processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion());
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLog.setOperator(1);
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setCreateTime(now);
processTaskRelationLog.setUpdateTime(now);
return processTaskRelationLog;
}
}
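The core of splitProcessDefinitionJson is a two-pass transformation: first every task node in the legacy JSON is assigned a freshly minted code keyed by its name, then each preTasks name list is rewritten as code-to-code relation rows, with a pre-task code of 0 marking an entry task. A self-contained toy version of those two passes, with a plain counter standing in for SnowFlakeUtils and printing standing in for the log entities:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    public class JsonSplitSketch {
        public static void main(String[] args) throws Exception {
            String legacyJson = "{\"tasks\":["
                    + "{\"id\":\"tasks-1\",\"name\":\"A\",\"preTasks\":[]},"
                    + "{\"id\":\"tasks-2\",\"name\":\"B\",\"preTasks\":[\"A\"]}]}";
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode root = (ObjectNode) mapper.readTree(legacyJson);
            ArrayNode tasks = (ArrayNode) root.get("tasks");

            // Pass 1: mint a code per task name (a counter stands in for snowflake ids).
            Map<String, Long> taskNameCodeMap = new HashMap<>();
            long nextCode = 1000L;
            for (int i = 0; i < tasks.size(); i++) {
                ObjectNode task = (ObjectNode) tasks.get(i);
                taskNameCodeMap.put(task.get("name").asText(), nextCode++);
            }

            // Pass 2: one relation row per dependency edge; no predecessors => pre-code 0.
            for (int i = 0; i < tasks.size(); i++) {
                ObjectNode task = (ObjectNode) tasks.get(i);
                long postCode = taskNameCodeMap.get(task.get("name").asText());
                List<String> preTasks = mapper.convertValue(task.get("preTasks"),
                        new TypeReference<List<String>>() { });
                if (preTasks.isEmpty()) {
                    System.out.printf("relation: pre=0 post=%d%n", postCode);
                } else {
                    for (String pre : preTasks) {
                        System.out.printf("relation: pre=%d post=%d%n",
                                taskNameCodeMap.get(pre), postCode);
                    }
                }
            }
        }
    }

Run as-is, this prints one row with pre=0 for task A and one row linking A's code to B's, mirroring the rows executeJsonSplitProcessTaskRelation persists.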

sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql (2 lines changed)

@@ -400,7 +400,7 @@ alter table t_ds_schedules add environment_code bigint(20) DEFAULT '-1' COMMENT
 -- t_ds_process_definition
 alter table t_ds_process_definition add `code` bigint(20) NOT NULL COMMENT 'encoding' AFTER `id`;
-alter table t_ds_process_definition add `project_code` bigint(20) NOT NULL COMMENT 'encoding' AFTER `project_id`;
+alter table t_ds_process_definition change project_id project_code bigint(20) NOT NULL COMMENT 'project code' AFTER `description`;
 alter table t_ds_process_definition add `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id' AFTER `locations`;
 alter table t_ds_process_definition add UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE;
 alter table t_ds_process_definition modify `description` text COMMENT 'description' after `version`;

sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql (1 line changed)

@@ -19,7 +19,6 @@ alter table t_ds_process_definition drop primary key;
 ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`);
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`;
 ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`;
-alter table t_ds_process_definition drop project_id;
 alter table t_ds_process_definition drop process_definition_json;
 alter table t_ds_process_definition drop connects;
 alter table t_ds_process_definition drop receivers;
