diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java index 92113a5a67..a697f22ea5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java @@ -20,6 +20,7 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.UserType; +import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.dao.model.User; import org.apache.commons.lang3.StringUtils; @@ -110,4 +111,20 @@ public class BaseService { return null; } + + /** + * create tenant dir if not exists + * @param tenantCode + * @throws Exception + */ + protected void createTenantDirIfNotExists(String tenantCode)throws Exception{ + + String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); + String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode); + /** + * init resource path and udf path + */ + HadoopUtils.getInstance().mkdir(resourcePath); + HadoopUtils.getInstance().mkdir(udfsPath); + } } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java index 46a175ec75..558b3d21d5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java @@ -49,21 +49,22 @@ public class LoggerService { */ public Result queryLog(int taskInstId, int skipLineNum, int limit) { + TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); String host = taskInstance.getHost(); if(StringUtils.isEmpty(host)){ return new Result(Status.TASK_INSTANCE_HOST_NOT_FOUND.getCode(), Status.TASK_INSTANCE_HOST_NOT_FOUND.getMsg()); } - logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); + Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); - if(host != null){ - LogClient logClient = new LogClient(host, Constants.RPC_PORT); - String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); - result.setData(log); - logger.info(log); - } + logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); + + LogClient logClient = new LogClient(host, Constants.RPC_PORT); + String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); + result.setData(log); + logger.info(log); return result; } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java index 30d8f827aa..ca369de462 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java @@ -45,12 +45,6 @@ public class ProjectService extends BaseService{ private static final Logger logger = LoggerFactory.getLogger(ProjectService.class); - @Autowired - private UserMapper userMapper; - - @Autowired - private UsersService userService; - @Autowired private ProjectMapper projectMapper; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java index 651d9603f4..5ec417987f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java @@ -339,19 +339,18 @@ public class ResourcesService extends BaseService { String resourcePath = ""; if (type.equals(ResourceType.FILE)) { hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsDir(tenantCode); + resourcePath = HadoopUtils.getHdfsResDir(tenantCode); } else if (type.equals(ResourceType.UDF)) { hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name); resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode); } try { - if (HadoopUtils.getInstance().exists(resourcePath)) { - cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename); - HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true); - } else { - logger.error("{} is not exist", resourcePath); - return false; + // if tenant dir not exists + if (!HadoopUtils.getInstance().exists(resourcePath)) { + createTenantDirIfNotExists(tenantCode); } + cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename); + HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true); } catch (Exception e) { logger.error(e.getMessage(), e); return false; @@ -678,7 +677,7 @@ public class ResourcesService extends BaseService { // get file hdfs path hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName); - String resourcePath = HadoopUtils.getHdfsDir(tenantCode); + String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); logger.info("resource hdfs path is {} ", hdfsFileName); HadoopUtils hadoopUtils = HadoopUtils.getInstance(); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java index 21d5f270fb..be5ab71c72 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java @@ -97,13 +97,7 @@ public class TenantService extends BaseService{ // if hdfs startup if (PropertyUtils.getResUploadStartupState()){ - String resourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + tenantCode + "/resources"; - String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode); - /** - * init resource path and udf path - */ - HadoopUtils.getInstance().mkdir(resourcePath); - HadoopUtils.getInstance().mkdir(udfsPath); + createTenantDirIfNotExists(tenantCode); } putMsg(result, Status.SUCCESS); @@ -240,7 +234,7 @@ public class TenantService extends BaseService{ String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode(); if (HadoopUtils.getInstance().exists(tenantPath)){ - String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode()); + String resourcePath = HadoopUtils.getHdfsResDir(tenant.getTenantCode()); FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath); if (fileStatus.length > 0) { putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java index 8004117e91..e287a19a78 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java @@ -124,10 +124,13 @@ public class UsersService extends BaseService { userMapper.insert(user); Tenant tenant = tenantMapper.queryById(tenantId); - // if hdfs startup + // resource upload startup if (PropertyUtils.getResUploadStartupState()){ - String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode() + "/home/" + user.getId(); - + // if tenant not exists + if (!HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(tenant.getTenantCode()))){ + createTenantDirIfNotExists(tenant.getTenantCode()); + } + String userPath = HadoopUtils.getHdfsUserDir(tenant.getTenantCode(),user.getId()); HadoopUtils.getInstance().mkdir(userPath); } @@ -247,11 +250,12 @@ public class UsersService extends BaseService { // if hdfs startup if (PropertyUtils.getResUploadStartupState() && oldTenant != null){ String newTenantCode = newTenant.getTenantCode(); - String oldResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/resources"; + String oldResourcePath = HadoopUtils.getHdfsResDir(oldTenant.getTenantCode()); String oldUdfsPath = HadoopUtils.getHdfsUdfDir(oldTenant.getTenantCode()); + // if old tenant dir exists if (HadoopUtils.getInstance().exists(oldResourcePath)){ - String newResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenantCode + "/resources"; + String newResourcePath = HadoopUtils.getHdfsResDir(newTenantCode); String newUdfsPath = HadoopUtils.getHdfsUdfDir(newTenantCode); //file resources list @@ -271,13 +275,22 @@ public class UsersService extends BaseService { } //Delete the user from the old tenant directory - String oldUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/home/" + userId; + String oldUserPath = HadoopUtils.getHdfsUserDir(oldTenant.getTenantCode(),userId); HadoopUtils.getInstance().delete(oldUserPath, true); + }else { + // if old tenant dir not exists , create + createTenantDirIfNotExists(oldTenant.getTenantCode()); + } + + if (HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(newTenant.getTenantCode()))){ + //create user in the new tenant directory + String newUserPath = HadoopUtils.getHdfsUserDir(newTenant.getTenantCode(),user.getId()); + HadoopUtils.getInstance().mkdir(newUserPath); + }else { + // if new tenant dir not exists , create + createTenantDirIfNotExists(newTenant.getTenantCode()); } - //create user in the new tenant directory - String newUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenant.getTenantCode() + "/home/" + user.getId(); - HadoopUtils.getInstance().mkdir(newUserPath); } } user.setTenantId(tenantId); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java index 1db1bcb48d..5df1601661 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java @@ -136,7 +136,7 @@ public class WorkerGroupService extends BaseService { Map result = new HashMap<>(5); - int delete = workerGroupMapper.deleteById(id); + workerGroupMapper.deleteById(id); putMsg(result, Status.SUCCESS); return result; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java index 6f3e5e2198..1ff41c37f7 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java @@ -410,12 +410,22 @@ public class HadoopUtils implements Closeable { * @param tenantCode tenant code * @return hdfs resource dir */ - public static String getHdfsDir(String tenantCode) { + public static String getHdfsResDir(String tenantCode) { return String.format("%s/resources", getHdfsTenantDir(tenantCode)); } /** - * get udf dir on hdfs + * hdfs user dir + * + * @param tenantCode tenant code + * @return hdfs resource dir + */ + public static String getHdfsUserDir(String tenantCode,int userId) { + return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId); + } + + /** + * hdfs udf dir * * @param tenantCode tenant code * @return get udf dir on hdfs @@ -432,7 +442,7 @@ public class HadoopUtils implements Closeable { * @return get absolute path and name for file on hdfs */ public static String getHdfsFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsDir(tenantCode), filename); + return String.format("%s/%s", getHdfsResDir(tenantCode), filename); } /** @@ -449,7 +459,7 @@ public class HadoopUtils implements Closeable { /** * @return file directory of tenants on hdfs */ - private static String getHdfsTenantDir(String tenantCode) { + public static String getHdfsTenantDir(String tenantCode) { return String.format("%s/%s", getHdfsDataBasePath(), tenantCode); }