Browse Source

Merge pull request #278 from lgcareer/branch-1.0.2

add listFileStatus and update deleteTenantById
pull/2/head
lgcareer 6 years ago committed by GitHub
parent
commit
98450c7bfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java
  2. 3
      escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java
  3. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java
  4. 42
      escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
  5. 15
      escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
  6. 17
      escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
  7. 24
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java
  8. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java

25
escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java

@ -232,4 +232,29 @@ public class SchedulerController extends BaseController{
return error(Status.QUERY_SCHEDULE_LIST_ERROR.getCode(), Status.QUERY_SCHEDULE_LIST_ERROR.getMsg());
}
}
/**
* delete schedule by id
*
* @param loginUser
* @param projectName
* @param scheduleId
* @return
*/
@GetMapping(value="/delete")
@ResponseStatus(HttpStatus.OK)
public Result deleteScheduleById(@RequestAttribute(value = SESSION_USER) User loginUser,
@PathVariable String projectName,
@RequestParam("scheduleId") Integer scheduleId
){
try{
logger.info("delete schedule by id, login user:{}, project name:{}, schedule id:{}",
loginUser.getUserName(), projectName, scheduleId);
Map<String, Object> result = schedulerService.deleteScheduleById(loginUser, projectName, scheduleId);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_SCHEDULE_CRON_BY_ID_ERROR.getMsg(),e);
return error(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR.getCode(), Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR.getMsg());
}
}
}

3
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java

@ -204,8 +204,11 @@ public enum Status {
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line"),
DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error"),
SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line"),
DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024,"delete schedule by id error"),
HDFS_NOT_STARTUP(60001,"hdfs not startup"),
HDFS_TERANT_RESOURCES_FILE_EXISTS(60002,"resource file exists,please delete resource first"),
HDFS_TERANT_UDFS_FILE_EXISTS(60003,"udf file exists,please delete resource first"),
/**
* for monitor

2
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java

@ -370,7 +370,7 @@ public class ProcessDefinitionService extends BaseDAGService {
}
// get the timing according to the process definition
List<Schedule> schedules = scheduleMapper.selectAllByProcessDefineArray(new int[processDefinitionId]);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (!schedules.isEmpty() && schedules.size() > 1) {
logger.warn("scheduler num is {},Greater than 1",schedules.size());
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);

42
escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java

@ -488,4 +488,46 @@ public class SchedulerService extends BaseService {
}
return null;
}
/**
* delete schedule by id
*
* @param loginUser
* @param projectName
* @param scheduleId
* @return
*/
public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
Schedule schedule = scheduleMapper.queryById(scheduleId);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
return result;
}
// check schedule is already online
if(schedule.getReleaseState() == ReleaseState.ONLINE){
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE,schedule.getId());
return result;
}
int delete = scheduleMapper.delete(scheduleId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
return result;
}
}

15
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java

@ -26,6 +26,7 @@ import cn.escheduler.dao.mapper.TenantMapper;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -219,6 +220,7 @@ public class TenantService extends BaseService{
* @param id
* @return
*/
@Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
Map<String, Object> result = new HashMap<>(5);
@ -229,6 +231,19 @@ public class TenantService extends BaseService{
Tenant tenant = tenantMapper.queryById(id);
String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
if (fileStatus.length > 0) {
putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);
return result;
}
fileStatus = HadoopUtils.getInstance().listFileStatus(HadoopUtils.getHdfsUdfDir(tenant.getTenantCode()));
if (fileStatus.length > 0) {
putMsg(result, Status.HDFS_TERANT_UDFS_FILE_EXISTS);
return result;
}
HadoopUtils.getInstance().delete(tenantPath, true);
tenantMapper.deleteById(id);

17
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -263,6 +264,22 @@ public class HadoopUtils implements Closeable {
return fs.exists(new Path(hdfsFilePath));
}
/**
* Gets a list of files in the directory
*
* @param filePath
* @return {@link FileStatus}
*/
public FileStatus[] listFileStatus(String filePath)throws Exception{
Path path = new Path(filePath);
try {
return fs.listStatus(new Path(filePath));
} catch (IOException e) {
logger.error("Get file list exception", e);
throw new Exception("Get file list exception", e);
}
}
/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.

24
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java

@ -173,6 +173,30 @@ public interface ScheduleMapper {
@SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray")
List<Schedule> selectAllByProcessDefineArray(@Param("processDefineIds") int[] processDefineIds);
/**
* query schedule list by definition id
* @param processDefinitionId
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processDefinitionId", column = "process_definition_id", id = true, javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "startTime", column = "start_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "endTime", column = "end_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "crontab", column = "crontab", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "failureStrategy", column = "failure_strategy", typeHandler = EnumOrdinalTypeHandler.class, javaType = FailureStrategy.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningType", column = "warning_type", typeHandler = EnumOrdinalTypeHandler.class, javaType = WarningType.class, jdbcType = JdbcType.TINYINT),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefinitionId")
List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
/**
* delete schedule by id
* @param scheduleId

14
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java

@ -181,6 +181,20 @@ public class ScheduleMapperProvider {
}}.toString();
}
/**
* query schedule by process definition id
* @param parameter
* @return
*/
public String queryByProcessDefinitionId(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(DB_NAME);
WHERE("process_definition_id = #{processDefinitionId}");
}}.toString();
}
/**
* delete schedule by id
*

Loading…
Cancel
Save