Browse Source

Merge remote-tracking branch 'upstream/branch-1.0.2' into 102

pull/2/head
baoliang 6 years ago
parent
commit
13e31da0c5
  1. 25
      escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessDefinitionController.java
  2. 32
      escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java
  3. 25
      escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java
  4. 6
      escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java
  5. 60
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java
  6. 80
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
  7. 42
      escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java
  8. 15
      escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
  9. 11
      escheduler-api/src/test/java/cn/escheduler/api/service/ProcessDefinitionServiceTest.java
  10. 12
      escheduler-api/src/test/java/cn/escheduler/api/service/ProcessInstanceServiceTest.java
  11. 14
      escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java
  12. 17
      escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
  13. 7
      escheduler-common/src/main/java/cn/escheduler/common/utils/placeholder/BusinessTimeUtils.java
  14. 8
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  15. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java
  16. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java
  17. 1
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java
  18. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  19. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProjectMapper.java
  20. 10
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProjectMapperProvider.java
  21. 24
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java
  22. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java
  23. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TenantMapperProvider.java
  24. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java
  25. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java
  26. 26
      escheduler-dao/src/main/java/cn/escheduler/dao/model/Project.java
  27. 2
      escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java
  28. 6
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
  29. 4
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  30. 2
      escheduler-ui/.env
  31. 14
      escheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue
  32. 4
      escheduler-ui/src/js/conf/home/store/dag/actions.js
  33. 2
      escheduler-ui/src/js/module/i18n/locale/en_US.js
  34. 2
      escheduler-ui/src/js/module/i18n/locale/zh_CN.js

25
escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessDefinitionController.java

@ -350,4 +350,29 @@ public class ProcessDefinitionController extends BaseController{
}
}
/**
* batch delete process definition by ids
*
* @param loginUser
* @param projectName
* @param processDefinitionIds
* @return
*/
@GetMapping(value="/batch-delete")
@ResponseStatus(HttpStatus.OK)
public Result batchDeleteProcessDefinitionByIds(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
@RequestParam("processDefinitionIds") String processDefinitionIds
){
try{
logger.info("delete process definition by ids, login user:{}, project name:{}, process definition ids:{}",
loginUser.getUserName(), projectName, processDefinitionIds);
Map<String, Object> result = processDefinitionService.batchDeleteProcessDefinitionByIds(loginUser, projectName, processDefinitionIds);
return returnDataList(result);
}catch (Exception e){
logger.error(BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR.getMsg(),e);
return error(Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR.getCode(), Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR.getMsg());
}
}
}

32
escheduler-api/src/main/java/cn/escheduler/api/controller/ProcessInstanceController.java

@ -22,6 +22,8 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.Flag;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.model.User;
import org.slf4j.Logger;
@ -189,7 +191,9 @@ public class ProcessInstanceController extends BaseController{
try{
logger.info("delete process instance by id, login user:{}, project name:{}, process instance id:{}",
loginUser.getUserName(), projectName, processInstanceId);
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId);
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = processInstanceService.deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_PROCESS_INSTANCE_BY_ID_ERROR.getMsg(),e);
@ -282,4 +286,30 @@ public class ProcessInstanceController extends BaseController{
return error(Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR.getCode(),ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR.getMsg());
}
}
/**
* batch delete process instance by ids, at the same time,
* delete task instance and their mapping relation data
*
* @param loginUser
* @param projectName
* @param processInstanceIds
* @return
*/
@GetMapping(value="/batch-delete")
@ResponseStatus(HttpStatus.OK)
public Result batchDeleteProcessInstanceByIds(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable String projectName,
@RequestParam("processInstanceIds") String processInstanceIds
){
try{
logger.info("delete process instance by ids, login user:{}, project name:{}, process instance ids :{}",
loginUser.getUserName(), projectName, processInstanceIds);
Map<String, Object> result = processInstanceService.batchDeleteProcessInstanceByIds(loginUser, projectName, processInstanceIds);
return returnDataList(result);
}catch (Exception e){
logger.error(BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR.getMsg(),e);
return error(Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR.getCode(), Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR.getMsg());
}
}
}

