diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java index 7b8c4b391b..a6ad6ca8c3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java @@ -119,7 +119,7 @@ public class DolphinSchedulerManager { } else if ("1.3.2".equals(schemaVersion)) { upgradeDao.upgradeDolphinSchedulerResourceList(); } else if ("2.0.0".equals(schemaVersion)) { - upgradeDao.upgradeDolphinSchedulerJsonSplit(); + upgradeDao.upgradeDolphinSchedulerTo200(schemaDir); } version = schemaVersion; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java new file mode 100644 index 0000000000..790d33a8f2 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/JsonSplitDao.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsonSplitDao { + + public static final Logger logger = LoggerFactory.getLogger(JsonSplitDao.class); + + /** + * executeJsonSplitProcessDefinition + * + * @param conn jdbc connection + * @param processDefinitionLogs processDefinitionLogs + */ + public void executeJsonSplitProcessDefinition(Connection conn, List processDefinitionLogs) { + String updateSql = "UPDATE t_ds_process_definition SET global_params=?,timeout=?,tenant_id=?,locations=?,update_time=? where id=?"; + String insertLogSql = "insert into t_ds_process_definition_log (code,name,version,description,project_code,release_state,user_id," + + "global_params,flag,locations,timeout,tenant_id,operator,operate_time,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + try { + PreparedStatement processUpdate = conn.prepareStatement(updateSql); + PreparedStatement insertLog = conn.prepareStatement(insertLogSql); + int i = 0; + for (ProcessDefinitionLog processDefinitionLog : processDefinitionLogs) { + processUpdate.setString(1, processDefinitionLog.getGlobalParams()); + processUpdate.setInt(2, processDefinitionLog.getTimeout()); + processUpdate.setInt(3, processDefinitionLog.getTenantId()); + processUpdate.setString(4, processDefinitionLog.getLocations()); + processUpdate.setDate(5, (Date) processDefinitionLog.getUpdateTime()); + processUpdate.setInt(6, processDefinitionLog.getId()); + processUpdate.addBatch(); + + insertLog.setLong(1, processDefinitionLog.getCode()); + insertLog.setString(2, processDefinitionLog.getName()); + insertLog.setInt(3, processDefinitionLog.getVersion()); + insertLog.setString(4, processDefinitionLog.getDescription()); + insertLog.setLong(5, processDefinitionLog.getProjectCode()); + insertLog.setInt(6, processDefinitionLog.getReleaseState().getCode()); + insertLog.setInt(7, processDefinitionLog.getUserId()); + insertLog.setString(8, processDefinitionLog.getGlobalParams()); + insertLog.setInt(9, processDefinitionLog.getFlag().getCode()); + insertLog.setString(10, processDefinitionLog.getLocations()); + insertLog.setInt(11, processDefinitionLog.getTimeout()); + insertLog.setInt(12, processDefinitionLog.getTenantId()); + insertLog.setInt(13, processDefinitionLog.getOperator()); + insertLog.setDate(14, (Date) processDefinitionLog.getOperateTime()); + insertLog.setDate(15, (Date) processDefinitionLog.getCreateTime()); + insertLog.setDate(16, (Date) processDefinitionLog.getUpdateTime()); + insertLog.addBatch(); + + i++; + if (i % 1000 == 0) { + processUpdate.executeBatch(); + processUpdate.clearBatch(); + insertLog.executeBatch(); + insertLog.clearBatch(); + } + } + processUpdate.executeBatch(); + insertLog.executeBatch(); + processUpdate.close(); + insertLog.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } + + /** + * executeJsonSplitProcessDefinition + * + * @param conn jdbc connection + * @param processTaskRelationLogs processTaskRelationLogs + */ + public void executeJsonSplitProcessTaskRelation(Connection conn, List processTaskRelationLogs) { + String insertSql = "insert into t_ds_process_task_relation (project_code,process_definition_code,process_definition_version,pre_task_code,pre_task_version," + + "post_task_code,post_task_version,condition_type,condition_params,create_time,update_time) values (?,?,?,?,?,?,?,?,?,?,?)"; + String insertLogSql = "insert into t_ds_process_task_relation_log (project_code,process_definition_code,process_definition_version,pre_task_code," + + "pre_task_version,post_task_code,post_task_version,condition_type,condition_params,operator,operate_time,create_time,update_time) " + + "values (?,?,?,?,?,?,?,?,?,?,?,?,?)"; + try { + PreparedStatement insert = conn.prepareStatement(insertSql); + PreparedStatement insertLog = conn.prepareStatement(insertLogSql); + int i = 0; + for (ProcessTaskRelationLog processTaskRelationLog : processTaskRelationLogs) { + insert.setLong(1, processTaskRelationLog.getProjectCode()); + insert.setLong(2, processTaskRelationLog.getProcessDefinitionCode()); + insert.setInt(3, processTaskRelationLog.getProcessDefinitionVersion()); + insert.setLong(4, processTaskRelationLog.getPreTaskCode()); + insert.setInt(5, processTaskRelationLog.getPreTaskVersion()); + insert.setLong(6, processTaskRelationLog.getPostTaskCode()); + insert.setInt(7, processTaskRelationLog.getPostTaskVersion()); + insert.setInt(8, processTaskRelationLog.getConditionType().getCode()); + insert.setString(9, processTaskRelationLog.getConditionParams()); + insert.setDate(10, (Date) processTaskRelationLog.getCreateTime()); + insert.setDate(11, (Date) processTaskRelationLog.getUpdateTime()); + insert.addBatch(); + + insertLog.setLong(1, processTaskRelationLog.getProjectCode()); + insertLog.setLong(2, processTaskRelationLog.getProcessDefinitionCode()); + insertLog.setInt(3, processTaskRelationLog.getProcessDefinitionVersion()); + insertLog.setLong(4, processTaskRelationLog.getPreTaskCode()); + insertLog.setInt(5, processTaskRelationLog.getPreTaskVersion()); + insertLog.setLong(6, processTaskRelationLog.getPostTaskCode()); + insertLog.setInt(7, processTaskRelationLog.getPostTaskVersion()); + insertLog.setInt(8, processTaskRelationLog.getConditionType().getCode()); + insertLog.setString(9, processTaskRelationLog.getConditionParams()); + insertLog.setInt(10, processTaskRelationLog.getOperator()); + insertLog.setDate(11, (Date) processTaskRelationLog.getOperateTime()); + insertLog.setDate(12, (Date) processTaskRelationLog.getCreateTime()); + insertLog.setDate(13, (Date) processTaskRelationLog.getUpdateTime()); + insertLog.addBatch(); + + i++; + if (i % 1000 == 0) { + insert.executeBatch(); + insert.clearBatch(); + insertLog.executeBatch(); + insertLog.clearBatch(); + } + } + insert.executeBatch(); + insertLog.executeBatch(); + insert.close(); + insertLog.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } + + /** + * executeJsonSplitTaskDefinition + * + * @param conn jdbc connection + * @param taskDefinitionLogs taskDefinitionLogs + */ + public void executeJsonSplitTaskDefinition(Connection conn, List taskDefinitionLogs) { + String insertSql = "insert into t_ds_task_definition (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority," + + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids," + + "create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + String insertLogSql = "insert into t_ds_task_definition_log (code,name,version,description,project_code,user_id,task_type,task_params,flag,task_priority," + + "worker_group,environment_code,fail_retry_times,fail_retry_interval,timeout_flag,timeout_notify_strategy,timeout,delay_time,resource_ids,operator," + + "operate_time,create_time,update_time) values values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + try { + PreparedStatement insert = conn.prepareStatement(insertSql); + PreparedStatement insertLog = conn.prepareStatement(insertLogSql); + int i = 0; + for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { + insert.setLong(1, taskDefinitionLog.getCode()); + insert.setString(2, taskDefinitionLog.getName()); + insert.setInt(3, taskDefinitionLog.getVersion()); + insert.setString(4, taskDefinitionLog.getDescription()); + insert.setLong(5, taskDefinitionLog.getProjectCode()); + insert.setInt(6, taskDefinitionLog.getUserId()); + insert.setString(7, taskDefinitionLog.getTaskType()); + insert.setString(8, taskDefinitionLog.getTaskParams()); + insert.setInt(9, taskDefinitionLog.getFlag().getCode()); + insert.setInt(10, taskDefinitionLog.getTaskPriority().getCode()); + insert.setString(11, taskDefinitionLog.getWorkerGroup()); + insert.setLong(12, taskDefinitionLog.getEnvironmentCode()); + insert.setInt(13, taskDefinitionLog.getFailRetryTimes()); + insert.setInt(14, taskDefinitionLog.getFailRetryInterval()); + insert.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode()); + insert.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); + insert.setInt(17, taskDefinitionLog.getTimeout()); + insert.setInt(18, taskDefinitionLog.getDelayTime()); + insert.setString(19, taskDefinitionLog.getResourceIds()); + insert.setDate(20, (Date) taskDefinitionLog.getCreateTime()); + insert.setDate(21, (Date) taskDefinitionLog.getUpdateTime()); + insert.addBatch(); + + insertLog.setLong(1, taskDefinitionLog.getCode()); + insertLog.setString(2, taskDefinitionLog.getName()); + insertLog.setInt(3, taskDefinitionLog.getVersion()); + insertLog.setString(4, taskDefinitionLog.getDescription()); + insertLog.setLong(5, taskDefinitionLog.getProjectCode()); + insertLog.setInt(6, taskDefinitionLog.getUserId()); + insertLog.setString(7, taskDefinitionLog.getTaskType()); + insertLog.setString(8, taskDefinitionLog.getTaskParams()); + insertLog.setInt(9, taskDefinitionLog.getFlag().getCode()); + insertLog.setInt(10, taskDefinitionLog.getTaskPriority().getCode()); + insertLog.setString(11, taskDefinitionLog.getWorkerGroup()); + insertLog.setLong(12, taskDefinitionLog.getEnvironmentCode()); + insertLog.setInt(13, taskDefinitionLog.getFailRetryTimes()); + insertLog.setInt(14, taskDefinitionLog.getFailRetryInterval()); + insertLog.setInt(15, taskDefinitionLog.getTimeoutFlag().getCode()); + insertLog.setInt(16, taskDefinitionLog.getTimeoutNotifyStrategy().getCode()); + insertLog.setInt(17, taskDefinitionLog.getTimeout()); + insertLog.setInt(18, taskDefinitionLog.getDelayTime()); + insertLog.setString(19, taskDefinitionLog.getResourceIds()); + insertLog.setInt(20, taskDefinitionLog.getOperator()); + insertLog.setDate(21, (Date) taskDefinitionLog.getOperateTime()); + insertLog.setDate(22, (Date) taskDefinitionLog.getCreateTime()); + insertLog.setDate(23, (Date) taskDefinitionLog.getUpdateTime()); + insertLog.addBatch(); + + i++; + if (i % 1000 == 0) { + insert.executeBatch(); + insert.clearBatch(); + insertLog.executeBatch(); + insertLog.clearBatch(); + } + } + insert.executeBatch(); + insertLog.executeBatch(); + insert.close(); + insertLog.close(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java index 8b6d762413..9cb03b8917 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProcessDefinitionDao.java @@ -17,12 +17,19 @@ package org.apache.dolphinscheduler.dao.upgrade; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -43,7 +50,7 @@ public class ProcessDefinitionDao { Map processDefinitionJsonMap = new HashMap<>(); - String sql = String.format("SELECT id,process_definition_json FROM t_ds_process_definition"); + String sql = "SELECT id,process_definition_json FROM t_ds_process_definition"; ResultSet rs = null; PreparedStatement pstmt = null; try { @@ -66,7 +73,6 @@ public class ProcessDefinitionDao { return processDefinitionJsonMap; } - /** * updateProcessDefinitionJson * @@ -82,9 +88,78 @@ public class ProcessDefinitionDao { pstmt.setInt(2, entry.getKey()); pstmt.executeUpdate(); } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } + public List queryProcessDefinition(Connection conn) { + List processDefinitions = new ArrayList<>(); + String sql = "SELECT id,code,project_code,user_id,locations,name,description,release_state,flag,create_time FROM t_ds_process_definition"; + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + while (rs.next()) { + ProcessDefinition processDefinition = new ProcessDefinition(); + processDefinition.setId(rs.getInt(1)); + long code = rs.getLong(2); + if (code == 0L) { + code = SnowFlakeUtils.getInstance().nextId(); + } + processDefinition.setCode(code); + processDefinition.setVersion(Constants.VERSION_FIRST); + processDefinition.setProjectCode(rs.getLong(3)); + processDefinition.setUserId(rs.getInt(4)); + processDefinition.setLocations(rs.getString(5)); + processDefinition.setName(rs.getString(6)); + processDefinition.setDescription(rs.getString(7)); + processDefinition.setReleaseState(ReleaseState.getEnum(rs.getInt(8))); + processDefinition.setFlag(rs.getInt(9) == 1 ? Flag.YES : Flag.NO); + processDefinition.setCreateTime(rs.getDate(10)); + processDefinitions.add(processDefinition); } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + return processDefinitions; + } + /** + * updateProcessDefinitionCode + * + * @param conn jdbc connection + * @param processDefinitions processDefinitions + * @param projectIdCodeMap projectIdCodeMap + */ + public void updateProcessDefinitionCode(Connection conn, List processDefinitions, Map projectIdCodeMap) { + String sql = "UPDATE t_ds_process_definition SET code=?, project_code=?, version=? where id=?"; + try { + for (ProcessDefinition processDefinition : processDefinitions) { + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + pstmt.setLong(1, processDefinition.getCode()); + long projectCode = processDefinition.getProjectCode(); + if (String.valueOf(projectCode).length() <= 10) { + Integer projectId = Integer.getInteger(String.valueOf(projectCode)); + if (projectIdCodeMap.containsKey(projectId)) { + projectCode = projectIdCodeMap.get(projectId); + processDefinition.setProjectCode(projectCode); + } + } + pstmt.setLong(2, projectCode); + pstmt.setInt(3, processDefinition.getVersion()); + pstmt.setInt(4, processDefinition.getId()); + pstmt.executeUpdate(); + } + } } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException("sql: " + sql, e); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java new file mode 100644 index 0000000000..794d71a292 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ProjectDao.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProjectDao { + + public static final Logger logger = LoggerFactory.getLogger(ProjectDao.class); + + /** + * queryAllProject + * + * @param conn jdbc connection + * @return Project List + */ + public Map queryAllProject(Connection conn) { + Map projectMap = new HashMap<>(); + String sql = "SELECT id,code FROM t_ds_project"; + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + while (rs.next()) { + Integer id = rs.getInt(1); + long code = rs.getLong(2); + if (code == 0L) { + code = SnowFlakeUtils.getInstance().nextId(); + } + projectMap.put(id, code); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + return projectMap; + } + + /** + * updateProjectCode + * + * @param conn jdbc connection + * @param projectMap projectMap + */ + public void updateProjectCode(Connection conn, Map projectMap) { + String sql = "UPDATE t_ds_project SET code=? where id=?"; + try { + for (Map.Entry entry : projectMap.entrySet()) { + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + pstmt.setLong(1, entry.getValue()); + pstmt.setInt(2, entry.getKey()); + pstmt.executeUpdate(); + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java new file mode 100644 index 0000000000..097020089d --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ScheduleDao.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.upgrade; + +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.time.Clock; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScheduleDao { + + public static final Logger logger = LoggerFactory.getLogger(ScheduleDao.class); + + /** + * queryAllSchedule + * + * @param conn jdbc connection + * @return Schedule List + */ + public Map queryAllSchedule(Connection conn) { + Map scheduleMap = new HashMap<>(); + String sql = "SELECT id,process_definition_code FROM t_ds_schedules"; + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + while (rs.next()) { + Integer id = rs.getInt(1); + long processDefinitionCode = rs.getLong(2); + scheduleMap.put(id, processDefinitionCode); + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(rs, pstmt, conn); + } + return scheduleMap; + } + + /** + * update schedule + * + * @param conn jdbc connection + * @param scheduleMap scheduleMap + * @param processIdCodeMap processIdCodeMap + */ + public void updateScheduleCode(Connection conn, Map scheduleMap, Map processIdCodeMap) { + String sql = "UPDATE t_ds_schedules SET process_definition_code=?,timezone_id=?,environment_code=-1 where id=?"; + try { + Clock clock = Clock.systemDefaultZone(); + String timezoneId = clock.getZone().getId(); + for (Map.Entry entry : scheduleMap.entrySet()) { + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + long projectDefinitionCode = entry.getValue(); + if (String.valueOf(projectDefinitionCode).length() <= 10) { + Integer projectDefinitionId = Integer.getInteger(String.valueOf(projectDefinitionCode)); + if (processIdCodeMap.containsKey(projectDefinitionId)) { + projectDefinitionCode = processIdCodeMap.get(projectDefinitionId); + } + } + pstmt.setLong(1, projectDefinitionCode); + pstmt.setString(2, timezoneId); + pstmt.setInt(3, entry.getKey()); + pstmt.executeUpdate(); + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + ConnectionUtils.releaseResource(conn); + } + } +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index 47eeedb604..a7acafc475 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -16,31 +16,55 @@ */ package org.apache.dolphinscheduler.dao.upgrade; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ConditionType; import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.process.ResourceInfo; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.ConnectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.SchemaUtils; +import org.apache.dolphinscheduler.common.utils.ScriptRunner; +import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils; import org.apache.dolphinscheduler.dao.AbstractBaseDao; import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.sql.DataSource; -import java.io.*; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.Reader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + public abstract class UpgradeDao extends AbstractBaseDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); @@ -266,9 +290,7 @@ public abstract class UpgradeDao extends AbstractBaseDao { * @param schemaDir schema dir */ public void upgradeDolphinScheduler(String schemaDir) { - - upgradeDolphinSchedulerDDL(schemaDir); - + upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl.sql"); upgradeDolphinSchedulerDML(schemaDir); } @@ -292,8 +314,9 @@ public abstract class UpgradeDao extends AbstractBaseDao { /** * upgrade DolphinScheduler to 2.0.0 */ - public void upgradeDolphinSchedulerJsonSplit() { + public void upgradeDolphinSchedulerTo200(String schemaDir) { processDefinitionJsonSplit(); + upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql"); } /** @@ -481,11 +504,11 @@ public abstract class UpgradeDao extends AbstractBaseDao { * * @param schemaDir schemaDir */ - private void upgradeDolphinSchedulerDDL(String schemaDir) { + private void upgradeDolphinSchedulerDDL(String schemaDir, String scriptFile) { if (StringUtils.isEmpty(rootDir)) { throw new RuntimeException("Environment variable user.dir not found"); } - String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/dolphinscheduler_ddl.sql", rootDir, schemaDir, getDbType().name().toLowerCase()); + String sqlFilePath = MessageFormat.format("{0}/sql/upgrade/{1}/{2}/{3}", rootDir, schemaDir, getDbType().name().toLowerCase(), scriptFile); Connection conn = null; PreparedStatement pstmt = null; try { @@ -517,7 +540,6 @@ public abstract class UpgradeDao extends AbstractBaseDao { } finally { ConnectionUtils.releaseResource(pstmt, conn); } - } @@ -550,7 +572,181 @@ public abstract class UpgradeDao extends AbstractBaseDao { } - public void processDefinitionJsonSplit() { + /** + * upgrade DolphinScheduler to 2.0.0, json split + */ + private void processDefinitionJsonSplit() { + ProjectDao projectDao = new ProjectDao(); + ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao(); + ScheduleDao scheduleDao = new ScheduleDao(); + JsonSplitDao jsonSplitDao = new JsonSplitDao(); + try { + // execute project + Map projectIdCodeMap = projectDao.queryAllProject(dataSource.getConnection()); + projectDao.updateProjectCode(dataSource.getConnection(), projectIdCodeMap); + + // execute process definition code + List processDefinitions = processDefinitionDao.queryProcessDefinition(dataSource.getConnection()); + processDefinitionDao.updateProcessDefinitionCode(dataSource.getConnection(), processDefinitions, projectIdCodeMap); + + // execute schedule + Map allSchedule = scheduleDao.queryAllSchedule(dataSource.getConnection()); + Map processIdCodeMap = processDefinitions.stream().collect(Collectors.toMap(ProcessDefinition::getId, ProcessDefinition::getCode)); + scheduleDao.updateScheduleCode(dataSource.getConnection(), allSchedule, processIdCodeMap); + + // json split + Map processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection()); + List processDefinitionLogs = new ArrayList<>(); + List processTaskRelationLogs = new ArrayList<>(); + List taskDefinitionLogs = new ArrayList<>(); + splitProcessDefinitionJson(processDefinitions, processDefinitionJsonMap, processDefinitionLogs, processTaskRelationLogs, taskDefinitionLogs); + + // execute json split + jsonSplitDao.executeJsonSplitProcessDefinition(dataSource.getConnection(), processDefinitionLogs); + jsonSplitDao.executeJsonSplitProcessTaskRelation(dataSource.getConnection(), processTaskRelationLogs); + jsonSplitDao.executeJsonSplitTaskDefinition(dataSource.getConnection(), taskDefinitionLogs); + } catch (Exception e) { + logger.error("json split error", e); + } + } + + private void splitProcessDefinitionJson(List processDefinitions, + Map processDefinitionJsonMap, + List processDefinitionLogs, + List processTaskRelationLogs, + List taskDefinitionLogs) throws Exception { + Map processDefinitionMap = processDefinitions.stream() + .collect(Collectors.toMap(ProcessDefinition::getId, processDefinition -> processDefinition)); + Date now = new Date(); + for (Map.Entry entry : processDefinitionJsonMap.entrySet()) { + if (entry.getValue() == null) { + throw new Exception("processDefinitionJson is null"); + } + ObjectNode jsonObject = JSONUtils.parseObject(entry.getValue()); + ProcessDefinition processDefinition = processDefinitionMap.get(entry.getKey()); + if (processDefinition != null) { + processDefinition.setTenantId(jsonObject.get("tenantId").asInt()); + processDefinition.setTimeout(jsonObject.get("timeout").asInt()); + processDefinition.setGlobalParams(jsonObject.get("globalParams").toString()); + } else { + throw new Exception("It can't find processDefinition, please check !"); + } + Map taskIdCodeMap = new HashMap<>(); + Map> taskNamePreMap = new HashMap<>(); + Map taskNameCodeMap = new HashMap<>(); + ArrayNode tasks = JSONUtils.parseArray(jsonObject.get("tasks").toString()); + for (int i = 0; i < tasks.size(); i++) { + ObjectNode task = (ObjectNode) tasks.path(i); + ObjectNode param = (ObjectNode) task.get("params"); + TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(); + if (param != null) { + List resourceList = JSONUtils.toList(param.get("resourceList").toString(), ResourceInfo.class); + if (!resourceList.isEmpty()) { + List resourceIds = resourceList.stream().map(ResourceInfo::getId).collect(Collectors.toList()); + taskDefinitionLog.setResourceIds(StringUtils.join(resourceIds, ",")); + } + param.put("conditionResult", task.get("conditionResult")); + param.put("dependence", task.get("dependence")); + taskDefinitionLog.setTaskParams(param.toString()); + } + TaskTimeoutParameter timeout = JSONUtils.parseObject(JSONUtils.toJsonString(task.get("timeout")), TaskTimeoutParameter.class); + if (timeout != null) { + taskDefinitionLog.setTimeout(timeout.getInterval()); + taskDefinitionLog.setTimeoutFlag(timeout.getEnable() ? TimeoutFlag.OPEN : TimeoutFlag.CLOSE); + taskDefinitionLog.setTimeoutNotifyStrategy(timeout.getStrategy()); + } + taskDefinitionLog.setDescription(task.get("description").toString()); + taskDefinitionLog.setFlag(Constants.FLOWNODE_RUN_FLAG_NORMAL.equals(task.get("runFlag").toString()) ? Flag.YES : Flag.NO); + taskDefinitionLog.setTaskType(task.get("type").toString()); + taskDefinitionLog.setFailRetryInterval(task.get("retryInterval").asInt()); + taskDefinitionLog.setFailRetryTimes(task.get("maxRetryTimes").asInt()); + taskDefinitionLog.setTaskPriority(JSONUtils.parseObject(JSONUtils.toJsonString(task.get("taskInstancePriority")), Priority.class)); + String name = task.get("name").toString(); + taskDefinitionLog.setName(name); + taskDefinitionLog.setWorkerGroup(task.get("workerGroup").toString()); + long taskCode = SnowFlakeUtils.getInstance().nextId(); + taskDefinitionLog.setCode(taskCode); + taskDefinitionLog.setVersion(Constants.VERSION_FIRST); + taskDefinitionLog.setProjectCode(processDefinition.getProjectCode()); + taskDefinitionLog.setUserId(processDefinition.getUserId()); + taskDefinitionLog.setEnvironmentCode(-1); + taskDefinitionLog.setDelayTime(0); + taskDefinitionLog.setOperator(1); + taskDefinitionLog.setOperateTime(now); + taskDefinitionLog.setCreateTime(now); + taskDefinitionLog.setUpdateTime(now); + taskDefinitionLogs.add(taskDefinitionLog); + taskIdCodeMap.put(task.get("id").toString(), taskCode); + List preTasks = JSONUtils.toList(task.get("preTasks").toString(), String.class); + taskNamePreMap.put(name, preTasks); + taskNameCodeMap.put(name, taskCode); + } + processDefinition.setLocations(convertLocations(processDefinition.getLocations(), taskIdCodeMap)); + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(processDefinition); + processDefinitionLog.setOperator(1); + processDefinitionLog.setOperateTime(now); + processDefinitionLog.setUpdateTime(now); + processDefinitionLogs.add(processDefinitionLog); + handleProcessTaskRelation(taskNamePreMap, taskNameCodeMap, processDefinition, processTaskRelationLogs); + } + } + + private String convertLocations(String locations, Map taskIdCodeMap) { + if (StringUtils.isBlank(locations)) { + return locations; + } + Map locationsMap = JSONUtils.toMap(locations); + JsonNodeFactory factory = new JsonNodeFactory(false); + ArrayNode jsonNodes = factory.arrayNode(); + for (Map.Entry entry : locationsMap.entrySet()) { + ObjectNode nodes = factory.objectNode(); + nodes.put("taskCode", taskIdCodeMap.get(entry.getKey())); + ObjectNode oldNodes = JSONUtils.parseObject(entry.getValue()); + nodes.put("x", oldNodes.get("x").asInt()); + nodes.put("y", oldNodes.get("y").asInt()); + jsonNodes.add(nodes); + } + return jsonNodes.toString(); + } + + private void handleProcessTaskRelation(Map> taskNamePreMap, + Map taskNameCodeMap, + ProcessDefinition processDefinition, + List processTaskRelationLogs) { + Date now = new Date(); + for (Map.Entry> entry : taskNamePreMap.entrySet()) { + List entryValue = entry.getValue(); + if (CollectionUtils.isNotEmpty(entryValue)) { + for (String preTaskName : entryValue) { + ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); + processTaskRelationLog.setPreTaskCode(taskNameCodeMap.get(preTaskName)); + processTaskRelationLog.setPreTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLogs.add(processTaskRelationLog); + } + } else { + ProcessTaskRelationLog processTaskRelationLog = setProcessTaskRelationLog(processDefinition, now); + processTaskRelationLog.setPreTaskCode(0); + processTaskRelationLog.setPreTaskVersion(0); + processTaskRelationLog.setPostTaskCode(taskNameCodeMap.get(entry.getKey())); + processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST); + processTaskRelationLogs.add(processTaskRelationLog); + } + } + } + private ProcessTaskRelationLog setProcessTaskRelationLog(ProcessDefinition processDefinition, Date now) { + ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(); + processTaskRelationLog.setProjectCode(processDefinition.getProjectCode()); + processTaskRelationLog.setProcessDefinitionCode(processDefinition.getCode()); + processTaskRelationLog.setProcessDefinitionVersion(processDefinition.getVersion()); + processTaskRelationLog.setConditionType(ConditionType.NONE); + processTaskRelationLog.setConditionParams("{}"); + processTaskRelationLog.setOperator(1); + processTaskRelationLog.setOperateTime(now); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + return processTaskRelationLog; } } diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql index 9dbd6a0747..2b8d4942ce 100644 --- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl.sql @@ -400,7 +400,7 @@ alter table t_ds_schedules add environment_code bigint(20) DEFAULT '-1' COMMENT -- t_ds_process_definition alter table t_ds_process_definition add `code` bigint(20) NOT NULL COMMENT 'encoding' AFTER `id`; -alter table t_ds_process_definition add `project_code` bigint(20) NOT NULL COMMENT 'encoding' AFTER `project_id`; +alter table t_ds_process_definition change project_id project_code bigint(20) NOT NULL COMMENT 'project code' AFTER `description`; alter table t_ds_process_definition add `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id' AFTER `locations`; alter table t_ds_process_definition add UNIQUE KEY `process_unique` (`name`,`project_code`) USING BTREE; alter table t_ds_process_definition modify `description` text COMMENT 'description' after `version`; diff --git a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql index b0f00a0b06..5f3f65fba4 100644 --- a/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql +++ b/sql/upgrade/2.0.0_schema/mysql/dolphinscheduler_ddl_post.sql @@ -19,7 +19,6 @@ alter table t_ds_process_definition drop primary key; ALTER TABLE t_ds_process_definition ADD PRIMARY KEY (`id`,`code`); ALTER TABLE t_ds_process_definition drop KEY `process_definition_unique`; ALTER TABLE t_ds_process_definition drop KEY `process_definition_index`; -alter table t_ds_process_definition drop project_id; alter table t_ds_process_definition drop process_definition_json; alter table t_ds_process_definition drop connects; alter table t_ds_process_definition drop receivers;