From 6493c9a6b8aa5f55a60f1c1700f6da215c226d55 Mon Sep 17 00:00:00 2001 From: Lu <877856611@qq.com> Date: Thu, 18 Jul 2019 19:12:39 +0800 Subject: [PATCH 1/6] =?UTF-8?q?double=20check=20=E5=8F=98=E9=87=8F?= =?UTF-8?q?=E9=9C=80=E8=A6=81volatile=E4=BF=AE=E9=A5=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../escheduler/common/queue/TaskQueueZkImpl.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index de37b5ffce..2febb6ee13 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -17,20 +17,26 @@ package cn.escheduler.common.queue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + import cn.escheduler.common.Constants; import cn.escheduler.common.utils.Bytes; import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; -import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; - /** * A singleton of a task queue implemented with zookeeper * tasks queue implemention @@ -39,7 +45,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); - private static TaskQueueZkImpl instance; + private static volatile TaskQueueZkImpl instance; private TaskQueueZkImpl(){ init(); From fb5c864641b648321441e96a04ef6709318127ac Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Mon, 22 Jul 2019 15:38:45 +0800 Subject: [PATCH 2/6] the resource upload switch is switched, and the file system is consistent. --- .../escheduler/api/service/BaseService.java | 17 ++++++++++ .../escheduler/api/service/LoggerService.java | 15 ++++----- .../api/service/ProjectService.java | 6 ---- .../api/service/ResourcesService.java | 15 +++++---- .../escheduler/api/service/TenantService.java | 10 ++---- .../escheduler/api/service/UsersService.java | 31 +++++++++++++------ .../api/service/WorkerGroupService.java | 2 +- .../escheduler/common/utils/HadoopUtils.java | 18 ++++++++--- 8 files changed, 71 insertions(+), 43 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java index 92113a5a67..a697f22ea5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java @@ -20,6 +20,7 @@ import cn.escheduler.api.enums.Status; import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.UserType; +import cn.escheduler.common.utils.HadoopUtils; import cn.escheduler.dao.model.User; import org.apache.commons.lang3.StringUtils; @@ -110,4 +111,20 @@ public class BaseService { return null; } + + /** + * create tenant dir if not exists + * @param tenantCode + * @throws Exception + */ + protected void createTenantDirIfNotExists(String tenantCode)throws Exception{ + + String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); + String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode); + /** + * init resource path and udf path + */ + HadoopUtils.getInstance().mkdir(resourcePath); + HadoopUtils.getInstance().mkdir(udfsPath); + } } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java index 46a175ec75..558b3d21d5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java @@ -49,21 +49,22 @@ public class LoggerService { */ public Result queryLog(int taskInstId, int skipLineNum, int limit) { + TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); String host = taskInstance.getHost(); if(StringUtils.isEmpty(host)){ return new Result(Status.TASK_INSTANCE_HOST_NOT_FOUND.getCode(), Status.TASK_INSTANCE_HOST_NOT_FOUND.getMsg()); } - logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); + Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); - if(host != null){ - LogClient logClient = new LogClient(host, Constants.RPC_PORT); - String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); - result.setData(log); - logger.info(log); - } + logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); + + LogClient logClient = new LogClient(host, Constants.RPC_PORT); + String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit); + result.setData(log); + logger.info(log); return result; } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java index 30d8f827aa..ca369de462 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java @@ -45,12 +45,6 @@ public class ProjectService extends BaseService{ private static final Logger logger = LoggerFactory.getLogger(ProjectService.class); - @Autowired - private UserMapper userMapper; - - @Autowired - private UsersService userService; - @Autowired private ProjectMapper projectMapper; diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java index 651d9603f4..5ec417987f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java @@ -339,19 +339,18 @@ public class ResourcesService extends BaseService { String resourcePath = ""; if (type.equals(ResourceType.FILE)) { hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name); - resourcePath = HadoopUtils.getHdfsDir(tenantCode); + resourcePath = HadoopUtils.getHdfsResDir(tenantCode); } else if (type.equals(ResourceType.UDF)) { hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name); resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode); } try { - if (HadoopUtils.getInstance().exists(resourcePath)) { - cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename); - HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true); - } else { - logger.error("{} is not exist", resourcePath); - return false; + // if tenant dir not exists + if (!HadoopUtils.getInstance().exists(resourcePath)) { + createTenantDirIfNotExists(tenantCode); } + cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename); + HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true); } catch (Exception e) { logger.error(e.getMessage(), e); return false; @@ -678,7 +677,7 @@ public class ResourcesService extends BaseService { // get file hdfs path hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName); - String resourcePath = HadoopUtils.getHdfsDir(tenantCode); + String resourcePath = HadoopUtils.getHdfsResDir(tenantCode); logger.info("resource hdfs path is {} ", hdfsFileName); HadoopUtils hadoopUtils = HadoopUtils.getInstance(); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java index 21d5f270fb..be5ab71c72 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java @@ -97,13 +97,7 @@ public class TenantService extends BaseService{ // if hdfs startup if (PropertyUtils.getResUploadStartupState()){ - String resourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + tenantCode + "/resources"; - String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode); - /** - * init resource path and udf path - */ - HadoopUtils.getInstance().mkdir(resourcePath); - HadoopUtils.getInstance().mkdir(udfsPath); + createTenantDirIfNotExists(tenantCode); } putMsg(result, Status.SUCCESS); @@ -240,7 +234,7 @@ public class TenantService extends BaseService{ String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode(); if (HadoopUtils.getInstance().exists(tenantPath)){ - String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode()); + String resourcePath = HadoopUtils.getHdfsResDir(tenant.getTenantCode()); FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath); if (fileStatus.length > 0) { putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java index 8004117e91..e287a19a78 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java @@ -124,10 +124,13 @@ public class UsersService extends BaseService { userMapper.insert(user); Tenant tenant = tenantMapper.queryById(tenantId); - // if hdfs startup + // resource upload startup if (PropertyUtils.getResUploadStartupState()){ - String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode() + "/home/" + user.getId(); - + // if tenant not exists + if (!HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(tenant.getTenantCode()))){ + createTenantDirIfNotExists(tenant.getTenantCode()); + } + String userPath = HadoopUtils.getHdfsUserDir(tenant.getTenantCode(),user.getId()); HadoopUtils.getInstance().mkdir(userPath); } @@ -247,11 +250,12 @@ public class UsersService extends BaseService { // if hdfs startup if (PropertyUtils.getResUploadStartupState() && oldTenant != null){ String newTenantCode = newTenant.getTenantCode(); - String oldResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/resources"; + String oldResourcePath = HadoopUtils.getHdfsResDir(oldTenant.getTenantCode()); String oldUdfsPath = HadoopUtils.getHdfsUdfDir(oldTenant.getTenantCode()); + // if old tenant dir exists if (HadoopUtils.getInstance().exists(oldResourcePath)){ - String newResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenantCode + "/resources"; + String newResourcePath = HadoopUtils.getHdfsResDir(newTenantCode); String newUdfsPath = HadoopUtils.getHdfsUdfDir(newTenantCode); //file resources list @@ -271,13 +275,22 @@ public class UsersService extends BaseService { } //Delete the user from the old tenant directory - String oldUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/home/" + userId; + String oldUserPath = HadoopUtils.getHdfsUserDir(oldTenant.getTenantCode(),userId); HadoopUtils.getInstance().delete(oldUserPath, true); + }else { + // if old tenant dir not exists , create + createTenantDirIfNotExists(oldTenant.getTenantCode()); + } + + if (HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(newTenant.getTenantCode()))){ + //create user in the new tenant directory + String newUserPath = HadoopUtils.getHdfsUserDir(newTenant.getTenantCode(),user.getId()); + HadoopUtils.getInstance().mkdir(newUserPath); + }else { + // if new tenant dir not exists , create + createTenantDirIfNotExists(newTenant.getTenantCode()); } - //create user in the new tenant directory - String newUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenant.getTenantCode() + "/home/" + user.getId(); - HadoopUtils.getInstance().mkdir(newUserPath); } } user.setTenantId(tenantId); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java index 1db1bcb48d..5df1601661 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java @@ -136,7 +136,7 @@ public class WorkerGroupService extends BaseService { Map result = new HashMap<>(5); - int delete = workerGroupMapper.deleteById(id); + workerGroupMapper.deleteById(id); putMsg(result, Status.SUCCESS); return result; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java index 6f3e5e2198..1ff41c37f7 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java @@ -410,12 +410,22 @@ public class HadoopUtils implements Closeable { * @param tenantCode tenant code * @return hdfs resource dir */ - public static String getHdfsDir(String tenantCode) { + public static String getHdfsResDir(String tenantCode) { return String.format("%s/resources", getHdfsTenantDir(tenantCode)); } /** - * get udf dir on hdfs + * hdfs user dir + * + * @param tenantCode tenant code + * @return hdfs resource dir + */ + public static String getHdfsUserDir(String tenantCode,int userId) { + return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId); + } + + /** + * hdfs udf dir * * @param tenantCode tenant code * @return get udf dir on hdfs @@ -432,7 +442,7 @@ public class HadoopUtils implements Closeable { * @return get absolute path and name for file on hdfs */ public static String getHdfsFilename(String tenantCode, String filename) { - return String.format("%s/%s", getHdfsDir(tenantCode), filename); + return String.format("%s/%s", getHdfsResDir(tenantCode), filename); } /** @@ -449,7 +459,7 @@ public class HadoopUtils implements Closeable { /** * @return file directory of tenants on hdfs */ - private static String getHdfsTenantDir(String tenantCode) { + public static String getHdfsTenantDir(String tenantCode) { return String.format("%s/%s", getHdfsDataBasePath(), tenantCode); } From f4ffd2731af608f30a032eb0a658318072535722 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Mon, 22 Jul 2019 16:24:24 +0800 Subject: [PATCH 3/6] CommonUtilsTest update --- .../test/java/cn/escheduler/common/utils/CommonUtilsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java index bac66a7f47..f4643e1847 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java @@ -45,7 +45,7 @@ public class CommonUtilsTest { @Test public void getHdfsDir() { - logger.info(HadoopUtils.getHdfsDir("1234")); + logger.info(HadoopUtils.getHdfsResDir("1234")); } @Test From fce623271c4b51984ed808c8fc272a95ed70b311 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Mon, 22 Jul 2019 17:27:58 +0800 Subject: [PATCH 4/6] =?UTF-8?q?resource=20online=20create=EF=BC=8Cif=20ten?= =?UTF-8?q?ant=20dir=20not=20exists=EF=BC=8Cthen=20create?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/service/ResourcesService.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java index 5ec417987f..81fc7107c8 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java @@ -681,18 +681,15 @@ public class ResourcesService extends BaseService { logger.info("resource hdfs path is {} ", hdfsFileName); HadoopUtils hadoopUtils = HadoopUtils.getInstance(); - if (hadoopUtils.exists(resourcePath)) { - if (hadoopUtils.exists(hdfsFileName)) { - hadoopUtils.delete(hdfsFileName, false); - } - - hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true); - } else { - logger.error("{} is not exist", resourcePath); - result.setCode(Status.HDFS_OPERATION_ERROR.getCode()); - result.setMsg(String.format("%s is not exist", resourcePath)); - return result; + if (!hadoopUtils.exists(resourcePath)) { + // create if tenant dir not exists + createTenantDirIfNotExists(tenantCode); + } + if (hadoopUtils.exists(hdfsFileName)) { + hadoopUtils.delete(hdfsFileName, false); } + + hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true); } catch (Exception e) { logger.error(e.getMessage(), e); result.setCode(Status.HDFS_OPERATION_ERROR.getCode()); From 5d69f3f2efe7dcbe60cdb063b7ae9b04051b14d6 Mon Sep 17 00:00:00 2001 From: easyscheduler Date: Thu, 25 Jul 2019 10:12:10 +0800 Subject: [PATCH 5/6] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a6b2879783..b36606966c 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Easy Scheduler > Easy Scheduler for Big Data -[English](https://github.com/analysys/EasyScheduler/blob/dev/README.md) | [Chinese](https://github.com/analysys/EasyScheduler/blob/dev/README_zh_CN.md) +[English](README.md) | [Chinese](README_zh_CN.md) ### Design features: From be6292d50122126065ee0ea071e314e567fadadc Mon Sep 17 00:00:00 2001 From: easyscheduler Date: Thu, 25 Jul 2019 10:12:35 +0800 Subject: [PATCH 6/6] Update README_zh_CN.md --- README_zh_CN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_zh_CN.md b/README_zh_CN.md index 0ff1568fee..fa94152f4b 100644 --- a/README_zh_CN.md +++ b/README_zh_CN.md @@ -4,7 +4,7 @@ Easy Scheduler > Easy Scheduler for Big Data -[中文](https://github.com/analysys/EasyScheduler/blob/dev/README_zh_CN.md) | [英文](https://github.com/analysys/EasyScheduler/blob/dev/README.md) +[中文](README_zh_CN.md) | [英文](README.md) **设计特点:** 一个分布式易扩展的可视化DAG工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中`开箱即用`。 其主要目标如下: