
Merge remote-tracking branch 'upstream/dev' into dev

lenboo, 5 years ago
commit 9361cbbfdb
12 changed files (changed line count in parentheses):

  1. README.md (2)
  2. README_zh_CN.md (2)
  3. escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java (17)
  4. escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java (7)
  5. escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java (6)
  6. escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java (24)
  7. escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java (10)
  8. escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java (27)
  9. escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java (2)
  10. escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java (14)
  11. escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java (18)
  12. escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java (2)

README.md (2)

@@ -4,7 +4,7 @@ Easy Scheduler
 > Easy Scheduler for Big Data
-[English](https://github.com/analysys/EasyScheduler/blob/dev/README.md) | [Chinese](https://github.com/analysys/EasyScheduler/blob/dev/README_zh_CN.md)
+[English](README.md) | [Chinese](README_zh_CN.md)
 ### Design features:

README_zh_CN.md (2)

@@ -4,7 +4,7 @@ Easy Scheduler
 > Easy Scheduler for Big Data
-[中文](https://github.com/analysys/EasyScheduler/blob/dev/README_zh_CN.md) | [英文](https://github.com/analysys/EasyScheduler/blob/dev/README.md)
+[中文](README_zh_CN.md) | [英文](README.md)
 **设计特点:** 一个分布式易扩展的可视化DAG工作流任务调度系统。致力于解决数据处理流程中错综复杂的依赖关系,使调度系统在数据处理流程中`开箱即用`。
 其主要目标如下:

escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java (17)

@@ -20,6 +20,7 @@ import cn.escheduler.api.enums.Status;
 import cn.escheduler.api.utils.Constants;
 import cn.escheduler.api.utils.Result;
 import cn.escheduler.common.enums.UserType;
+import cn.escheduler.common.utils.HadoopUtils;
 import cn.escheduler.dao.model.User;
 import org.apache.commons.lang3.StringUtils;
@@ -110,4 +111,20 @@ public class BaseService {
         return null;
     }
+
+    /**
+     * create tenant dir if not exists
+     * @param tenantCode
+     * @throws Exception
+     */
+    protected void createTenantDirIfNotExists(String tenantCode)throws Exception{
+        String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
+        String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode);
+        /**
+         * init resource path and udf path
+         */
+        HadoopUtils.getInstance().mkdir(resourcePath);
+        HadoopUtils.getInstance().mkdir(udfsPath);
+    }
 }
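createTenantDirIfNotExists() gives every API service one place to lazily create a tenant's resource and UDF directories on HDFS before anything is written under them. A minimal sketch of the call pattern the updated services follow; ResourceUploader is a hypothetical caller, and only the HadoopUtils / createTenantDirIfNotExists calls are taken from this change:

    // Hypothetical caller; project classes (BaseService, HadoopUtils) are assumed to be on the classpath.
    public class ResourceUploader extends BaseService {

        public boolean upload(String tenantCode, String localFilename, String hdfsFilename) {
            try {
                String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
                // create the tenant directories on first use instead of failing the upload
                if (!HadoopUtils.getInstance().exists(resourcePath)) {
                    createTenantDirIfNotExists(tenantCode);
                }
                HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
    }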

escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java (7)

@@ -49,21 +49,22 @@ public class LoggerService {
      */
     public Result queryLog(int taskInstId, int skipLineNum, int limit) {
         TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
         String host = taskInstance.getHost();
+        if(StringUtils.isEmpty(host)){
+            return new Result(Status.TASK_INSTANCE_HOST_NOT_FOUND.getCode(), Status.TASK_INSTANCE_HOST_NOT_FOUND.getMsg());
+        }
+        logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
         Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
-        if(host != null){
-            logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
         LogClient logClient = new LogClient(host, Constants.RPC_PORT);
         String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
         result.setData(log);
         logger.info(log);
-        }
         return result;
     }

escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java (6)

@@ -45,12 +45,6 @@ public class ProjectService extends BaseService{
     private static final Logger logger = LoggerFactory.getLogger(ProjectService.class);
     @Autowired
     private UserMapper userMapper;
-    @Autowired
-    private UsersService userService;
     @Autowired
     private ProjectMapper projectMapper;

escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java (24)

@@ -339,19 +339,18 @@ public class ResourcesService extends BaseService {
         String resourcePath = "";
         if (type.equals(ResourceType.FILE)) {
             hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name);
-            resourcePath = HadoopUtils.getHdfsDir(tenantCode);
+            resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
         } else if (type.equals(ResourceType.UDF)) {
             hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
             resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode);
         }
         try {
-            if (HadoopUtils.getInstance().exists(resourcePath)) {
+            // if tenant dir not exists
+            if (!HadoopUtils.getInstance().exists(resourcePath)) {
+                createTenantDirIfNotExists(tenantCode);
+            }
             cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename);
             HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
-            } else {
-                logger.error("{} is not exist", resourcePath);
-                return false;
-            }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             return false;
@@ -678,22 +677,19 @@ public class ResourcesService extends BaseService {
             // get file hdfs path
             hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName);
-            String resourcePath = HadoopUtils.getHdfsDir(tenantCode);
+            String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
             logger.info("resource hdfs path is {} ", hdfsFileName);
             HadoopUtils hadoopUtils = HadoopUtils.getInstance();
-            if (hadoopUtils.exists(resourcePath)) {
+            if (!hadoopUtils.exists(resourcePath)) {
+                // create if tenant dir not exists
+                createTenantDirIfNotExists(tenantCode);
+            }
             if (hadoopUtils.exists(hdfsFileName)) {
                 hadoopUtils.delete(hdfsFileName, false);
             }
             hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
-            } else {
-                logger.error("{} is not exist", resourcePath);
-                result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
-                result.setMsg(String.format("%s is not exist", resourcePath));
-                return result;
-            }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             result.setCode(Status.HDFS_OPERATION_ERROR.getCode());

escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java (10)

@@ -97,13 +97,7 @@ public class TenantService extends BaseService{
         // if hdfs startup
         if (PropertyUtils.getResUploadStartupState()){
-            String resourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + tenantCode + "/resources";
-            String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode);
-            /**
-             * init resource path and udf path
-             */
-            HadoopUtils.getInstance().mkdir(resourcePath);
-            HadoopUtils.getInstance().mkdir(udfsPath);
+            createTenantDirIfNotExists(tenantCode);
         }
         putMsg(result, Status.SUCCESS);
@@ -240,7 +234,7 @@ public class TenantService extends BaseService{
             String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
             if (HadoopUtils.getInstance().exists(tenantPath)){
-                String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
+                String resourcePath = HadoopUtils.getHdfsResDir(tenant.getTenantCode());
                 FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
                 if (fileStatus.length > 0) {
                     putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);

escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java (27)

@@ -124,10 +124,13 @@ public class UsersService extends BaseService {
         userMapper.insert(user);
         Tenant tenant = tenantMapper.queryById(tenantId);
-        // if hdfs startup
+        // resource upload startup
         if (PropertyUtils.getResUploadStartupState()){
-            String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode() + "/home/" + user.getId();
+            // if tenant not exists
+            if (!HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(tenant.getTenantCode()))){
+                createTenantDirIfNotExists(tenant.getTenantCode());
+            }
+            String userPath = HadoopUtils.getHdfsUserDir(tenant.getTenantCode(),user.getId());
             HadoopUtils.getInstance().mkdir(userPath);
         }
@@ -247,11 +250,12 @@ public class UsersService extends BaseService {
         // if hdfs startup
         if (PropertyUtils.getResUploadStartupState() && oldTenant != null){
             String newTenantCode = newTenant.getTenantCode();
-            String oldResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/resources";
+            String oldResourcePath = HadoopUtils.getHdfsResDir(oldTenant.getTenantCode());
             String oldUdfsPath = HadoopUtils.getHdfsUdfDir(oldTenant.getTenantCode());
+            // if old tenant dir exists
             if (HadoopUtils.getInstance().exists(oldResourcePath)){
-                String newResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenantCode + "/resources";
+                String newResourcePath = HadoopUtils.getHdfsResDir(newTenantCode);
                 String newUdfsPath = HadoopUtils.getHdfsUdfDir(newTenantCode);
                 //file resources list
@@ -271,13 +275,22 @@ public class UsersService extends BaseService {
                 }
                 //Delete the user from the old tenant directory
-                String oldUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/home/" + userId;
+                String oldUserPath = HadoopUtils.getHdfsUserDir(oldTenant.getTenantCode(),userId);
                 HadoopUtils.getInstance().delete(oldUserPath, true);
+            }else {
+                // if old tenant dir not exists , create
+                createTenantDirIfNotExists(oldTenant.getTenantCode());
+            }
+            if (HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(newTenant.getTenantCode()))){
                 //create user in the new tenant directory
-                String newUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenant.getTenantCode() + "/home/" + user.getId();
+                String newUserPath = HadoopUtils.getHdfsUserDir(newTenant.getTenantCode(),user.getId());
                 HadoopUtils.getInstance().mkdir(newUserPath);
+            }else {
+                // if new tenant dir not exists , create
+                createTenantDirIfNotExists(newTenant.getTenantCode());
+            }
         }
     }
     user.setTenantId(tenantId);

escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java (2)

@@ -136,7 +136,7 @@ public class WorkerGroupService extends BaseService {
         Map<String, Object> result = new HashMap<>(5);
-        int delete = workerGroupMapper.deleteById(id);
+        workerGroupMapper.deleteById(id);
         putMsg(result, Status.SUCCESS);
         return result;
     }

escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java (14)

@@ -17,20 +17,26 @@
 package cn.escheduler.common.queue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
 import cn.escheduler.common.Constants;
 import cn.escheduler.common.utils.Bytes;
 import cn.escheduler.common.utils.IpUtils;
 import cn.escheduler.common.utils.OSUtils;
 import cn.escheduler.common.zk.AbstractZKClient;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.*;
 /**
  * A singleton of a task queue implemented with zookeeper
  * tasks queue implemention
@@ -39,7 +45,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
     private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
-    private static TaskQueueZkImpl instance;
+    private static volatile TaskQueueZkImpl instance;
     private TaskQueueZkImpl(){
         init();
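Making the instance field volatile is what allows a lazily initialized singleton to be safe under double-checked locking: without it, another thread could observe a partially constructed TaskQueueZkImpl. The getInstance() body is not part of the captured hunk, so the following is only a sketch of the usual pattern such a field supports:

    // Sketch only; the real getInstance() is not shown in this diff.
    public static TaskQueueZkImpl getInstance() {
        if (instance == null) {                        // first check, no lock taken
            synchronized (TaskQueueZkImpl.class) {
                if (instance == null) {                // second check while holding the lock
                    instance = new TaskQueueZkImpl();  // volatile write publishes the object safely
                }
            }
        }
        return instance;
    }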

escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java (18)

@@ -410,12 +410,22 @@ public class HadoopUtils implements Closeable {
      * @param tenantCode tenant code
      * @return hdfs resource dir
      */
-    public static String getHdfsDir(String tenantCode) {
+    public static String getHdfsResDir(String tenantCode) {
         return String.format("%s/resources", getHdfsTenantDir(tenantCode));
     }
     /**
-     * get udf dir on hdfs
+     * hdfs user dir
+     *
+     * @param tenantCode tenant code
+     * @return hdfs resource dir
+     */
+    public static String getHdfsUserDir(String tenantCode,int userId) {
+        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId);
+    }
+    /**
+     * hdfs udf dir
      *
      * @param tenantCode tenant code
      * @return get udf dir on hdfs
@@ -432,7 +442,7 @@ public class HadoopUtils implements Closeable {
      * @return get absolute path and name for file on hdfs
      */
     public static String getHdfsFilename(String tenantCode, String filename) {
-        return String.format("%s/%s", getHdfsDir(tenantCode), filename);
+        return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
     }
     /**
@@ -449,7 +459,7 @@ public class HadoopUtils implements Closeable {
     /**
      * @return file directory of tenants on hdfs
     */
-    private static String getHdfsTenantDir(String tenantCode) {
+    public static String getHdfsTenantDir(String tenantCode) {
         return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
     }
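Together these helpers define the per-tenant layout on HDFS. An illustration of what they resolve to, assuming the base path returned by getHdfsDataBasePath() is /escheduler (the real value depends on the deployment's configuration, and "tenantA" is an assumed tenant code):

    // Illustration only; "/escheduler", "tenantA", user id 7 and "a.jar" are assumed values.
    String tenantDir = HadoopUtils.getHdfsTenantDir("tenantA");         // "/escheduler/tenantA"
    String resDir    = HadoopUtils.getHdfsResDir("tenantA");            // "/escheduler/tenantA/resources"
    String userDir   = HadoopUtils.getHdfsUserDir("tenantA", 7);        // "/escheduler/tenantA/home/7"
    String fileName  = HadoopUtils.getHdfsFilename("tenantA", "a.jar"); // "/escheduler/tenantA/resources/a.jar"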

escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java (2)

@@ -45,7 +45,7 @@ public class CommonUtilsTest {
     @Test
     public void getHdfsDir() {
-        logger.info(HadoopUtils.getHdfsDir("1234"));
+        logger.info(HadoopUtils.getHdfsResDir("1234"));
     }
     @Test
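The updated test only logs the resolved path. If an assertion were wanted, a minimal variant (a sketch, not part of this commit) could check the suffix that getHdfsResDir() always appends, independent of the configured base path:

    // Sketch only; assumes org.junit.Assert is imported alongside the existing JUnit imports.
    @Test
    public void getHdfsResDirEndsWithResources() {
        // getHdfsResDir() appends "/<tenantCode>/resources" to the configured base path
        Assert.assertTrue(HadoopUtils.getHdfsResDir("1234").endsWith("/1234/resources"));
    }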
