
Merge remote-tracking branch 'upstream/branch-1.0.2' into dev-up

Commit 94bf5bb4b2 (pull/2/head) by lenboo, 6 years ago
28 changed files (changed-line counts in parentheses):

 1. docs/zh_CN/前端部署文档.md (4)
 2. docs/zh_CN/后端部署文档.md (4)
 3. docs/zh_CN/系统使用手册.md (2)
 4. escheduler-alert/pom.xml (2)
 5. escheduler-alert/src/main/java/cn/escheduler/alert/utils/MailUtils.java (1)
 6. escheduler-api/pom.xml (2)
 7. escheduler-api/src/main/java/cn/escheduler/api/controller/ResourcesController.java (4)
 8. escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java (2)
 9. escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java (85)
10. escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java (34)
11. escheduler-api/src/test/java/cn/escheduler/api/controller/ResourcesControllerTest.java (24)
12. escheduler-common/pom.xml (2)
13. escheduler-common/src/main/java/cn/escheduler/common/Constants.java (5)
14. escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java (35)
15. escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java (8)
16. escheduler-common/src/main/java/cn/escheduler/common/utils/OSUtils.java (2)
17. escheduler-dao/pom.xml (2)
18. escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java (46)
19. escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java (2)
20. escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java (1)
21. escheduler-rpc/pom.xml (2)
22. escheduler-server/pom.xml (2)
23. escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java (25)
24. escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java (72)
25. escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java (28)
26. escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java (30)
27. pom.xml (2)
28. sql/soft_version (2)

docs/zh_CN/前端部署文档.md (4 changed lines)

@@ -5,9 +5,9 @@
 ## 1. Preparation
 #### Download the installation package
-The latest installation package version is currently 1.0.2; download it from [Gitee download](https://gitee.com/easyscheduler/EasyScheduler/attach_files/)
-After downloading escheduler-ui-1.0.2.tar.gz, extract it with `tar -zxvf escheduler-ui-1.0.2.tar.gz ./` and enter the `escheduler-ui` directory
+Please download the latest installation package from [Gitee download](https://gitee.com/easyscheduler/EasyScheduler/attach_files/)
+After downloading escheduler-ui-x.x.x.tar.gz, extract it with `tar -zxvf escheduler-ui-x.x.x.tar.gz ./` and enter the `escheduler-ui` directory

docs/zh_CN/后端部署文档.md (4 changed lines)

@@ -4,7 +4,7 @@
 ## 1. Preparation
-The latest installation package version is currently 1.0.3; download it from [Gitee download](https://gitee.com/easyscheduler/EasyScheduler/attach_files/), taking escheduler-backend-1.0.3.tar.gz (backend, escheduler-backend for short) and escheduler-ui-1.0.3.tar.gz (frontend, escheduler-ui for short)
+Please download the latest installation package from [Gitee download](https://gitee.com/easyscheduler/EasyScheduler/attach_files/), taking escheduler-backend-x.x.x.tar.gz (backend, escheduler-backend for short) and escheduler-ui-x.x.x.tar.gz (frontend, escheduler-ui for short)
 #### Preparation 1: base software installation (install the required items yourself)

@@ -149,7 +149,7 @@ install.sh : one-click deployment script
 ### 2.2 Deploy from compiled source
-Download the 1.0.3 release source package, extract it, and enter the root directory
+Download the release source package, extract it, and enter the root directory
 * Run the build command:

docs/zh_CN/系统使用手册.md (2 changed lines)

@@ -311,7 +311,7 @@ conf/common/hadoop.properties
 ## Security Center (permission system)
 - The Security Center is available to administrator accounts only. It covers queue management, tenant management, user management, alert-group management, worker grouping and token management, and can also grant access to resources, data sources, projects, etc.
-- Administrator login, default username/password: admin/esheduler123
+- Administrator login, default username/password: admin/escheduler123
 ### Creating a queue
 - Queues are used when running programs such as spark and mapreduce that take a "queue" parameter.

escheduler-alert/pom.xml (2 changed lines)

@@ -4,7 +4,7 @@
     <parent>
        <groupId>cn.analysys</groupId>
        <artifactId>escheduler</artifactId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-alert</artifactId>
     <packaging>jar</packaging>

escheduler-alert/src/main/java/cn/escheduler/alert/utils/MailUtils.java (1 changed line)

@@ -165,6 +165,7 @@ public class MailUtils {
                 return retMap;
             }catch (Exception e){
                 handleException(receivers, retMap, e);
+                return retMap;
             }
         }
         return retMap;

escheduler-api/pom.xml (2 changed lines)

@@ -4,7 +4,7 @@
     <parent>
        <groupId>cn.analysys</groupId>
        <artifactId>escheduler</artifactId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-api</artifactId>
     <packaging>jar</packaging>

escheduler-api/src/main/java/cn/escheduler/api/controller/ResourcesController.java (4 changed lines)

@@ -236,9 +236,9 @@ public class ResourcesController extends BaseController{
     ) {
         try {
-            logger.info("login user {}, verfiy resource alias: {},resource type: {}",
-                    loginUser.getUserName(), alias);
-            return resourceService.verifyResourceName(alias, type);
+            logger.info("login user {}, verify resource alias: {}, resource type: {}",
+                    loginUser.getUserName(), alias, type);
+            return resourceService.verifyResourceName(alias, type, loginUser);
         } catch (Exception e) {
             logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e);
             return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg());

escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java (2 changed lines)

@@ -174,6 +174,8 @@ public enum Status {
     RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified"),
     UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar"),
     HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail"),
+    RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs, please delete it or change the name!"),
+    RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} does not exist in hdfs!"),

escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java (85 changed lines)

@@ -420,6 +420,41 @@ public class ResourcesService extends BaseService {
         return result;
     }

+    /**
+     * verify resource by name and type
+     * @param name
+     * @param type
+     * @param loginUser
+     * @return
+     */
+    public Result verifyResourceName(String name, ResourceType type, User loginUser) {
+        Result result = new Result();
+        putMsg(result, Status.SUCCESS);
+        Resource resource = resourcesMapper.queryResourceByNameAndType(name, type.ordinal());
+        if (resource != null) {
+            logger.error("resource type:{} name:{} has exist, can't create again.", type, name);
+            putMsg(result, Status.RESOURCE_EXIST);
+        } else {
+            // query tenant
+            String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
+            try {
+                String hdfsFilename = getHdfsFileName(type, tenantCode, name);
+                if (HadoopUtils.getInstance().exists(hdfsFilename)) {
+                    logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name, hdfsFilename);
+                    putMsg(result, Status.RESOURCE_FILE_EXIST, hdfsFilename);
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+                putMsg(result, Status.HDFS_OPERATION_ERROR);
+            }
+        }
+        return result;
+    }

     /**
      * verify resource by name and type
      *

@@ -480,13 +515,19 @@ public class ResourcesService extends BaseService {
         String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
         logger.info("resource hdfs path is {} ", hdfsFileName);
         try {
-            List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
-            putMsg(result, Status.SUCCESS);
-            Map<String, Object> map = new HashMap<>();
-            map.put(ALIAS, resource.getAlias());
-            map.put(CONTENT, StringUtils.join(content.toArray(), "\n"));
-            result.setData(map);
+            if (HadoopUtils.getInstance().exists(hdfsFileName)) {
+                List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
+                putMsg(result, Status.SUCCESS);
+                Map<String, Object> map = new HashMap<>();
+                map.put(ALIAS, resource.getAlias());
+                map.put(CONTENT, StringUtils.join(content.toArray(), "\n"));
+                result.setData(map);
+            } else {
+                logger.error("read file {} not exist in hdfs", hdfsFileName);
+                putMsg(result, Status.RESOURCE_FILE_NOT_EXIST, hdfsFileName);
+            }
         } catch (Exception e) {
             logger.error(String.format("Resource %s read failed", hdfsFileName), e);
             putMsg(result, Status.HDFS_OPERATION_ERROR);

@@ -530,17 +571,14 @@ public class ResourcesService extends BaseService {
         String name = fileName.trim() + "." + nameSuffix;

-        //check file already exists
-        Resource resource = resourcesMapper.queryResourceByNameAndType(name, type.ordinal());
-        if (resource != null) {
-            logger.error("resource {} has exist, can't recreate .", name);
-            putMsg(result, Status.RESOURCE_EXIST);
+        result = verifyResourceName(name, type, loginUser);
+        if (!result.getCode().equals(Status.SUCCESS.getCode())) {
             return result;
         }

         // save data
         Date now = new Date();
-        resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now);
+        Resource resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now);

         resourcesMapper.insert(resource);

@@ -569,6 +607,7 @@ public class ResourcesService extends BaseService {
      * @param resourceId
      * @return
      */
+    @Transactional(value = "TransactionManager", rollbackFor = Exception.class)
     public Result updateResourceContent(int resourceId, String content) {
         Result result = new Result();

@@ -597,6 +636,10 @@ public class ResourcesService extends BaseService {
             }
         }

+        resource.setSize(content.getBytes().length);
+        resource.setUpdateTime(new Date());
+        resourcesMapper.update(resource);

         User user = userMapper.queryDetailsById(resource.getUserId());
         String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();

@@ -643,6 +686,7 @@ public class ResourcesService extends BaseService {
                 logger.error("{} is not exist", resourcePath);
                 result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
                 result.setMsg(String.format("%s is not exist", resourcePath));
+                return result;
             }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);

@@ -809,6 +853,23 @@ public class ResourcesService extends BaseService {
         return hdfsFileName;
     }

+    /**
+     * get hdfs file name
+     *
+     * @param resourceType
+     * @param tenantCode
+     * @param hdfsFileName
+     * @return
+     */
+    private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) {
+        if (resourceType.equals(ResourceType.FILE)) {
+            hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName);
+        } else if (resourceType.equals(ResourceType.UDF)) {
+            hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName);
+        }
+        return hdfsFileName;
+    }

     /**
      * get authorized resource list
     *
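
Note: the online-create path above now delegates its duplicate check to the new verifyResourceName, which reports a taken name either in the database or on HDFS. A minimal caller sketch, assuming an injected ResourcesService and a logged-in User (not part of the commit):

    Result result = resourcesService.verifyResourceName("test.sh", ResourceType.FILE, loginUser);
    if (!result.getCode().equals(Status.SUCCESS.getCode())) {
        // either RESOURCE_EXIST (a row in the resources table)
        // or RESOURCE_FILE_EXIST (a file already present on HDFS under the tenant's dir)
        return result;
    }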

escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java (34 changed lines)

@@ -166,7 +166,7 @@ public class TenantService extends BaseService{
         Tenant tenant = tenantMapper.queryById(id);

         if (tenant == null){
-            putMsg(result, Status.USER_NOT_EXIST, id);
+            putMsg(result, Status.TENANT_NOT_EXIST);
             return result;
         }

@@ -230,21 +230,29 @@ public class TenantService extends BaseService{
         Tenant tenant = tenantMapper.queryById(id);

-        String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
-
-        String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
-        FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
-        if (fileStatus.length > 0) {
-            putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);
-            return result;
-        }
-        fileStatus = HadoopUtils.getInstance().listFileStatus(HadoopUtils.getHdfsUdfDir(tenant.getTenantCode()));
-        if (fileStatus.length > 0) {
-            putMsg(result, Status.HDFS_TERANT_UDFS_FILE_EXISTS);
+        if (tenant == null){
+            putMsg(result, Status.TENANT_NOT_EXIST);
             return result;
         }

-        HadoopUtils.getInstance().delete(tenantPath, true);
+        // if hdfs startup
+        if (PropertyUtils.getBoolean(cn.escheduler.common.Constants.HDFS_STARTUP_STATE)){
+            String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
+            String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
+            FileStatus[] fileStatus = HadoopUtils.getInstance().listFileStatus(resourcePath);
+            if (fileStatus.length > 0) {
+                putMsg(result, Status.HDFS_TERANT_RESOURCES_FILE_EXISTS);
+                return result;
+            }
+            fileStatus = HadoopUtils.getInstance().listFileStatus(HadoopUtils.getHdfsUdfDir(tenant.getTenantCode()));
+            if (fileStatus.length > 0) {
+                putMsg(result, Status.HDFS_TERANT_UDFS_FILE_EXISTS);
+                return result;
+            }
+            HadoopUtils.getInstance().delete(tenantPath, true);
+        }

         tenantMapper.deleteById(id);
         putMsg(result, Status.SUCCESS);

escheduler-api/src/test/java/cn/escheduler/api/controller/ResourcesControllerTest.java (24 changed lines)

@@ -34,6 +34,8 @@ import org.springframework.test.context.junit4.SpringRunner;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.MvcResult;
 import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 import org.springframework.web.context.WebApplicationContext;

 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;

@@ -43,7 +45,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @RunWith(SpringRunner.class)
 @SpringBootTest
 public class ResourcesControllerTest {
-    private static Logger logger = LoggerFactory.getLogger(QueueControllerTest.class);
+    private static Logger logger = LoggerFactory.getLogger(ResourcesControllerTest.class);

     private MockMvc mockMvc;

@@ -71,4 +73,24 @@ public class ResourcesControllerTest {
         Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
         logger.info(mvcResult.getResponse().getContentAsString());
     }

+    @Test
+    public void verifyResourceName() throws Exception {
+        MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
+        paramsMap.add("name","list_resources_1.sh");
+        paramsMap.add("type","FILE");
+        MvcResult mvcResult = mockMvc.perform(get("/resources/verify-name")
+                .header("sessionId", "c24ed9d9-1c20-48a0-bd9c-5cfca14a4dcb")
+                .params(paramsMap))
+                .andExpect(status().isOk())
+                .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
+                .andReturn();
+        Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
+        Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
+        logger.info(mvcResult.getResponse().getContentAsString());
+    }
 }

escheduler-common/pom.xml (2 changed lines)

@@ -4,7 +4,7 @@
     <parent>
        <artifactId>escheduler</artifactId>
        <groupId>cn.analysys</groupId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-common</artifactId>
     <name>escheduler-common</name>

escheduler-common/src/main/java/cn/escheduler/common/Constants.java (5 changed lines)

@@ -252,6 +252,11 @@ public final class Constants {
      */
     public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";

+    /**
+     * date format of yyyyMMdd
+     */
+    public static final String YYYYMMDD = "yyyyMMdd";
+
     /**
      * date format of yyyyMMddHHmmss
      */

escheduler-common/src/main/java/cn/escheduler/common/enums/TaskRecordStatus.java (new file, 35 lines)

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.common.enums;
+
+/**
+ * task record status
+ */
+public enum TaskRecordStatus {
+
+    /**
+     * status:
+     * 0 success
+     * 1 failure
+     * 2 exception
+     */
+    SUCCESS, FAILURE, EXCEPTION
+}

escheduler-common/src/main/java/cn/escheduler/common/enums/TaskType.java (8 changed lines)

@@ -30,5 +30,11 @@ public enum TaskType {
      * 6 PYTHON
      * 7 DEPENDENT
      */
-    SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT
+    SHELL,SQL, SUB_PROCESS,PROCEDURE,MR,SPARK,PYTHON,DEPENDENT;
+
+    public static boolean typeIsNormalTask(String typeName) {
+        TaskType taskType = TaskType.valueOf(typeName);
+        return !(taskType == TaskType.SUB_PROCESS || taskType == TaskType.DEPENDENT);
+    }
 }
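
Note: typeIsNormalTask treats flow-control nodes (SUB_PROCESS, DEPENDENT) as non-normal; only normal tasks take part in the task-record check added in TaskScheduleThread below. A small self-contained illustration, assuming this enum is on the classpath:

    import cn.escheduler.common.enums.TaskType;

    public class TaskTypeDemo {
        public static void main(String[] args) {
            System.out.println(TaskType.typeIsNormalTask("SQL"));         // true
            System.out.println(TaskType.typeIsNormalTask("SUB_PROCESS")); // false
            System.out.println(TaskType.typeIsNormalTask("DEPENDENT"));   // false
        }
    }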

escheduler-common/src/main/java/cn/escheduler/common/utils/OSUtils.java (2 changed lines)

@@ -220,7 +220,7 @@ public class OSUtils {
      * @throws IOException
      */
     public static String exeShell(String command) throws IOException {
-        return ShellExecutor.execCommand("groups");
+        return ShellExecutor.execCommand(command);
     }

     /**

escheduler-dao/pom.xml (2 changed lines)

@@ -4,7 +4,7 @@
     <parent>
        <groupId>cn.analysys</groupId>
        <artifactId>escheduler</artifactId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-dao</artifactId>
     <name>escheduler-dao</name>

escheduler-dao/src/main/java/cn/escheduler/dao/TaskRecordDao.java (46 changed lines)

@@ -17,6 +17,8 @@
 package cn.escheduler.dao;

 import cn.escheduler.common.Constants;
+import cn.escheduler.common.enums.TaskRecordStatus;
+import cn.escheduler.common.utils.CollectionUtils;
 import cn.escheduler.common.utils.DateUtils;
 import cn.escheduler.dao.model.TaskRecord;
 import org.apache.commons.configuration.Configuration;

@@ -28,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.*;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;

@@ -43,7 +46,7 @@ public class TaskRecordDao {
     /**
-     * 加载配置文件
+     * load conf file
      */
     private static Configuration conf;

@@ -56,6 +59,14 @@ public class TaskRecordDao {
         }
     }

+    /**
+     * get task record flag
+     * @return whether task record is enabled
+     */
+    public static boolean getTaskRecordFlag(){
+        return conf.getBoolean(Constants.TASK_RECORD_FLAG);
+    }

     /**
      * create connection
      * @return

@@ -253,4 +264,37 @@ public class TaskRecordDao {
         }
         return recordList;
     }

+    /**
+     * query task record status by proc name and proc date
+     * @param procName
+     * @param procDate
+     * @return
+     */
+    public static TaskRecordStatus getTaskRecordState(String procName, String procDate){
+        String sql = String.format("SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'",
+                procName, procDate + "%");
+        List<TaskRecord> taskRecordList = getQueryResult(sql);
+
+        // covers both no record and sql exception
+        if (CollectionUtils.isEmpty(taskRecordList)){
+            // exception
+            return TaskRecordStatus.EXCEPTION;
+        } else if (taskRecordList.size() > 1){
+            return TaskRecordStatus.EXCEPTION;
+        } else {
+            TaskRecord taskRecord = taskRecordList.get(0);
+            if (taskRecord == null){
+                return TaskRecordStatus.EXCEPTION;
+            }
+            Long targetRowCount = taskRecord.getTargetRowCount();
+            if (targetRowCount <= 0){
+                return TaskRecordStatus.FAILURE;
+            } else {
+                return TaskRecordStatus.SUCCESS;
+            }
+        }
+    }
 }
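
Note: for reference, the statement getTaskRecordState builds for a hypothetical task name and date (the eamp_hive_log_hd table comes from the code above; the values are made up), and the status mapping it applies:

    String sql = String.format(
            "SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='%s' and PROC_DATE like '%s'",
            "etl_user_daily", "20190501" + "%");
    // -> SELECT * FROM eamp_hive_log_hd WHERE PROC_NAME='etl_user_daily' and PROC_DATE like '20190501%'
    // no rows, several rows, or a null row -> EXCEPTION
    // one row with targetRowCount <= 0     -> FAILURE
    // one row with targetRowCount  > 0     -> SUCCESS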

escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java (2 changed lines)

@@ -189,7 +189,7 @@ public class ProcessDefinitionMapperProvider {
             if(userId != null && 0 != Integer.parseInt(userId.toString())){
                 WHERE("td.user_id = #{userId}");
             }
-            ORDER_BY(" td.update_time desc limit #{offset},#{pageSize} ");
+            ORDER_BY(" sc.schedule_release_state desc,td.update_time desc limit #{offset},#{pageSize} ");
         }}.toString();
     }

     /**

escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java (1 changed line)

@@ -118,6 +118,7 @@ public class ResourceMapperProvider {
             SET("`alias` = #{resource.alias}");
             SET("`desc` = #{resource.desc}");
             SET("`update_time` = #{resource.updateTime}");
+            SET("`size` = #{resource.size}");
             WHERE("`id` = #{resource.id}");
         }}.toString();
     }

escheduler-rpc/pom.xml (2 changed lines)

@@ -4,7 +4,7 @@
     <parent>
        <artifactId>escheduler</artifactId>
        <groupId>cn.analysys</groupId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

escheduler-server/pom.xml (2 changed lines)

@@ -3,7 +3,7 @@
     <parent>
        <artifactId>escheduler</artifactId>
        <groupId>cn.analysys</groupId>
-       <version>1.0.3-SNAPSHOT</version>
+       <version>1.0.4-SNAPSHOT</version>
     </parent>
     <artifactId>escheduler-server</artifactId>
     <name>escheduler-server</name>

escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java (25 changed lines)

@@ -18,6 +18,7 @@ package cn.escheduler.server.utils;

 import cn.escheduler.common.Constants;
 import cn.escheduler.common.utils.CommonUtils;
+import cn.escheduler.common.utils.OSUtils;
 import cn.escheduler.dao.model.TaskInstance;
 import cn.escheduler.server.rpc.LogClient;
 import org.apache.commons.io.FileUtils;

@@ -33,6 +34,7 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 /**
  * mainly used to get the start command line of a process
  */

@@ -139,6 +141,8 @@ public class ProcessUtils {
       {' ', '\t', '<', '>'}, {' ', '\t'}};

+    private static Matcher matcher;
+
     private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) {
         StringBuilder cmdbuf = new StringBuilder(80);

@@ -256,11 +260,11 @@ public class ProcessUtils {
             return ;
         }

-        String cmd = String.format("sudo kill -9 %d", processId);
+        String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
         logger.info("process id:{}, cmd:{}", processId, cmd);

-        Runtime.getRuntime().exec(cmd);
+        OSUtils.exeCmd(cmd);

         // find log and kill yarn job
         killYarnJob(taskInstance);

@@ -270,6 +274,23 @@ public class ProcessUtils {
         }
     }

+    /**
+     * get pids str
+     * @param processId
+     * @return
+     * @throws Exception
+     */
+    private static String getPidsStr(int processId) throws Exception {
+        StringBuilder sb = new StringBuilder();
+        // pstree -p pid : get sub pids
+        String pids = OSUtils.exeCmd("pstree -p " + processId);
+        Matcher mat = Pattern.compile("(\\d+)").matcher(pids);
+        while (mat.find()){
+            sb.append(mat.group() + " ");
+        }
+        return sb.toString().trim();
+    }

     /**
      * find logs and kill yarn tasks
     * @param taskInstance
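
Note: getPidsStr shells out to pstree and pulls every number from its output, so the subsequent kill -9 hits the whole process tree instead of only the parent pid. A standalone sketch of the pid extraction, with a made-up pstree line:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PstreeParseDemo {
        public static void main(String[] args) {
            // made-up output of `pstree -p 28942`
            String pids = "bash(28942)---java(28943)---sh(29001)";
            StringBuilder sb = new StringBuilder();
            Matcher mat = Pattern.compile("(\\d+)").matcher(pids);
            while (mat.find()) {
                sb.append(mat.group()).append(" ");
            }
            System.out.println(sb.toString().trim()); // 28942 28943 29001
        }
    }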

escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java (72 changed lines)

@@ -19,18 +19,25 @@ package cn.escheduler.server.worker.runner;

 import cn.escheduler.common.Constants;
 import cn.escheduler.common.enums.ExecutionStatus;
+import cn.escheduler.common.enums.TaskRecordStatus;
 import cn.escheduler.common.enums.TaskType;
 import cn.escheduler.common.model.TaskNode;
 import cn.escheduler.common.process.Property;
 import cn.escheduler.common.task.AbstractParameters;
 import cn.escheduler.common.task.TaskTimeoutParameter;
-import cn.escheduler.common.utils.CommonUtils;
-import cn.escheduler.common.utils.HadoopUtils;
-import cn.escheduler.common.utils.TaskParametersUtils;
+import cn.escheduler.common.task.mr.MapreduceParameters;
+import cn.escheduler.common.task.procedure.ProcedureParameters;
+import cn.escheduler.common.task.python.PythonParameters;
+import cn.escheduler.common.task.shell.ShellParameters;
+import cn.escheduler.common.task.spark.SparkParameters;
+import cn.escheduler.common.task.sql.SqlParameters;
+import cn.escheduler.common.utils.*;
 import cn.escheduler.dao.ProcessDao;
+import cn.escheduler.dao.TaskRecordDao;
 import cn.escheduler.dao.model.ProcessInstance;
 import cn.escheduler.dao.model.TaskInstance;
 import cn.escheduler.server.utils.LoggerUtils;
+import cn.escheduler.server.utils.ParamUtils;
 import cn.escheduler.server.worker.log.TaskLogger;
 import cn.escheduler.server.worker.task.AbstractTask;
 import cn.escheduler.server.worker.task.TaskManager;

@@ -141,6 +148,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
         TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
+
         List<String> projectRes = createProjectResFiles(taskNode);

         // copy hdfs file to local

@@ -199,6 +207,31 @@ public class TaskScheduleThread implements Callable<Boolean> {
             if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
                 status = ExecutionStatus.SUCCESS;
+                // task record flag : if true , start up qianfan
+                if (TaskRecordDao.getTaskRecordFlag()
+                        && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
+                    AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
+                    // replace placeholder
+                    Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+                            taskProps.getDefinedParams(),
+                            params.getLocalParametersMap(),
+                            processInstance.getCmdTypeIfComplement(),
+                            processInstance.getScheduleTime());
+                    if (paramsMap != null && !paramsMap.isEmpty()
+                            && paramsMap.containsKey("v_proc_date")){
+                        String vProcDate = paramsMap.get("v_proc_date").getValue();
+                        if (!StringUtils.isEmpty(vProcDate)){
+                            TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
+                            logger.info("task record status : {}", taskRecordState);
+                            if (taskRecordState == TaskRecordStatus.FAILURE){
+                                status = ExecutionStatus.FAILURE;
+                            }
+                        }
+                    }
+                }
             }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
                 status = ExecutionStatus.KILL;
             }else {

@@ -251,6 +284,39 @@ public class TaskScheduleThread implements Callable<Boolean> {
     }

+    /**
+     * get current task parameter class
+     * @return
+     */
+    private Class getCurTaskParamsClass(){
+        Class paramsClass = null;
+        TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
+        switch (taskType){
+            case SHELL:
+                paramsClass = ShellParameters.class;
+                break;
+            case SQL:
+                paramsClass = SqlParameters.class;
+                break;
+            case PROCEDURE:
+                paramsClass = ProcedureParameters.class;
+                break;
+            case MR:
+                paramsClass = MapreduceParameters.class;
+                break;
+            case SPARK:
+                paramsClass = SparkParameters.class;
+                break;
+            case PYTHON:
+                paramsClass = PythonParameters.class;
+                break;
+            default:
+                logger.error("not support this task type: {}", taskType);
+                throw new IllegalArgumentException("not support this task type");
+        }
+        return paramsClass;
+    }

     /**
      * kill task
     */
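
Note: the task-record check needs the task's concrete parameter class before it can read local params such as v_proc_date, which is why getCurTaskParamsClass exists. A minimal sketch of that deserialization step, with a made-up params JSON for a SQL task (JSONUtils.parseObject and getLocalParametersMap are used the same way in the diff above):

    String taskParams = "{\"localParams\":[{\"prop\":\"v_proc_date\",\"value\":\"20190501\"}]}";
    AbstractParameters params =
            (AbstractParameters) JSONUtils.parseObject(taskParams, SqlParameters.class);
    // params.getLocalParametersMap() then exposes v_proc_date for the record lookup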

escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java (28 changed lines)

@@ -213,7 +213,7 @@ public abstract class AbstractCommandExecutor {
      */
     private int updateState(ProcessDao processDao, int exitStatusCode, int pid, int taskInstId) {
         //get yarn state by log
-        if (exitStatusCode != -1) {
+        if (exitStatusCode != 0) {
             TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
             logger.info("process id is {}", pid);

@@ -380,14 +380,22 @@ public abstract class AbstractCommandExecutor {
         boolean result = true;
         try {
             for (String appId : appIds) {
-                ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
-                logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
-                if (!applicationStatus.equals(ExecutionStatus.SUCCESS)) {
-                    result = false;
+                while(true){
+                    ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
+                    logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
+                    if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
+                            applicationStatus.equals(ExecutionStatus.KILL)) {
+                        return false;
+                    }
+                    if (applicationStatus.equals(ExecutionStatus.SUCCESS)){
+                        break;
+                    }
+                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
             }
         } catch (Exception e) {
-            logger.error(String.format("mapreduce applications: %s status failed : " + e.getMessage(), appIds.toString()),e);
+            logger.error(String.format("yarn applications: %s status failed : " + e.getMessage(), appIds.toString()),e);
             result = false;
         }
         return result;

@@ -548,10 +556,4 @@ public abstract class AbstractCommandExecutor {
     protected abstract boolean checkShowLog(String line);
     protected abstract boolean checkFindApp(String line);
     protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
-
-//        if(line.contains(taskAppId) || !line.contains("cn.escheduler.server.worker.log.TaskLogger")){
-//            logs.add(line);
-//        }
 }
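
Note: the yarn check changed shape here: the old code sampled each application's status once, while the new code polls until a terminal state is reached. Reduced to its skeleton (types and constants as in the diff above):

    for (String appId : appIds) {
        while (true) {
            ExecutionStatus s = HadoopUtils.getInstance().getApplicationStatus(appId);
            if (s == ExecutionStatus.FAILURE || s == ExecutionStatus.KILL) {
                return false;                              // terminal failure
            }
            if (s == ExecutionStatus.SUCCESS) {
                break;                                     // done, next application
            }
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);     // still running, retry
        }
    }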

escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java (30 changed lines)

@@ -196,7 +196,7 @@ public class SqlTask extends AbstractTask {
         }

         // special characters need to be escaped, ${} needs to be escaped
-        String rgex = "'?\\$\\{(.*?)\\}'?";
+        String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
         setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap);

         // replace the ${} of the SQL statement with the Placeholder

@@ -310,6 +310,7 @@ public class SqlTask extends AbstractTask {
             }
         } catch (Exception e) {
             logger.error(e.getMessage(),e);
+            throw new RuntimeException(e.getMessage());
         }
         return connection;
     }

@@ -326,6 +327,7 @@ public class SqlTask extends AbstractTask {
                 ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue());
             }
         }
+        logger.info("prepare statement replace sql:{}",stmt.toString());
         return stmt;
     }

@@ -347,14 +349,14 @@ public class SqlTask extends AbstractTask {
         // receiving group list
         List<String> receviersList = new ArrayList<String>();
         for(User user:users){
-            receviersList.add(user.getEmail());
+            receviersList.add(user.getEmail().trim());
         }

         // custom receiver
         String receivers = sqlParameters.getReceivers();
         if (StringUtils.isNotEmpty(receivers)){
             String[] splits = receivers.split(Constants.COMMA);
             for (String receiver : splits){
-                receviersList.add(receiver);
+                receviersList.add(receiver.trim());
             }
         }

@@ -365,15 +367,19 @@ public class SqlTask extends AbstractTask {
         if (StringUtils.isNotEmpty(receiversCc)){
             String[] splits = receiversCc.split(Constants.COMMA);
             for (String receiverCc : splits){
-                receviersCcList.add(receiverCc);
+                receviersCcList.add(receiverCc.trim());
             }
         }

         String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
         if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
-            MailUtils.sendMails(receviersList,receviersCcList,title, content, ShowType.valueOf(showTypeName));
+            Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
+            if(!(Boolean) mailResult.get(cn.escheduler.api.utils.Constants.STATUS)){
+                throw new RuntimeException("send mail failed!");
+            }
         }else{
             logger.error("showType: {} is not valid " ,showTypeName);
+            throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
         }
     }

@@ -411,19 +417,5 @@ public class SqlTask extends AbstractTask {
             logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
         }
         logger.info(logPrint.toString());
-
-        //direct print style
-        Pattern pattern = Pattern.compile(rgex);
-        Matcher m = pattern.matcher(content);
-        int index = 1;
-        StringBuffer sb = new StringBuffer("replaced sql , direct:");
-        while (m.find()) {
-            m.appendReplacement(sb, sqlParamsMap.get(index).getValue());
-            index ++;
-        }
-        m.appendTail(sb);
-        logger.info(sb.toString());
     }
 }
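
Note: the widened placeholder pattern now strips double quotes as well as single quotes around ${...}. A self-contained sketch of what it captures (the SQL line is made up):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PlaceholderRegexDemo {
        public static void main(String[] args) {
            // the new pattern from this commit
            Pattern p = Pattern.compile("['\"]*\\$\\{(.*?)\\}['\"]*");
            String sql = "select * from t where dt = \"${v_proc_date}\" and id = ${v_id}";
            Matcher m = p.matcher(sql);
            while (m.find()) {
                // group(1) is the parameter name; group() also swallows the quotes
                System.out.println(m.group(1) + " <- " + m.group());
            }
            // v_proc_date <- "${v_proc_date}"
            // v_id <- ${v_id}
        }
    }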

pom.xml (2 changed lines)

@@ -3,7 +3,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>cn.analysys</groupId>
     <artifactId>escheduler</artifactId>
-    <version>1.0.3-SNAPSHOT</version>
+    <version>1.0.4-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>escheduler</name>
     <url>http://maven.apache.org</url>

sql/soft_version (2 changed lines)

@@ -1 +1 @@
-1.0.2
+1.0.4