Browse Source

Merge pull request #617 from qiaozhanwei/dev

the resource upload switch is switched, and the file system is consistent
pull/2/head
乔占卫 6 years ago committed by GitHub
parent
commit
c3065e416d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java
  2. 15
      escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java
  3. 6
      escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java
  4. 34
      escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java
  5. 10
      escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
  6. 31
      escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
  7. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java
  8. 18
      escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
  9. 2
      escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java

17
escheduler-api/src/main/java/cn/escheduler/api/service/BaseService.java

@ -20,6 +20,7 @@ import cn.escheduler.api.enums.Status;
import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.utils.HadoopUtils;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
@ -110,4 +111,20 @@ public class BaseService {
return null;
}
/**
* create tenant dir if not exists
* @param tenantCode
* @throws Exception
*/
protected void createTenantDirIfNotExists(String tenantCode)throws Exception{
String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode);
/**
* init resource path and udf path
*/
HadoopUtils.getInstance().mkdir(resourcePath);
HadoopUtils.getInstance().mkdir(udfsPath);
}
}

15
escheduler-api/src/main/java/cn/escheduler/api/service/LoggerService.java

@ -49,21 +49,22 @@ public class LoggerService {
*/
public Result queryLog(int taskInstId, int skipLineNum, int limit) {
TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
String host = taskInstance.getHost();
if(StringUtils.isEmpty(host)){
return new Result(Status.TASK_INSTANCE_HOST_NOT_FOUND.getCode(), Status.TASK_INSTANCE_HOST_NOT_FOUND.getMsg());
}
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
if(host != null){
LogClient logClient = new LogClient(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
}
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
LogClient logClient = new LogClient(host, Constants.RPC_PORT);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
return result;
}

6
escheduler-api/src/main/java/cn/escheduler/api/service/ProjectService.java

@ -45,12 +45,6 @@ public class ProjectService extends BaseService{
private static final Logger logger = LoggerFactory.getLogger(ProjectService.class);
@Autowired
private UserMapper userMapper;
@Autowired
private UsersService userService;
@Autowired
private ProjectMapper projectMapper;

34
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java

@ -339,19 +339,18 @@ public class ResourcesService extends BaseService {
String resourcePath = "";
if (type.equals(ResourceType.FILE)) {
hdfsFilename = HadoopUtils.getHdfsFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsDir(tenantCode);
resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
} else if (type.equals(ResourceType.UDF)) {
hdfsFilename = HadoopUtils.getHdfsUdfFilename(tenantCode, name);
resourcePath = HadoopUtils.getHdfsUdfDir(tenantCode);
}
try {
if (HadoopUtils.getInstance().exists(resourcePath)) {
cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename);
HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
} else {
logger.error("{} is not exist", resourcePath);
return false;
// if tenant dir not exists
if (!HadoopUtils.getInstance().exists(resourcePath)) {
createTenantDirIfNotExists(tenantCode);
}
cn.escheduler.api.utils.FileUtils.copyFile(file, localFilename);
HadoopUtils.getInstance().copyLocalToHdfs(localFilename, hdfsFilename, true, true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
@ -678,22 +677,19 @@ public class ResourcesService extends BaseService {
// get file hdfs path
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resourceName);
String resourcePath = HadoopUtils.getHdfsDir(tenantCode);
String resourcePath = HadoopUtils.getHdfsResDir(tenantCode);
logger.info("resource hdfs path is {} ", hdfsFileName);
HadoopUtils hadoopUtils = HadoopUtils.getInstance();
if (hadoopUtils.exists(resourcePath)) {
if (hadoopUtils.exists(hdfsFileName)) {
hadoopUtils.delete(hdfsFileName, false);
}
hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
} else {
logger.error("{} is not exist", resourcePath);
result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
result.setMsg(String.format("%s is not exist", resourcePath));
return result;
if (!hadoopUtils.exists(resourcePath)) {
// create if tenant dir not exists
createTenantDirIfNotExists(tenantCode);
}
if (hadoopUtils.exists(hdfsFileName)) {
hadoopUtils.delete(hdfsFileName, false);
}
hadoopUtils.copyLocalToHdfs(localFilename, hdfsFileName, true, true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
result.setCode(Status.HDFS_OPERATION_ERROR.getCode());

10
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java

@ -97,13 +97,7 @@ public class TenantService extends BaseService{
// if hdfs startup
if (PropertyUtils.getResUploadStartupState()){
String resourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + tenantCode + "/resources";
String udfsPath = HadoopUtils.getHdfsUdfDir(tenantCode);
/**
* init resource path and udf path
*/
HadoopUtils.getInstance().mkdir(resourcePath);
HadoopUtils.getInstance().mkdir(udfsPath);
createTenantDirIfNotExists(tenantCode);
}
putMsg(result, Status.SUCCESS);
@ -240,7 +234,7 @@ public class TenantService extends BaseService{
String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
if (HadoopUtils.getInstance().exists(tenantPath)){
String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
String resourcePath = HadoopUtils.getHdfsResDir(tenant.getTenantCode());
FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
if (fileStatus.length > 0) {
putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);

31
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java

@ -124,10 +124,13 @@ public class UsersService extends BaseService {
userMapper.insert(user);
Tenant tenant = tenantMapper.queryById(tenantId);
// if hdfs startup
// resource upload startup
if (PropertyUtils.getResUploadStartupState()){
String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode() + "/home/" + user.getId();
// if tenant not exists
if (!HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(tenant.getTenantCode()))){
createTenantDirIfNotExists(tenant.getTenantCode());
}
String userPath = HadoopUtils.getHdfsUserDir(tenant.getTenantCode(),user.getId());
HadoopUtils.getInstance().mkdir(userPath);
}
@ -247,11 +250,12 @@ public class UsersService extends BaseService {
// if hdfs startup
if (PropertyUtils.getResUploadStartupState() && oldTenant != null){
String newTenantCode = newTenant.getTenantCode();
String oldResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/resources";
String oldResourcePath = HadoopUtils.getHdfsResDir(oldTenant.getTenantCode());
String oldUdfsPath = HadoopUtils.getHdfsUdfDir(oldTenant.getTenantCode());
// if old tenant dir exists
if (HadoopUtils.getInstance().exists(oldResourcePath)){
String newResourcePath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenantCode + "/resources";
String newResourcePath = HadoopUtils.getHdfsResDir(newTenantCode);
String newUdfsPath = HadoopUtils.getHdfsUdfDir(newTenantCode);
//file resources list
@ -271,13 +275,22 @@ public class UsersService extends BaseService {
}
//Delete the user from the old tenant directory
String oldUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + oldTenant.getTenantCode() + "/home/" + userId;
String oldUserPath = HadoopUtils.getHdfsUserDir(oldTenant.getTenantCode(),userId);
HadoopUtils.getInstance().delete(oldUserPath, true);
}else {
// if old tenant dir not exists , create
createTenantDirIfNotExists(oldTenant.getTenantCode());
}
if (HadoopUtils.getInstance().exists(HadoopUtils.getHdfsTenantDir(newTenant.getTenantCode()))){
//create user in the new tenant directory
String newUserPath = HadoopUtils.getHdfsUserDir(newTenant.getTenantCode(),user.getId());
HadoopUtils.getInstance().mkdir(newUserPath);
}else {
// if new tenant dir not exists , create
createTenantDirIfNotExists(newTenant.getTenantCode());
}
//create user in the new tenant directory
String newUserPath = HadoopUtils.getHdfsDataBasePath() + "/" + newTenant.getTenantCode() + "/home/" + user.getId();
HadoopUtils.getInstance().mkdir(newUserPath);
}
}
user.setTenantId(tenantId);

2
escheduler-api/src/main/java/cn/escheduler/api/service/WorkerGroupService.java

@ -136,7 +136,7 @@ public class WorkerGroupService extends BaseService {
Map<String, Object> result = new HashMap<>(5);
int delete = workerGroupMapper.deleteById(id);
workerGroupMapper.deleteById(id);
putMsg(result, Status.SUCCESS);
return result;
}

18
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java

@ -410,12 +410,22 @@ public class HadoopUtils implements Closeable {
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public static String getHdfsDir(String tenantCode) {
public static String getHdfsResDir(String tenantCode) {
return String.format("%s/resources", getHdfsTenantDir(tenantCode));
}
/**
* get udf dir on hdfs
* hdfs user dir
*
* @param tenantCode tenant code
* @return hdfs resource dir
*/
public static String getHdfsUserDir(String tenantCode,int userId) {
return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId);
}
/**
* hdfs udf dir
*
* @param tenantCode tenant code
* @return get udf dir on hdfs
@ -432,7 +442,7 @@ public class HadoopUtils implements Closeable {
* @return get absolute path and name for file on hdfs
*/
public static String getHdfsFilename(String tenantCode, String filename) {
return String.format("%s/%s", getHdfsDir(tenantCode), filename);
return String.format("%s/%s", getHdfsResDir(tenantCode), filename);
}
/**
@ -449,7 +459,7 @@ public class HadoopUtils implements Closeable {
/**
* @return file directory of tenants on hdfs
*/
private static String getHdfsTenantDir(String tenantCode) {
public static String getHdfsTenantDir(String tenantCode) {
return String.format("%s/%s", getHdfsDataBasePath(), tenantCode);
}

2
escheduler-common/src/test/java/cn/escheduler/common/utils/CommonUtilsTest.java

@ -45,7 +45,7 @@ public class CommonUtilsTest {
@Test
public void getHdfsDir() {
logger.info(HadoopUtils.getHdfsDir("1234"));
logger.info(HadoopUtils.getHdfsResDir("1234"));
}
@Test

Loading…
Cancel
Save