Browse Source

[Fix-3423][dao][sql]Fixed that the resource file of the task node can't be found when upgrade from 1.2.0 to 1.3.x (#3454)

1.3.2-release
lgcareer 4 years ago committed by GitHub
parent
commit
efef631b68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java
  2. 67
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java
  3. 71
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  4. 16
      sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_ddl.sql
  5. 19
      sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_dml.sql
  6. 16
      sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_ddl.sql
  7. 17
      sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_dml.sql

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java

@ -117,6 +117,8 @@ public class DolphinSchedulerManager {
upgradeDao.upgradeDolphinScheduler(schemaDir);
if ("1.3.0".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerWorkerGroup();
} else if ("1.3.2".equals(schemaVersion)) {
upgradeDao.upgradeDolphinSchedulerResourceList();
}
version = schemaVersion;
}

67
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.upgrade;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
/**
* resource dao
*/
public class ResourceDao {
public static final Logger logger = LoggerFactory.getLogger(ProcessDefinitionDao.class);
/**
* list all resources
* @param conn connection
* @return map that key is full_name and value is id
*/
Map<String,Integer> listAllResources(Connection conn){
Map<String,Integer> resourceMap = new HashMap<>();
String sql = String.format("SELECT id,full_name FROM t_ds_resources");
ResultSet rs = null;
PreparedStatement pstmt = null;
try {
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()){
Integer id = rs.getInt(1);
String fullName = rs.getString(2);
resourceMap.put(fullName,id);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + sql, e);
} finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return resourceMap;
}
}

71
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -19,10 +19,8 @@ package org.apache.dolphinscheduler.dao.upgrade;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.SchemaUtils;
import org.apache.dolphinscheduler.common.utils.ScriptRunner;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AbstractBaseDao;
import org.apache.dolphinscheduler.dao.datasource.ConnectionFactory;
import org.slf4j.Logger;
@ -36,7 +34,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class UpgradeDao extends AbstractBaseDao {
@ -270,6 +270,15 @@ public abstract class UpgradeDao extends AbstractBaseDao {
public void upgradeDolphinSchedulerWorkerGroup() {
updateProcessDefinitionJsonWorkerGroup();
}
/**
* upgrade DolphinScheduler resource list
* ds-1.3.2 modify the resource list for process definition json
*/
public void upgradeDolphinSchedulerResourceList() {
updateProcessDefinitionJsonResourceList();
}
/**
* updateProcessDefinitionJsonWorkerGroup
*/
@ -288,7 +297,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
for (int i = 0 ;i < tasks.size() ; i++){
JSONObject task = tasks.getJSONObject(i);
Integer workerGroupId = task.getInteger("workerGroupId");
if (workerGroupId == -1) {
if (workerGroupId == null || workerGroupId == -1) {
task.put("workerGroup", "default");
}else {
task.put("workerGroup", oldWorkerGroupMap.get(workerGroupId));
@ -310,6 +319,58 @@ public abstract class UpgradeDao extends AbstractBaseDao {
}
/**
* updateProcessDefinitionJsonResourceList
*/
protected void updateProcessDefinitionJsonResourceList(){
ResourceDao resourceDao = new ResourceDao();
ProcessDefinitionDao processDefinitionDao = new ProcessDefinitionDao();
Map<Integer,String> replaceProcessDefinitionMap = new HashMap<>();
try {
Map<String,Integer> resourcesMap = resourceDao.listAllResources(dataSource.getConnection());
Map<Integer,String> processDefinitionJsonMap = processDefinitionDao.queryAllProcessDefinition(dataSource.getConnection());
for (Map.Entry<Integer,String> entry : processDefinitionJsonMap.entrySet()){
JSONObject jsonObject = JSONObject.parseObject(entry.getValue());
JSONArray tasks = JSONArray.parseArray(jsonObject.getString("tasks"));
for (int i = 0 ;i < tasks.size() ; i++){
JSONObject task = tasks.getJSONObject(i);
JSONObject param = (JSONObject) task.get("params");
if (param != null) {
List<ResourceInfo> resourceList = JSONUtils.toList(param.getString("resourceList"), ResourceInfo.class);
if (CollectionUtils.isNotEmpty(resourceList)) {
List<ResourceInfo> newResourceList = resourceList.stream().map(resInfo -> {
String fullName = resInfo.getRes().startsWith("/") ? resInfo.getRes() : String.format("/%s",resInfo.getRes());
if (resInfo.getId() == 0 && resourcesMap.containsKey(fullName)) {
resInfo.setId(resourcesMap.get(fullName));
}
return resInfo;
}).collect(Collectors.toList());
param.put("resourceList",JSONArray.parse(JSONObject.toJSONString(newResourceList)));
}
}
task.put("params",param);
}
jsonObject.remove(jsonObject.getString("tasks"));
jsonObject.put("tasks",tasks);
replaceProcessDefinitionMap.put(entry.getKey(),jsonObject.toJSONString());
}
if (replaceProcessDefinitionMap.size() > 0){
processDefinitionDao.updateProcessDefinitionJson(dataSource.getConnection(),replaceProcessDefinitionMap);
}
}catch (Exception e){
logger.error("update process definition json resource list error",e);
}
}
/**
* upgradeDolphinScheduler DML
* @param schemaDir schemaDir

16
sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

19
sql/upgrade/1.3.2_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
SET FOREIGN_KEY_CHECKS=0;
UPDATE t_ds_user SET phone = '' WHERE phone = 'xx';

16
sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

17
sql/upgrade/1.3.2_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
UPDATE t_ds_user SET phone = '' WHERE phone = 'xx';
Loading…
Cancel
Save