Browse Source

Merge pull request #487 from analysys/branch-1.0.2

merge for 1.0.4
pull/2/head
bao liang 6 years ago committed by GitHub
parent
commit
ba431dbff2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/zh_CN/前端部署文档.md
  2. 4
      docs/zh_CN/后端部署文档.md
  3. 2
      docs/zh_CN/系统使用手册.md
  4. 2
      escheduler-alert/pom.xml
  5. 1
      escheduler-alert/src/main/java/cn/escheduler/alert/utils/MailUtils.java
  6. 2
      escheduler-api/pom.xml
  7. 4
      escheduler-api/src/main/java/cn/escheduler/api/controller/ResourcesController.java
  8. 2
      escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java
  9. 73
      escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java
  10. 10
      escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
  11. 24
      escheduler-api/src/test/java/cn/escheduler/api/controller/ResourcesControllerTest.java
  12. 2
      escheduler-common/pom.xml
  13. 2
      escheduler-common/src/main/java/cn/escheduler/common/utils/OSUtils.java
  14. 2
      escheduler-dao/pom.xml
  15. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java
  16. 1
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java
  17. 2
      escheduler-rpc/pom.xml
  18. 2
      escheduler-server/pom.xml
  19. 25
      escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
  20. 22
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
  21. 30
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
  22. 2
      pom.xml
  23. 2
      sql/soft_version

4
docs/zh_CN/前端部署文档.md

@ -5,9 +5,9 @@
## 1、准备工作 ## 1、准备工作
#### 下载安装包 #### 下载安装包
目前最新安装包版本是1.0.2,下载地址: [码云下载](https://gitee.com/easyscheduler/EasyScheduler/attach_files/) 请下载最新版本的安装包,下载地址: [码云下载](https://gitee.com/easyscheduler/EasyScheduler/attach_files/)
下载 escheduler-ui-1.0.2.tar.gz 后,解压`tar -zxvf escheduler-ui-1.0.2.tar.gz ./`后,进入`escheduler-ui`目录 下载 escheduler-ui-x.x.x.tar.gz 后,解压`tar -zxvf escheduler-ui-x.x.x.tar.gz ./`后,进入`escheduler-ui`目录

4
docs/zh_CN/后端部署文档.md

@ -4,7 +4,7 @@
## 1、准备工作 ## 1、准备工作
目前最新安装包版本是1.0.3,下载地址: [码云下载](https://gitee.com/easyscheduler/EasyScheduler/attach_files/) ,下载escheduler-backend-1.0.3.tar.gz(后端简称escheduler-backend),escheduler-ui-1.0.3.tar.gz(前端简称escheduler-ui) 请下载最新版本的安装包,下载地址: [码云下载](https://gitee.com/easyscheduler/EasyScheduler/attach_files/) ,下载escheduler-backend-x.x.x.tar.gz(后端简称escheduler-backend),escheduler-ui-x.x.x.tar.gz(前端简称escheduler-ui)
#### 准备一: 基础软件安装(必装项请自行安装) #### 准备一: 基础软件安装(必装项请自行安装)
@ -149,7 +149,7 @@ install.sh : 一键部署脚本
### 2.2 编译源码来部署 ### 2.2 编译源码来部署
将源码包release版本1.0.3下载后,解压进入根目录 将源码包release版本下载后,解压进入根目录
* 执行编译命令: * 执行编译命令:

2
docs/zh_CN/系统使用手册.md

@ -311,7 +311,7 @@ conf/common/hadoop.properties
## 安全中心(权限系统) ## 安全中心(权限系统)
- 安全中心是只有管理员账户才有权限的功能,有队列管理、租户管理、用户管理、告警组管理、worker分组、令牌管理等功能,还可以对资源、数据源、项目等授权 - 安全中心是只有管理员账户才有权限的功能,有队列管理、租户管理、用户管理、告警组管理、worker分组、令牌管理等功能,还可以对资源、数据源、项目等授权
- 管理员登录,默认用户名密码:admin/esheduler123 - 管理员登录,默认用户名密码:admin/escheduler123
### 创建队列 ### 创建队列
- 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。 - 队列是在执行spark、mapreduce等程序,需要用到“队列”参数时使用的。

2
escheduler-alert/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-alert</artifactId> <artifactId>escheduler-alert</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>

1
escheduler-alert/src/main/java/cn/escheduler/alert/utils/MailUtils.java

@ -165,6 +165,7 @@ public class MailUtils {
return retMap; return retMap;
}catch (Exception e){ }catch (Exception e){
handleException(receivers, retMap, e); handleException(receivers, retMap, e);
return retMap;
} }
} }
return retMap; return retMap;

2
escheduler-api/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-api</artifactId> <artifactId>escheduler-api</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>

4
escheduler-api/src/main/java/cn/escheduler/api/controller/ResourcesController.java

@ -236,9 +236,9 @@ public class ResourcesController extends BaseController{
) { ) {
try { try {
logger.info("login user {}, verfiy resource alias: {},resource type: {}", logger.info("login user {}, verfiy resource alias: {},resource type: {}",
loginUser.getUserName(), alias); loginUser.getUserName(), alias,type);
return resourceService.verifyResourceName(alias, type); return resourceService.verifyResourceName(alias,type,loginUser);
} catch (Exception e) { } catch (Exception e) {
logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e); logger.error(VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg(), e);
return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg()); return error(Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getCode(), Status.VERIFY_RESOURCE_BY_NAME_AND_TYPE_ERROR.getMsg());

2
escheduler-api/src/main/java/cn/escheduler/api/enums/Status.java

@ -175,6 +175,8 @@ public enum Status {
RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified"), RESOURCE_SUFFIX_FORBID_CHANGE(20008, "resource suffix not allowed to be modified"),
UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar"), UDF_RESOURCE_SUFFIX_NOT_JAR(20009, "UDF resource suffix name must be jar"),
HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail"), HDFS_COPY_FAIL(20009, "hdfs copy {0} -> {1} fail"),
RESOURCE_FILE_EXIST(20010, "resource file {0} already exists in hdfs,please delete it or change name!"),
RESOURCE_FILE_NOT_EXIST(20011, "resource file {0} not exists in hdfs!"),

73
escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java

@ -421,6 +421,41 @@ public class ResourcesService extends BaseService {
return result; return result;
} }
/**
* verify resource by name and type
* @param name
* @param type
* @param loginUser
* @return
*/
public Result verifyResourceName(String name, ResourceType type,User loginUser) {
Result result = new Result();
putMsg(result, Status.SUCCESS);
Resource resource = resourcesMapper.queryResourceByNameAndType(name, type.ordinal());
if (resource != null) {
logger.error("resource type:{} name:{} has exist, can't create again.", type, name);
putMsg(result, Status.RESOURCE_EXIST);
} else {
// query tenant
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
try {
String hdfsFilename = getHdfsFileName(type,tenantCode,name);
if(HadoopUtils.getInstance().exists(hdfsFilename)){
logger.error("resource type:{} name:{} has exist in hdfs {}, can't create again.", type, name,hdfsFilename);
putMsg(result, Status.RESOURCE_FILE_EXIST,hdfsFilename);
}
} catch (Exception e) {
logger.error(e.getMessage(),e);
putMsg(result,Status.HDFS_OPERATION_ERROR);
}
}
return result;
}
/** /**
* verify resource by name and type * verify resource by name and type
* *
@ -481,6 +516,7 @@ public class ResourcesService extends BaseService {
String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias()); String hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, resource.getAlias());
logger.info("resource hdfs path is {} ", hdfsFileName); logger.info("resource hdfs path is {} ", hdfsFileName);
try { try {
if(HadoopUtils.getInstance().exists(hdfsFileName)){
List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit); List<String> content = HadoopUtils.getInstance().catFile(hdfsFileName, skipLineNum, limit);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
@ -488,6 +524,11 @@ public class ResourcesService extends BaseService {
map.put(ALIAS, resource.getAlias()); map.put(ALIAS, resource.getAlias());
map.put(CONTENT, StringUtils.join(content.toArray(), "\n")); map.put(CONTENT, StringUtils.join(content.toArray(), "\n"));
result.setData(map); result.setData(map);
}else{
logger.error("read file {} not exist in hdfs", hdfsFileName);
putMsg(result, Status.RESOURCE_FILE_NOT_EXIST,hdfsFileName);
}
} catch (Exception e) { } catch (Exception e) {
logger.error(String.format("Resource %s read failed", hdfsFileName), e); logger.error(String.format("Resource %s read failed", hdfsFileName), e);
putMsg(result, Status.HDFS_OPERATION_ERROR); putMsg(result, Status.HDFS_OPERATION_ERROR);
@ -531,17 +572,14 @@ public class ResourcesService extends BaseService {
String name = fileName.trim() + "." + nameSuffix; String name = fileName.trim() + "." + nameSuffix;
//check file already exists result = verifyResourceName(name,type,loginUser);
Resource resource = resourcesMapper.queryResourceByNameAndType(name, type.ordinal()); if (!result.getCode().equals(Status.SUCCESS.getCode())) {
if (resource != null) {
logger.error("resource {} has exist, can't recreate .", name);
putMsg(result, Status.RESOURCE_EXIST);
return result; return result;
} }
// save data // save data
Date now = new Date(); Date now = new Date();
resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now); Resource resource = new Resource(name,name,desc,loginUser.getId(),type,content.getBytes().length,now,now);
resourcesMapper.insert(resource); resourcesMapper.insert(resource);
@ -570,6 +608,7 @@ public class ResourcesService extends BaseService {
* @param resourceId * @param resourceId
* @return * @return
*/ */
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)
public Result updateResourceContent(int resourceId, String content) { public Result updateResourceContent(int resourceId, String content) {
Result result = new Result(); Result result = new Result();
@ -598,6 +637,10 @@ public class ResourcesService extends BaseService {
} }
} }
resource.setSize(content.getBytes().length);
resource.setUpdateTime(new Date());
resourcesMapper.update(resource);
User user = userMapper.queryDetailsById(resource.getUserId()); User user = userMapper.queryDetailsById(resource.getUserId());
String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode(); String tenantCode = tenantMapper.queryById(user.getTenantId()).getTenantCode();
@ -644,6 +687,7 @@ public class ResourcesService extends BaseService {
logger.error("{} is not exist", resourcePath); logger.error("{} is not exist", resourcePath);
result.setCode(Status.HDFS_OPERATION_ERROR.getCode()); result.setCode(Status.HDFS_OPERATION_ERROR.getCode());
result.setMsg(String.format("%s is not exist", resourcePath)); result.setMsg(String.format("%s is not exist", resourcePath));
return result;
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
@ -810,6 +854,23 @@ public class ResourcesService extends BaseService {
return hdfsFileName; return hdfsFileName;
} }
/**
* get hdfs file name
*
* @param resourceType
* @param tenantCode
* @param hdfsFileName
* @return
*/
private String getHdfsFileName(ResourceType resourceType, String tenantCode, String hdfsFileName) {
if (resourceType.equals(ResourceType.FILE)) {
hdfsFileName = HadoopUtils.getHdfsFilename(tenantCode, hdfsFileName);
} else if (resourceType.equals(ResourceType.UDF)) {
hdfsFileName = HadoopUtils.getHdfsUdfFilename(tenantCode, hdfsFileName);
}
return hdfsFileName;
}
/** /**
* get authorized resource list * get authorized resource list
* *

10
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java

@ -166,7 +166,7 @@ public class TenantService extends BaseService{
Tenant tenant = tenantMapper.queryById(id); Tenant tenant = tenantMapper.queryById(id);
if (tenant == null){ if (tenant == null){
putMsg(result, Status.USER_NOT_EXIST, id); putMsg(result, Status.TENANT_NOT_EXIST);
return result; return result;
} }
@ -230,6 +230,13 @@ public class TenantService extends BaseService{
Tenant tenant = tenantMapper.queryById(id); Tenant tenant = tenantMapper.queryById(id);
if (tenant == null){
putMsg(result, Status.TENANT_NOT_EXIST);
return result;
}
// if hdfs startup
if (PropertyUtils.getBoolean(cn.escheduler.common.Constants.HDFS_STARTUP_STATE)){
String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode(); String tenantPath = HadoopUtils.getHdfsDataBasePath() + "/" + tenant.getTenantCode();
String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode()); String resourcePath = HadoopUtils.getHdfsDir(tenant.getTenantCode());
@ -245,6 +252,7 @@ public class TenantService extends BaseService{
} }
HadoopUtils.getInstance().delete(tenantPath, true); HadoopUtils.getInstance().delete(tenantPath, true);
}
tenantMapper.deleteById(id); tenantMapper.deleteById(id);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

24
escheduler-api/src/test/java/cn/escheduler/api/controller/ResourcesControllerTest.java

@ -34,6 +34,8 @@ import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.WebApplicationContext;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
@ -43,7 +45,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
public class ResourcesControllerTest { public class ResourcesControllerTest {
private static Logger logger = LoggerFactory.getLogger(QueueControllerTest.class); private static Logger logger = LoggerFactory.getLogger(ResourcesControllerTest.class);
private MockMvc mockMvc; private MockMvc mockMvc;
@ -71,4 +73,24 @@ public class ResourcesControllerTest {
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString()); logger.info(mvcResult.getResponse().getContentAsString());
} }
@Test
public void verifyResourceName() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","list_resources_1.sh");
paramsMap.add("type","FILE");
MvcResult mvcResult = mockMvc.perform(get("/resources/verify-name")
.header("sessionId", "c24ed9d9-1c20-48a0-bd9c-5cfca14a4dcb")
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
} }

2
escheduler-common/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-common</artifactId> <artifactId>escheduler-common</artifactId>
<name>escheduler-common</name> <name>escheduler-common</name>

2
escheduler-common/src/main/java/cn/escheduler/common/utils/OSUtils.java

@ -220,7 +220,7 @@ public class OSUtils {
* @throws IOException * @throws IOException
*/ */
public static String exeShell(String command) throws IOException { public static String exeShell(String command) throws IOException {
return ShellExecutor.execCommand("groups"); return ShellExecutor.execCommand(command);
} }
/** /**

2
escheduler-dao/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-dao</artifactId> <artifactId>escheduler-dao</artifactId>
<name>escheduler-dao</name> <name>escheduler-dao</name>

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java

@ -189,7 +189,7 @@ public class ProcessDefinitionMapperProvider {
if(userId != null && 0 != Integer.parseInt(userId.toString())){ if(userId != null && 0 != Integer.parseInt(userId.toString())){
WHERE("td.user_id = #{userId}"); WHERE("td.user_id = #{userId}");
} }
ORDER_BY(" td.update_time desc limit #{offset},#{pageSize} "); ORDER_BY(" sc.schedule_release_state desc,td.update_time desc limit #{offset},#{pageSize} ");
}}.toString(); }}.toString();
} }
/** /**

1
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java

@ -118,6 +118,7 @@ public class ResourceMapperProvider {
SET("`alias` = #{resource.alias}"); SET("`alias` = #{resource.alias}");
SET("`desc` = #{resource.desc}"); SET("`desc` = #{resource.desc}");
SET("`update_time` = #{resource.updateTime}"); SET("`update_time` = #{resource.updateTime}");
SET("`size` = #{resource.size}");
WHERE("`id` = #{resource.id}"); WHERE("`id` = #{resource.id}");
}}.toString(); }}.toString();
} }

2
escheduler-rpc/pom.xml

@ -4,7 +4,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

2
escheduler-server/pom.xml

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>escheduler-server</artifactId> <artifactId>escheduler-server</artifactId>
<name>escheduler-server</name> <name>escheduler-server</name>

25
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java

@ -18,6 +18,7 @@ package cn.escheduler.server.utils;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.rpc.LogClient; import cn.escheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
@ -33,6 +34,7 @@ import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
/** /**
* mainly used to get the start command line of a process * mainly used to get the start command line of a process
*/ */
@ -139,6 +141,8 @@ public class ProcessUtils {
{' ', '\t', '<', '>'}, {' ', '\t'}}; {' ', '\t', '<', '>'}, {' ', '\t'}};
private static Matcher matcher;
private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) { private static String createCommandLine(int verificationType, final String executablePath, final String[] cmd) {
StringBuilder cmdbuf = new StringBuilder(80); StringBuilder cmdbuf = new StringBuilder(80);
@ -256,11 +260,11 @@ public class ProcessUtils {
return ; return ;
} }
String cmd = String.format("sudo kill -9 %d", processId); String cmd = String.format("sudo kill -9 %s", getPidsStr(processId));
logger.info("process id:{}, cmd:{}", processId, cmd); logger.info("process id:{}, cmd:{}", processId, cmd);
Runtime.getRuntime().exec(cmd); OSUtils.exeCmd(cmd);
// find log and kill yarn job // find log and kill yarn job
killYarnJob(taskInstance); killYarnJob(taskInstance);
@ -270,6 +274,23 @@ public class ProcessUtils {
} }
} }
/**
* get pids str
* @param processId
* @return
* @throws Exception
*/
private static String getPidsStr(int processId)throws Exception{
StringBuilder sb = new StringBuilder();
// pstree -p pid get sub pids
String pids = OSUtils.exeCmd("pstree -p " +processId+ "");
Matcher mat = Pattern.compile("(\\d+)").matcher(pids);
while (mat.find()){
sb.append(mat.group()+" ");
}
return sb.toString().trim();
}
/** /**
* find logs and kill yarn tasks * find logs and kill yarn tasks
* @param taskInstance * @param taskInstance

22
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java

@ -213,7 +213,7 @@ public abstract class AbstractCommandExecutor {
*/ */
private int updateState(ProcessDao processDao, int exitStatusCode, int pid, int taskInstId) { private int updateState(ProcessDao processDao, int exitStatusCode, int pid, int taskInstId) {
//get yarn state by log //get yarn state by log
if (exitStatusCode != -1) { if (exitStatusCode != 0) {
TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
logger.info("process id is {}", pid); logger.info("process id is {}", pid);
@ -380,14 +380,22 @@ public abstract class AbstractCommandExecutor {
boolean result = true; boolean result = true;
try { try {
for (String appId : appIds) { for (String appId : appIds) {
while(true){
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId); ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final state:{}",appId,applicationStatus.name()); logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
if (!applicationStatus.equals(ExecutionStatus.SUCCESS)) { if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
result = false; applicationStatus.equals(ExecutionStatus.KILL)) {
return false;
}
if (applicationStatus.equals(ExecutionStatus.SUCCESS)){
break;
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(String.format("mapreduce applications: %s status failed : " + e.getMessage(), appIds.toString()),e); logger.error(String.format("yarn applications: %s status failed : " + e.getMessage(), appIds.toString()),e);
result = false; result = false;
} }
return result; return result;
@ -548,10 +556,4 @@ public abstract class AbstractCommandExecutor {
protected abstract boolean checkShowLog(String line); protected abstract boolean checkShowLog(String line);
protected abstract boolean checkFindApp(String line); protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
// if(line.contains(taskAppId) || !line.contains("cn.escheduler.server.worker.log.TaskLogger")){
// logs.add(line);
// }
} }

30
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -196,7 +196,7 @@ public class SqlTask extends AbstractTask {
} }
// special characters need to be escaped, ${} needs to be escaped // special characters need to be escaped, ${} needs to be escaped
String rgex = "'?\\$\\{(.*?)\\}'?"; String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap); setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap);
// replace the ${} of the SQL statement with the Placeholder // replace the ${} of the SQL statement with the Placeholder
@ -310,6 +310,7 @@ public class SqlTask extends AbstractTask {
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(e.getMessage(),e); logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
} }
return connection; return connection;
} }
@ -326,6 +327,7 @@ public class SqlTask extends AbstractTask {
ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue()); ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue());
} }
} }
logger.info("prepare statement replace sql:{}",stmt.toString());
return stmt; return stmt;
} }
@ -347,14 +349,14 @@ public class SqlTask extends AbstractTask {
// receiving group list // receiving group list
List<String> receviersList = new ArrayList<String>(); List<String> receviersList = new ArrayList<String>();
for(User user:users){ for(User user:users){
receviersList.add(user.getEmail()); receviersList.add(user.getEmail().trim());
} }
// custom receiver // custom receiver
String receivers = sqlParameters.getReceivers(); String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){ if (StringUtils.isNotEmpty(receivers)){
String[] splits = receivers.split(Constants.COMMA); String[] splits = receivers.split(Constants.COMMA);
for (String receiver : splits){ for (String receiver : splits){
receviersList.add(receiver); receviersList.add(receiver.trim());
} }
} }
@ -365,15 +367,19 @@ public class SqlTask extends AbstractTask {
if (StringUtils.isNotEmpty(receiversCc)){ if (StringUtils.isNotEmpty(receiversCc)){
String[] splits = receiversCc.split(Constants.COMMA); String[] splits = receiversCc.split(Constants.COMMA);
for (String receiverCc : splits){ for (String receiverCc : splits){
receviersCcList.add(receiverCc); receviersCcList.add(receiverCc.trim());
} }
} }
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
MailUtils.sendMails(receviersList,receviersCcList,title, content, ShowType.valueOf(showTypeName)); Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(cn.escheduler.api.utils.Constants.STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{ }else{
logger.error("showType: {} is not valid " ,showTypeName); logger.error("showType: {} is not valid " ,showTypeName);
throw new RuntimeException(String.format("showType: %s is not valid ",showTypeName));
} }
} }
@ -411,19 +417,5 @@ public class SqlTask extends AbstractTask {
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")"); logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
} }
logger.info(logPrint.toString()); logger.info(logPrint.toString());
//direct print style
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
StringBuffer sb = new StringBuffer("replaced sql , direct:");
while (m.find()) {
m.appendReplacement(sb, sqlParamsMap.get(index).getValue());
index ++;
}
m.appendTail(sb);
logger.info(sb.toString());
} }
} }

2
pom.xml

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>cn.analysys</groupId> <groupId>cn.analysys</groupId>
<artifactId>escheduler</artifactId> <artifactId>escheduler</artifactId>
<version>1.0.3-SNAPSHOT</version> <version>1.0.4-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<name>escheduler</name> <name>escheduler</name>
<url>http://maven.apache.org</url> <url>http://maven.apache.org</url>

2
sql/soft_version

@ -1 +1 @@
1.0.2 1.0.4
Loading…
Cancel
Save