25
escheduler-api/src/main/java/cn/escheduler/api/controller/SchedulerController.java

@ -232,4 +232,29 @@ public class SchedulerController extends BaseController{
return error(Status.QUERY_SCHEDULE_LIST_ERROR.getCode(), Status.QUERY_SCHEDULE_LIST_ERROR.getMsg());
}
}
/**
* delete schedule by id
*
* @param loginUser
* @param projectName
* @param scheduleId
* @return
*/
@GetMapping(value="/delete")
@ResponseStatus(HttpStatus.OK)
public Result deleteScheduleById(@RequestAttribute(value = SESSION_USER) User loginUser,
@PathVariable String projectName,
@RequestParam("scheduleId") Integer scheduleId
){
try{
logger.info("delete schedule by id, login user:{}, project name:{}, schedule id:{}",
loginUser.getUserName(), projectName, scheduleId);
Map<String, Object> result = schedulerService.deleteScheduleById(loginUser, projectName, scheduleId);
return returnDataList(result);
}catch (Exception e){
logger.error(DELETE_SCHEDULE_CRON_BY_ID_ERROR.getMsg(),e);
return error(Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR.getCode(), Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR.getMsg());
}
}
}

6
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java

@ -160,6 +160,7 @@ public enum Status {
NAME_EXIST(10135, "name {0} already exists"),
SAVE_ERROR(10136, "save error"),
DELETE_PROJECT_ERROR_DEFINES_NOT_NULL(10137, "please delete the process definitions in project first!"),
BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR(10117,"batch delete process instance by ids {0} error"),
@ -204,8 +205,13 @@ public enum Status {
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line"),
DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error"),
SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line"),
DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024,"delete schedule by id error"),
BATCH_DELETE_PROCESS_DEFINE_ERROR(50025,"batch delete process definition error"),
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026,"batch delete process definition by ids {0} error"),
HDFS_NOT_STARTUP(60001,"hdfs not startup"),
HDFS_TERANT_RESOURCES_FILE_EXISTS(60002,"resource file exists,please delete resource first"),
HDFS_TERANT_UDFS_FILE_EXISTS(60003,"udf file exists,please delete resource first"),
/**
* for monitor

60
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java

@ -38,6 +38,7 @@ import cn.escheduler.dao.mapper.*;
import cn.escheduler.dao.model.*;
import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -356,13 +357,19 @@ public class ProcessDefinitionService extends BaseDAGService {
return checkResult;
}
ProcessDefinition processDefinition = processDefineMapper.queryByDefineId(processDefinitionId);
if (processDefinition == null) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionId);
return result;
}
// Determine if the login user is the owner of the process definition
if (loginUser.getId() != processDefinition.getUserId()) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
// check process definition is already online
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE,processDefinitionId);
@ -370,7 +377,7 @@ public class ProcessDefinitionService extends BaseDAGService {
}
// get the timing according to the process definition
List<Schedule> schedules = scheduleMapper.selectAllByProcessDefineArray(new int[processDefinitionId]);
List<Schedule> schedules = scheduleMapper.queryByProcessDefinitionId(processDefinitionId);
if (!schedules.isEmpty() && schedules.size() > 1) {
logger.warn("scheduler num is {},Greater than 1",schedules.size());
putMsg(result, Status.DELETE_PROCESS_DEFINE_BY_ID_ERROR);
@ -395,6 +402,55 @@ public class ProcessDefinitionService extends BaseDAGService {
return result;
}
/**
* batch delete process definition by ids
*
* @param loginUser
* @param projectName
* @param processDefinitionIds
* @return
*/
public Map<String, Object> batchDeleteProcessDefinitionByIds(User loginUser, String projectName, String processDefinitionIds) {
Map<String, Object> result = new HashMap<>(5);
Map<String, Object> deleteReuslt = new HashMap<>(5);
List<Integer> deleteFailedIdList = new ArrayList<Integer>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
if(StringUtils.isNotEmpty(processDefinitionIds)){
String[] processInstanceIdArray = processDefinitionIds.split(",");
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
deleteReuslt = deleteProcessDefinitionById(loginUser, projectName, processInstanceId);
if(!Status.SUCCESS.equals(deleteReuslt.get(Constants.STATUS))){
deleteFailedIdList.add(processInstanceId);
logger.error((String)deleteReuslt.get(Constants.MSG));
}
} catch (Exception e) {
deleteFailedIdList.add(processInstanceId);
}
}
}
if(deleteFailedIdList.size() > 0){
putMsg(result, Status.BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
}else{
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* release process definition: online / offline
*

80
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@ -30,6 +30,8 @@ import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.JSONUtils;
@ -446,13 +448,13 @@ public class ProcessInstanceService extends BaseDAGService {
/**
* delete process instance by id, at the same timedelete task instance and their mapping relation data
*
* @param loginUser
* @param projectName
* @param workflowId
* @param processInstanceId
* @param tasksQueue
* @return
*/
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer workflowId) {
public Map<String, Object> deleteProcessInstanceById(User loginUser, String projectName, Integer processInstanceId,ITaskQueue tasksQueue) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
@ -462,16 +464,34 @@ public class ProcessInstanceService extends BaseDAGService {
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(workflowId);
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processInstanceId);
List<TaskInstance> taskInstanceList = processDao.findValidTaskListByProcessId(processInstanceId);
//process instance priority
int processInstancePriority = processInstance.getProcessInstancePriority().ordinal();
if (processInstance == null) {
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, workflowId);
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId);
return result;
}
int delete = processDao.deleteWorkProcessInstanceById(workflowId);
processDao.deleteAllSubWorkProcessByParentId(workflowId);
processDao.deleteWorkProcessMapByParentId(workflowId);
int delete = processDao.deleteWorkProcessInstanceById(processInstanceId);
processDao.deleteAllSubWorkProcessByParentId(processInstanceId);
processDao.deleteWorkProcessMapByParentId(processInstanceId);
if (delete > 0) {
if (CollectionUtils.isNotEmpty(taskInstanceList)){
for (TaskInstance taskInstance : taskInstanceList){
// task instance priority
int taskInstancePriority = taskInstance.getTaskInstancePriority().ordinal();
String nodeValue=processInstancePriority + "_" + processInstanceId + "_" +taskInstancePriority + "_" + taskInstance.getId();
try {
logger.info("delete task queue node : {}",nodeValue);
tasksQueue.removeNode(cn.escheduler.common.Constants.SCHEDULER_TASKS_QUEUE, nodeValue);
}catch (Exception e){
logger.error("delete task queue node : {}", nodeValue);
}
}
}
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR);
@ -479,6 +499,50 @@ public class ProcessInstanceService extends BaseDAGService {
return result;
}
/**
* batch delete process instance by ids, at the same timedelete task instance and their mapping relation data
*
* @param loginUser
* @param projectName
* @param processInstanceIds
* @return
*/
public Map<String, Object> batchDeleteProcessInstanceByIds(User loginUser, String projectName, String processInstanceIds) {
// task queue
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
Map<String, Object> result = new HashMap<>(5);
List<Integer> deleteFailedIdList = new ArrayList<Integer>();
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
if(StringUtils.isNotEmpty(processInstanceIds)){
String[] processInstanceIdArray = processInstanceIds.split(",");
for (String strProcessInstanceId:processInstanceIdArray) {
int processInstanceId = Integer.parseInt(strProcessInstanceId);
try {
deleteProcessInstanceById(loginUser, projectName, processInstanceId,tasksQueue);
} catch (Exception e) {
deleteFailedIdList.add(processInstanceId);
}
}
}
if(deleteFailedIdList.size() > 0){
putMsg(result, Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR,StringUtils.join(deleteFailedIdList.toArray(),","));
}else{
putMsg(result, Status.SUCCESS);
}
return result;
}
/**
* view process instance variables
*

42
escheduler-api/src/main/java/cn/escheduler/api/service/SchedulerService.java

@ -488,4 +488,46 @@ public class SchedulerService extends BaseService {
}
return null;
}
/**
* delete schedule by id
*
* @param loginUser
* @param projectName
* @param scheduleId
* @return
*/
public Map<String, Object> deleteScheduleById(User loginUser, String projectName, Integer scheduleId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status resultEnum = (Status) checkResult.get(Constants.STATUS);
if (resultEnum != Status.SUCCESS) {
return checkResult;
}
Schedule schedule = scheduleMapper.queryById(scheduleId);
if (schedule == null) {
putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, scheduleId);
return result;
}
// check schedule is already online
if(schedule.getReleaseState() == ReleaseState.ONLINE){
putMsg(result, Status.SCHEDULE_CRON_STATE_ONLINE,schedule.getId());
return result;
}
int delete = scheduleMapper.delete(scheduleId);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_SCHEDULE_CRON_BY_ID_ERROR);
}
return result;
}
}

