From 5d34d9e270f98d8ea70cc162807f16c4b93cf289 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Mon, 4 Jul 2022 13:41:44 +0800 Subject: [PATCH] cherry pick #9107 (#10756) --- .../service/impl/ResourcesServiceImpl.java | 94 ++++++++++++++---- .../dao/upgrade/DolphinSchedulerManager.java | 8 +- .../dao/upgrade/ResourceDao.java | 96 ++++++++++++++++++- .../dao/upgrade/UpgradeDao.java | 57 ++++++++--- 4 files changed, 221 insertions(+), 34 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 251b8039ee..48534e63a9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -17,13 +17,10 @@ package org.apache.dolphinscheduler.api.service.impl; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.google.common.io.Files; -import org.apache.commons.beanutils.BeanMap; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.ALIAS; +import static org.apache.dolphinscheduler.common.Constants.CONTENT; +import static org.apache.dolphinscheduler.common.Constants.JAR; + import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; import org.apache.dolphinscheduler.api.dto.resources.filter.ResourceFilter; import org.apache.dolphinscheduler.api.dto.resources.visitor.ResourceTreeVisitor; @@ -40,10 +37,38 @@ import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.dao.mapper.*; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.ResourcesUser; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import org.apache.commons.beanutils.BeanMap; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -52,13 +77,11 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; -import java.io.IOException; -import java.text.MessageFormat; -import java.util.*; -import java.util.regex.Matcher; -import java.util.stream.Collectors; - -import static org.apache.dolphinscheduler.common.Constants.*; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.google.common.base.Joiner; +import com.google.common.io.Files; /** * resources service impl @@ -198,6 +221,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe try { resourcesMapper.insert(resource); + updateParentResourceSize(resource, resource.getSize()); putMsg(result, Status.SUCCESS); Map dataMap = new BeanMap(resource); Map resultMap = new HashMap<>(); @@ -221,6 +245,33 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } + /** + * update the folder's size of the resource + * + * @param resource the current resource + * @param size size + */ + private void updateParentResourceSize(Resource resource, long size) { + if (resource.getSize() > 0) { + String[] splits = resource.getFullName().split("/"); + for (int i = 1; i < splits.length; i++) { + String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits, 0, i)); + if (StringUtils.isNotBlank(parentFullName)) { + List resources = resourcesMapper.queryResource(parentFullName, resource.getType().ordinal()); + if (CollectionUtils.isNotEmpty(resources)) { + Resource parentResource = resources.get(0); + if (parentResource.getSize() + size >= 0) { + parentResource.setSize(parentResource.getSize() + size); + } else { + parentResource.setSize(0L); + } + resourcesMapper.updateById(parentResource); + } + } + } + } + } + /** * check resource is exists * @@ -338,6 +389,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe // updateResource data Date now = new Date(); + long originFileSize = resource.getSize(); resource.setAlias(name); resource.setFileName(name); @@ -423,6 +475,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe throw new ServiceException(String.format("delete resource: %s failed.", originFullName)); } } + + updateParentResourceSize(resource, resource.getSize() - originFileSize); return result; } @@ -705,11 +759,15 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String hdfsFilename = HadoopUtils.getHdfsFileName(resource.getType(), tenantCode, resource.getFullName()); //delete data in database + resourcesMapper.selectBatchIds(Arrays.asList(needDeleteResourceIdArray)).forEach(item -> { + updateParentResourceSize(item, item.getSize() * -1); + }); resourcesMapper.deleteIds(needDeleteResourceIdArray); resourceUserMapper.deleteResourceUserArray(0, needDeleteResourceIdArray); //delete file on hdfs HadoopUtils.getInstance().delete(hdfsFilename, true); + putMsg(result, Status.SUCCESS); return result; @@ -901,6 +959,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe Resource resource = new Resource(pid,name,fullName,false,desc,name,loginUser.getId(),type,content.getBytes().length,now,now); resourcesMapper.insert(resource); + updateParentResourceSize(resource, resource.getSize()); putMsg(result, Status.SUCCESS); Map dataMap = new BeanMap(resource); @@ -995,10 +1054,13 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe if (StringUtils.isEmpty(tenantCode)) { return result; } + long originFileSize = resource.getSize(); resource.setSize(content.getBytes().length); resource.setUpdateTime(new Date()); resourcesMapper.updateById(resource); + updateParentResourceSize(resource, resource.getSize() - originFileSize); + result = uploadContentToHdfs(resource.getFullName(), tenantCode, content); if (!result.getCode().equals(Status.SUCCESS.getCode())) { throw new ServiceException(result.getMsg()); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java index 9ac55469af..6743acb2e5 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/DolphinSchedulerManager.java @@ -75,6 +75,7 @@ public class DolphinSchedulerManager { logger.info("Start initializing the DolphinScheduler manager table structure"); upgradeDao.initSchema(); } + public void upgradeDolphinScheduler() throws IOException { // Gets a list of all upgrades List schemaList = SchemaUtils.getAllSchemaList(); @@ -97,12 +98,13 @@ public class DolphinSchedulerManager { } // The target version of the upgrade String schemaVersion = ""; + String currentVersion = version; for (String schemaDir : schemaList) { schemaVersion = schemaDir.split("_")[0]; if (SchemaUtils.isAGreatVersion(schemaVersion, version)) { logger.info("upgrade DolphinScheduler metadata version from {} to {}", version, schemaVersion); logger.info("Begin upgrading DolphinScheduler's table structure"); - upgradeDao.upgradeDolphinScheduler(schemaDir); + upgradeDao.upgradeDolphinScheduler(schemaDir); if ("1.3.0".equals(schemaVersion)) { upgradeDao.upgradeDolphinSchedulerWorkerGroup(); } else if ("1.3.2".equals(schemaVersion)) { @@ -113,6 +115,10 @@ public class DolphinSchedulerManager { version = schemaVersion; } } + + if (SchemaUtils.isAGreatVersion("2.0.6", currentVersion) && SchemaUtils.isAGreatVersion(SchemaUtils.getSoftVersion(), currentVersion)) { + upgradeDao.upgradeDolphinSchedulerResourceFileSize(); + } } // Assign the value of the version field in the version table to the version of the product diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java index 49c0e80c48..7d587e708f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/ResourceDao.java @@ -18,14 +18,21 @@ package org.apache.dolphinscheduler.dao.upgrade; import org.apache.dolphinscheduler.common.utils.ConnectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; /** * resource dao @@ -65,4 +72,89 @@ public class ResourceDao { return resourceMap; } + /** + * list all resources by the type + * + * @param conn connection + * @return map that key is full_name and value is the folder's size + */ + private Map listAllResourcesByFileType(Connection conn, int type) { + Map resourceSizeMap = new HashMap<>(); + + String sql = String.format("SELECT full_name, type, size, is_directory FROM t_ds_resources where type = %d", type); + ResultSet rs = null; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + rs = pstmt.executeQuery(); + + while (rs.next()) { + String fullName = rs.getString("full_name"); + Boolean isDirectory = rs.getBoolean("is_directory"); + long fileSize = rs.getLong("size"); + + if (StringUtils.isNotBlank(fullName) && !isDirectory) { + String[] splits = fullName.split("/"); + for (int i = 1; i < splits.length; i++) { + String parentFullName = Joiner.on("/").join(Arrays.copyOfRange(splits,0, splits.length - i)); + if (StringUtils.isNotEmpty(parentFullName)) { + long size = resourceSizeMap.getOrDefault(parentFullName, 0L); + resourceSizeMap.put(parentFullName, size + fileSize); + } + } + } + } + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + if (Objects.nonNull(pstmt)) { + try { + if (!pstmt.isClosed()) { + pstmt.close(); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + } + return resourceSizeMap; + } + + /** + * update the folder's size + * + * @param conn connection + */ + public void updateResourceFolderSizeByFileType(Connection conn, int type) { + Map resourceSizeMap = listAllResourcesByFileType(conn, type); + + String sql = "UPDATE t_ds_resources SET size=? where type=? and full_name=? and is_directory = true"; + PreparedStatement pstmt = null; + try { + pstmt = conn.prepareStatement(sql); + for (Map.Entry entry : resourceSizeMap.entrySet()) { + pstmt.setLong(1, entry.getValue()); + pstmt.setInt(2, type); + pstmt.setString(3, entry.getKey()); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException("sql: " + sql, e); + } finally { + if (Objects.nonNull(pstmt)) { + try { + if (!pstmt.isClosed()) { + pstmt.close(); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } + ConnectionUtils.releaseResource(conn); + } + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java index da9c2e0638..0cc571bf02 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java @@ -17,15 +17,12 @@ package org.apache.dolphinscheduler.dao.upgrade; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.ConditionType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskType; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils; @@ -37,12 +34,10 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.spi.enums.DbType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.core.io.ClassPathResource; -import org.springframework.core.io.Resource; -import javax.sql.DataSource; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; @@ -51,9 +46,27 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + public abstract class UpgradeDao { public static final Logger logger = LoggerFactory.getLogger(UpgradeDao.class); private static final String T_VERSION_NAME = "t_escheduler_version"; @@ -150,6 +163,21 @@ public abstract class UpgradeDao { upgradeDolphinSchedulerDDL(schemaDir, "dolphinscheduler_ddl_post.sql"); } + /** + * upgrade DolphinScheduler to 2.0.6 + */ + public void upgradeDolphinSchedulerResourceFileSize() { + ResourceDao resourceDao = new ResourceDao(); + try { + // update the size of the folder that is the type of file. + resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 0); + // update the size of the folder that is the type of udf. + resourceDao.updateResourceFolderSizeByFileType(dataSource.getConnection(), 1); + } catch (Exception ex) { + logger.error("Failed to upgrade because of failing to update the folder's size of resource files."); + } + } + /** * updateProcessDefinitionJsonWorkerGroup */ @@ -344,7 +372,6 @@ public abstract class UpgradeDao { } } - /** * update version *