15
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java

@ -26,6 +26,7 @@ import cn.escheduler.dao.mapper.TenantMapper;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -219,6 +220,7 @@ public class TenantService extends BaseService{
* @param id
* @return
*/
@Transactional(value = "TransactionManager", rollbackFor = Exception.class)
public Map<String, Object> deleteTenantById(User loginUser, int id) throws Exception {
Map<String, Object> result = new HashMap<>(5);
@ -229,6 +231,19 @@ public class TenantService extends BaseService{
Tenant tenant = tenantMapper.queryById(id);
String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
if (fileStatus.length > 0) {
putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);
return result;
}
fileStatus = HadoopUtils.getInstance().listFileStatus(HadoopUtils.getHdfsUdfDir(tenant.getTenantCode()));
if (fileStatus.length > 0) {
putMsg(result, Status.HDFS_TERANT_UDFS_FILE_EXISTS);
return result;
}
HadoopUtils.getInstance().delete(tenantPath, true);
tenantMapper.deleteById(id);

11
escheduler-api/src/test/java/cn/escheduler/api/service/ProcessDefinitionServiceTest.java

@ -75,4 +75,15 @@ public class ProcessDefinitionServiceTest {
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
@Test
public void batchDeleteProcessDefinitionByIds() throws Exception {
User loginUser = new User();
loginUser.setId(2);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> map = processDefinitionService.batchDeleteProcessDefinitionByIds(loginUser, "li_test_1", "2,3");
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
}

12
escheduler-api/src/test/java/cn/escheduler/api/service/ProcessInstanceServiceTest.java

@ -75,4 +75,16 @@ public class ProcessInstanceServiceTest {
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
@Test
public void batchDeleteProcessInstanceByIds() throws Exception {
User loginUser = new User();
loginUser.setId(2);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> map = processInstanceService.batchDeleteProcessInstanceByIds(loginUser, "li_test_1", "4,2,300");
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
logger.info(JSON.toJSONString(map));
}
}

14
escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java

@ -73,6 +73,11 @@ public class SqlParameters extends AbstractParameters {
*/
private List<String> postStatements;
/**
* title
*/
private String title;
/**
* receivers
*/
@ -139,6 +144,14 @@ public class SqlParameters extends AbstractParameters {
this.connParams = connParams;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getReceivers() {
return receivers;
}
@ -190,6 +203,7 @@ public class SqlParameters extends AbstractParameters {
", udfs='" + udfs + '\'' +
", showType='" + showType + '\'' +
", connParams='" + connParams + '\'' +
", title='" + title + '\'' +
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
", preStatements=" + preStatements +

17
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -263,6 +264,22 @@ public class HadoopUtils implements Closeable {
return fs.exists(new Path(hdfsFilePath));
}
/**
* Gets a list of files in the directory
*
* @param filePath
* @return {@link FileStatus}
*/
public FileStatus[] listFileStatus(String filePath)throws Exception{
Path path = new Path(filePath);
try {
return fs.listStatus(new Path(filePath));
} catch (IOException e) {
logger.error("Get file list exception", e);
throw new Exception("Get file list exception", e);
}
}
/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.

7
escheduler-common/src/main/java/cn/escheduler/common/utils/placeholder/BusinessTimeUtils.java

@ -50,9 +50,16 @@ public class BusinessTimeUtils {
case RECOVER_TOLERANCE_FAULT_PROCESS:
case RECOVER_SUSPENDED_PROCESS:
case START_FAILURE_TASK_PROCESS:
case REPEAT_RUNNING:
case SCHEDULER:
default:
businessDate = addDays(new Date(), -1);
if (runTime != null){
/**
* If there is a scheduled time, take the scheduling time. Recovery from failed nodes, suspension of recovery, re-run for scheduling
*/
businessDate = addDays(runTime, -1);
}
break;
}
Date businessCurrentDate = addDays(businessDate, 1);

8
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -1555,6 +1555,14 @@ public class ProcessDao extends AbstractBaseDao {
}
public void selfFaultTolerant(int ... states){
List<ProcessInstance> processInstanceList = processInstanceMapper.listByStatus(states);
for (ProcessInstance processInstance:processInstanceList){
selfFaultTolerant(processInstance);
}
}
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)
public void selfFaultTolerant(ProcessInstance processInstance){

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java

@ -111,7 +111,7 @@ public class DataSourceMapperProvider {
*/
public String queryById(Map<String, Object> parameter) {
return new SQL() {{
SELECT("r.*,u.user_name as userName");
SELECT("r.*,u.user_name");
FROM(TABLE_NAME + " r");

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java

@ -134,7 +134,7 @@ public class ProcessDefinitionMapperProvider {
public String queryByDefineId(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("pd.*,u.user_name,p.name as projectName");
SELECT("pd.*,u.user_name,p.name as project_name");
FROM(TABLE_NAME + " pd");
JOIN("t_escheduler_user u ON pd.user_id = u.id");

1
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java

@ -94,6 +94,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "queue", column = "queue", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -220,7 +220,7 @@ public class ProcessInstanceMapperProvider {
public String queryDetailById(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("inst.*,q.queue_name as queue,t.tenant_code as tenantCode,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
SELECT("inst.*,q.queue_name as queue,t.tenant_code,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_tenant t,t_escheduler_queue q");

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProjectMapper.java

@ -67,6 +67,7 @@ public interface ProjectMapper {
@Result(property = "userId", column = "user_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "desc", column = "desc", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
})
@ -82,6 +83,7 @@ public interface ProjectMapper {
@Result(property = "userId", column = "user_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "desc", column = "desc", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
})
@ -115,6 +117,8 @@ public interface ProjectMapper {
@Result(property = "perm", column = "perm", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "defCount", column = "def_count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "instRunningCount", column = "inst_running_count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
})
@SelectProvider(type = ProjectMapperProvider.class, method = "queryProjectListPaging")
List<Project> queryProjectListPaging(@Param("userId") Integer userId,
@ -145,6 +149,8 @@ public interface ProjectMapper {
@Result(property = "perm", column = "perm", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "defCount", column = "def_count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "instRunningCount", column = "inst_running_count", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
})
@SelectProvider(type = ProjectMapperProvider.class, method = "queryAllProjectListPaging")
List<Project> queryAllProjectListPaging(

10
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProjectMapperProvider.java

@ -93,7 +93,7 @@ public class ProjectMapperProvider {
public String queryById(Map<String, Object> parameter) {
return new SQL() {{
SELECT("p.user_id");
SELECT("u.user_name as userName");
SELECT("u.user_name");
SELECT("p.*");
FROM(TABLE_NAME + " p");
@ -114,7 +114,7 @@ public class ProjectMapperProvider {
public String queryByName(Map<String, Object> parameter) {
return new SQL() {{
SELECT("p.user_id");
SELECT("u.user_name as userName");
SELECT("u.user_name");
SELECT("p.*");
FROM(TABLE_NAME + " p");
@ -157,7 +157,8 @@ public class ProjectMapperProvider {
return new SQL() {{
SELECT("p.*");
SELECT("u.user_name as user_name");
SELECT("(SELECT COUNT(*) FROM t_escheduler_process_definition AS def WHERE def.project_id = p.id) AS def_count");
SELECT("(SELECT COUNT(*) FROM t_escheduler_process_definition def, t_escheduler_process_instance inst WHERE def.id = inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count");
FROM(TABLE_NAME + " p");
JOIN("t_escheduler_user u on u.id=p.user_id");
WHERE("p.id in " +
@ -199,7 +200,8 @@ public class ProjectMapperProvider {
return new SQL() {{
SELECT("p.*");
SELECT("u.user_name as user_name");
SELECT("(SELECT COUNT(*) FROM t_escheduler_process_definition AS def WHERE def.project_id = p.id) AS def_count");
SELECT("(SELECT COUNT(*) FROM t_escheduler_process_definition def, t_escheduler_process_instance inst WHERE def.id = inst.process_definition_id AND def.project_id = p.id AND inst.state=1 ) as inst_running_count");
FROM(TABLE_NAME + " p");
JOIN("t_escheduler_user u on p.user_id = u.id");

24
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapper.java

@ -173,6 +173,30 @@ public interface ScheduleMapper {
@SelectProvider(type = ScheduleMapperProvider.class, method = "selectAllByProcessDefineArray")
List<Schedule> selectAllByProcessDefineArray(@Param("processDefineIds") int[] processDefineIds);
/**
* query schedule list by definition id
* @param processDefinitionId
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processDefinitionId", column = "process_definition_id", id = true, javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "startTime", column = "start_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "endTime", column = "end_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "crontab", column = "crontab", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "failureStrategy", column = "failure_strategy", typeHandler = EnumOrdinalTypeHandler.class, javaType = FailureStrategy.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningType", column = "warning_type", typeHandler = EnumOrdinalTypeHandler.class, javaType = WarningType.class, jdbcType = JdbcType.TINYINT),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "updateTime", column = "update_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "releaseState", column = "release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "warningGroupId", column = "warning_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ScheduleMapperProvider.class, method = "queryByProcessDefinitionId")
List<Schedule> queryByProcessDefinitionId(@Param("processDefinitionId") int processDefinitionId);
/**
* delete schedule by id
* @param scheduleId

14
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ScheduleMapperProvider.java

@ -181,6 +181,20 @@ public class ScheduleMapperProvider {
}}.toString();
}
/**
* query schedule by process definition id
* @param parameter
* @return
*/
public String queryByProcessDefinitionId(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(DB_NAME);
WHERE("process_definition_id = #{processDefinitionId}");
}}.toString();
}
/**
* delete schedule by id
*

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TenantMapperProvider.java

@ -150,7 +150,7 @@ public class TenantMapperProvider {
public String queryTenantPaging(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("t.*,q.queue_name as queueName");
SELECT("t.*,q.queue_name");
FROM(TABLE_NAME +" t,t_escheduler_queue q");
WHERE( " t.queue_id = q.id");
Object searchVal = parameter.get("searchVal");

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapper.java

@ -175,6 +175,8 @@ public interface UserMapper {
@Result(property = "phone", column = "phone", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userType", column = "user_type", typeHandler = EnumOrdinalTypeHandler.class, javaType = UserType.class, jdbcType = JdbcType.TINYINT),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantName", column = "tenant_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "queueName", column = "queue_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
})

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/UserMapperProvider.java

@ -203,7 +203,7 @@ public class UserMapperProvider {
public String queryUserPaging(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("u.*,t.tenant_name as tenantName,q.queue_name as queueName");
SELECT("u.*,t.tenant_name,q.queue_name");
FROM(TABLE_NAME + " u ");
LEFT_OUTER_JOIN("t_escheduler_tenant t on u.tenant_id = t.id");
LEFT_OUTER_JOIN("t_escheduler_queue q on t.queue_id = q.id");
@ -228,7 +228,7 @@ public class UserMapperProvider {
public String queryDetailsById(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("u.*,q.queue_name as queueName,t.tenant_name as tenantName");
SELECT("u.*,q.queue_name,t.tenant_name");
FROM(TABLE_NAME + " u,t_escheduler_tenant t,t_escheduler_queue q");
@ -262,7 +262,7 @@ public class UserMapperProvider {
public String queryTenantCodeByUserId(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("u.*,t.tenant_code as tenantCode");
SELECT("u.*,t.tenant_code");
FROM(TABLE_NAME + " u,t_escheduler_tenant t");
WHERE("u.tenant_id = t.id AND u.id = #{userId}");
}

26
escheduler-dao/src/main/java/cn/escheduler/dao/model/Project.java

@ -63,6 +63,32 @@ public class Project {
*/
private int perm;
/**
* process define count
*/
private int defCount;
/**
* process instance running count
*/
private int instRunningCount;
public int getDefCount() {
return defCount;
}
public void setDefCount(int defCount) {
this.defCount = defCount;
}
public int getInstRunningCount() {
return instRunningCount;
}
public void setInstRunningCount(int instRunningCount) {
this.instRunningCount = instRunningCount;
}
public int getId() {
return id;
}

2
escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java

@ -175,7 +175,7 @@ public class AlertManager {
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setCreateTime(new Date());
alert.setAlertGroupId(processInstance.getWarningGroupId());
alert.setAlertGroupId(processInstance.getWarningGroupId() == null ? 1:processInstance.getWarningGroupId());
alert.setReceivers(processInstance.getProcessDefinition().getReceivers());
alert.setReceiversCc(processInstance.getProcessDefinition().getReceiversCc());

6
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -279,7 +279,11 @@ public class SqlTask extends AbstractTask {
logger.info("showType is empty,don't need send email");
} else {
if (array.size() > 0) {
sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
}
}
}

4
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -123,9 +123,9 @@ public class ZKMasterClient extends AbstractZKClient {
// register master
this.registMaster();
// check if fault tolerance is required
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal());
processDao.selfFaultTolerant(ExecutionStatus.RUNNING_EXEUTION.ordinal(),ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal());
}
}

2
escheduler-ui/.env

@ -1,5 +1,5 @@
# 后端接口地址
# 后端接口地址11
API_BASE = http://192.168.xx.xx:12345
# 本地开发如需ip访问项目把"#"号去掉

14
escheduler-ui/src/js/conf/home/pages/projects/pages/list/_source/list.vue

@ -12,6 +12,12 @@
<th>
<span>{{$t('Owned Users')}}</span>
</th>
<th>
<span>{{$t('Process Define Count')}}</span>
</th>
<th>
<span>{{$t('Process Instance Running Count')}}</span>
</th>
<th>
<span>{{$t('Description')}}</span>
</th>
@ -37,6 +43,12 @@
<td>
<span>{{item.userName || '-'}}</span>
</td>
<td>
<span>{{item.defCount}}</span>
</td>
<td>
<span>{{item.instRunningCount}}</span>
</td>
<td>
<span>{{item.desc}}</span>
</td>
@ -150,4 +162,4 @@
},
components: { }
}
</script>
</script>

4
escheduler-ui/src/js/conf/home/store/dag/actions.js

@ -538,7 +538,7 @@ export default {
*/
getReceiver ({ state }, payload) {
return new Promise((resolve, reject) => {
io.get(`projects/{projectName}/executors/get-receiver-cc`, payload, res => {
io.get(`projects/${state.projectName}/executors/get-receiver-cc`, payload, res => {
resolve(res.data)
}).catch(e => {
reject(e)
@ -547,7 +547,7 @@ export default {
},
getTaskListDefIdAll ({ state }, payload) {
return new Promise((resolve, reject) => {
io.get(`projects/{projectName}/process/get-task-list`, payload, res => {
io.get(`projects/${state.projectName}/process/get-task-list`, payload, res => {
resolve(res.data)
}).catch(e => {
reject(e)

2
escheduler-ui/src/js/module/i18n/locale/en_US.js

@ -452,4 +452,6 @@ export default {
'Pre Statement': 'Pre Statement',
'Post Statement': 'Post Statement',
'Statement cannot be empty': 'Statement cannot be empty',
'Process Define Count':'Process Define Count',
'Process Instance Running Count':'Process Instance Running Count',
}

2
escheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -452,4 +452,6 @@ export default {
'Pre Statement': '前置sql',
'Post Statement': '后置sql',
'Statement cannot be empty': '语句不能为空',
'Process Define Count':'流程定义个数',
'Process Instance Running Count':'运行流程实例个数',
}

Loading…
Cancel
Save