Browse Source

Merge remote-tracking branch 'upstream/dev' into dev

# Conflicts:
#	escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
#	escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
pull/2/head
lenboo 5 years ago
parent
commit
8b35dde214
  1. 36
      .github/ISSUE_TEMPLATE/bug_report.md
  2. 20
      .github/ISSUE_TEMPLATE/feature_request.md
  3. 93
      CONTRIBUTING.md
  4. 2
      README.md
  5. 2
      README_zh_CN.md
  6. 0
      docs/en_us/README.md
  7. 2
      escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java
  8. 6
      escheduler-api/src/main/java/cn/escheduler/api/log/LogClient.java
  9. 6
      escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java
  10. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java
  11. 25
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  12. 29
      escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java
  13. 39
      escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java
  14. 17
      escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
  15. 14
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  16. 21
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  17. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java
  18. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java
  19. 73
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  20. 8
      escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
  21. 3
      escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
  22. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java
  23. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java
  24. 196
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  25. 283
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
  26. 8
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
  27. 113
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
  28. 25
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
  29. 41
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
  30. 16
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
  31. 86
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
  32. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
  33. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
  34. 7
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
  35. 337
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
  36. 72
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
  37. 32
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
  38. 6
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
  39. 259
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

36
.github/ISSUE_TEMPLATE/bug_report.md

@ -0,0 +1,36 @@
---
name: Bug report
about: Create a report to help us improve
title: "[BUG] bug title "
labels: bug
assignees: ''
---
*For better global communication, please give priority to using English description, thx! *
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior, for example:
1. Go to '...'
2. Click on '....'
3. Scroll down to '....'
4. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Which version of Easy Scheduler:**
-[1.1.0-preview]
**Additional context**
Add any other context about the problem here.
**Requirement or improvement
- Please describe about your requirements or improvement suggestions.

20
.github/ISSUE_TEMPLATE/feature_request.md

@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: "[Feature]"
labels: new feature
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

93
CONTRIBUTING.md

@ -1,5 +1,70 @@
EasyScheduler提交代码流程
=====
* First from the remote repository *https://github.com/analysys/EasyScheduler.git* fork code to your own repository
* there are three branches in the remote repository currently:
* master normal delivery branch
After the stable version is released, the code for the stable version branch is merged into the master branch.
* dev daily development branch
The daily development branch, the newly submitted code can pull requests to this branch.
* branch-1.0.0 release version branch
Release version branch, there will be 2.0 ... and other version branches, the version
branch only changes the error, does not add new features.
* Clone your own warehouse to your local
`git clone https://github.com/analysys/EasyScheduler.git`
* Add remote repository address, named upstream
`git remote add upstream https://github.com/analysys/EasyScheduler.git`
* View repository:
`git remote -v`
> There will be two repositories at this time: origin (your own warehouse) and upstream (remote repository)
* Get/update remote repository code (already the latest code, skip it)
`git fetch upstream`
* Synchronize remote repository code to local repository
```
git checkout origin/dev
git merge --no-ff upstream/dev
```
If remote branch has a new branch `DEV-1.0`, you need to synchronize this branch to the local repository.
```
git checkout -b dev-1.0 upstream/dev-1.0
git push --set-upstream origin dev1.0
```
* After modifying the code locally, submit it to your own repository:
`git commit -m 'test commit'`
`git push`
* Submit changes to the remote repository
* On the github page, click on the new pull request.
<p align = "center">
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/90f3abbf-70ef-4334-b8d6-9014c9cf4c7f.png"width ="60%"/>
</ p>
* Select the modified local branch and the branch to merge past to create a pull request.
<p align = "center">
<img src = "http://geek.analysys.cn/static/upload/221/2019-04-02/fe7eecfe-2720-4736-951b-b3387cf1ae41.png"width ="60%"/>
</ p>
* Next, the administrator is responsible for **merging** to complete the pull request
---
* 首先从远端仓库*https://github.com/analysys/EasyScheduler.git* fork一份代码到自己的仓库中
* 远端仓库中目前有三个分支:
@ -14,7 +79,7 @@ EasyScheduler提交代码流程
* 把自己仓库clone到本地
`git clone https://github.com/**/EasyScheduler.git`
`git clone https://github.com/analysys/EasyScheduler.git`
* 添加远端仓库地址,命名为upstream
@ -26,17 +91,10 @@ EasyScheduler提交代码流程
> 此时会有两个仓库:origin(自己的仓库)和upstream(远端仓库)
* 获取远端仓库代码(已经是最新代码,就跳过)
* 获取/更新远端仓库代码(已经是最新代码,就跳过)
`git fetch upstream `
* 更新远端仓库代码
```
git checkout upstream/dev
git pull upstream dev
```
* 同步远端仓库代码到本地仓库
@ -54,7 +112,7 @@ git push --set-upstream origin dev1.0
* 在本地修改代码以后,提交到自己仓库:
`git ca -m 'test commit'`
`git commit -m 'test commit'`
`git push`
* 将修改提交到远端仓库
@ -68,6 +126,15 @@ git push --set-upstream origin dev1.0
<p align="center">
<img src="http://geek.analysys.cn/static/upload/221/2019-04-02/fe7eecfe-2720-4736-951b-b3387cf1ae41.png" width="60%" />
</p>
* 接下来由管理员负责将**Merge**完成此次pull request
* 接下来由管理员负责将**Merge**完成此次pull request

2
README.md

@ -81,7 +81,7 @@ Work plan of Easy Scheduler: [R&D plan](https://github.com/analysys/EasySchedule
### How to contribute code
Welcome to participate in contributing code, please refer to the process of submitting the code:
https://github.com/analysys/EasyScheduler/blob/master/CONTRIBUTING.md
[[How to contribute code](https://github.com/analysys/EasyScheduler/issues/310)]
### Thanks

2
README_zh_CN.md

@ -57,7 +57,7 @@ EasyScheduler的工作计划:<a href="https://github.com/analysys/EasySchedule
### 贡献代码
非常欢迎大家来参与贡献代码,提交代码流程请参考:
https://github.com/analysys/EasyScheduler/blob/master/CONTRIBUTING.md
[[How to contribute code](https://github.com/analysys/EasyScheduler/issues/310)]
### 感谢

0
docs/en_us/README.md

2
escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java

@ -116,7 +116,7 @@ public class LoginController extends BaseController {
response.setStatus(HttpStatus.SC_OK);
response.addCookie(new Cookie(Constants.SESSION_ID, sessionId));
logger.info("sessionId = " + sessionId);
logger.info("sessionId : {}" , sessionId);
return success(LOGIN_SUCCESS.getMsg(), sessionId);
} catch (Exception e) {
logger.error(USER_LOGIN_FAILURE.getMsg(),e);

6
escheduler-api/src/main/java/cn/escheduler/api/log/LogClient.java

@ -100,7 +100,7 @@ public class LogClient {
* @return
*/
public String viewLog(String path) {
logger.info("view queryLog path {}",path);
logger.info("view log path {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetStrInfo retStrInfo;
try {
@ -119,14 +119,14 @@ public class LogClient {
* @return
*/
public byte[] getLogBytes(String path) {
logger.info("get log path {}",path);
logger.info("log path {}",path);
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
RetByteInfo retByteInfo;
try {
retByteInfo = blockingStub.getLogBytes(pathParameter);
return retByteInfo.getData().toByteArray();
} catch (StatusRuntimeException e) {
logger.error("get log size error", e);
logger.error("log size error", e);
return null;
}
}

6
escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java

@ -68,7 +68,7 @@ public class SessionService extends BaseService{
String ip = BaseController.getClientIpAddress(request);
logger.info("get session: {}, ip: {}", sessionId, ip);
return sessionMapper.queryByIdAndIp(sessionId, ip);
return sessionMapper.queryByIdAndIp(sessionId);
}
/**
@ -80,7 +80,7 @@ public class SessionService extends BaseService{
*/
public String createSession(User user, String ip) {
// logined
Session session = sessionMapper.queryByUserIdAndIp(user.getId(), ip);
Session session = sessionMapper.queryByUserIdAndIp(user.getId());
Date now = new Date();
/**
@ -126,7 +126,7 @@ public class SessionService extends BaseService{
/**
* query session by user id and ip
*/
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(), ip);
Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId());
//delete session
sessionMapper.deleteById(session.getId());
}

2
escheduler-api/src/main/java/cn/escheduler/api/service/UsersService.java

@ -322,7 +322,7 @@ public class UsersService extends BaseService {
if (user != null) {
if (PropertyUtils.getResUploadStartupState()) {
String userPath = HadoopUtils.getHdfsDataBasePath() + "/" + user.getTenantCode() + "/home/" + id;
String userPath = HadoopUtils.getHdfsUserDir(user.getTenantCode(),id);
if (HadoopUtils.getInstance().exists(userPath)) {
HadoopUtils.getInstance().delete(userPath, true);
}

25
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -119,10 +119,6 @@ public final class Constants {
*/
public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path";
/**
* escheduler.env.sh
*/
public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh";
/**
* python home
@ -220,9 +216,9 @@ public final class Constants {
public static final String SEMICOLON = ";";
/**
* DOT .
* EQUAL SIGN
*/
public static final String DOT = ".";
public static final String EQUAL_SIGN = "=";
/**
* ZOOKEEPER_SESSION_TIMEOUT
@ -283,10 +279,6 @@ public final class Constants {
*/
public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
/**
* date format of yyyyMMdd
*/
public static final String YYYYMMDD = "yyyyMMdd";
/**
* date format of yyyyMMddHHmmss
@ -489,6 +481,7 @@ public final class Constants {
public static final String TASK_RECORD_PWD = "task.record.datasource.password";
public static final String DEFAULT = "Default";
public static final String USER = "user";
public static final String PASSWORD = "password";
public static final String XXXXXX = "******";
@ -499,6 +492,7 @@ public final class Constants {
public static final String STATUS = "status";
/**
* command parameter keys
*/
@ -866,6 +860,11 @@ public final class Constants {
*/
public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5;
/**
* kerberos
*/
public static final String KERBEROS = "kerberos";
/**
* java.security.krb5.conf
*/
@ -901,4 +900,10 @@ public final class Constants {
* loginUserFromKeytab path
*/
public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path";
/**
* hive conf
*/
public static final String HIVE_CONF = "hiveconf:";
}

29
escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.escheduler.common.enums;
/**
* cycle enums
*/
public enum ServerEnum {
/**
* master server , worker server
*/
MASTER_SERVER,WORKER_SERVER
}

39
escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java

@ -21,6 +21,8 @@ import cn.escheduler.common.utils.JSONUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static cn.escheduler.common.Constants.*;
/**
* produce datasource in this custom defined datasource factory.
*/
@ -49,8 +51,43 @@ public class DataSourceFactory {
return null;
}
} catch (Exception e) {
logger.error("Get datasource object error", e);
logger.error("get datasource object error", e);
return null;
}
}
/**
* load class
* @param dbType
* @throws Exception
*/
public static void loadClass(DbType dbType) throws Exception{
switch (dbType){
case MYSQL :
Class.forName(JDBC_MYSQL_CLASS_NAME);
break;
case POSTGRESQL :
Class.forName(JDBC_POSTGRESQL_CLASS_NAME);
break;
case HIVE :
Class.forName(JDBC_HIVE_CLASS_NAME);
break;
case SPARK :
Class.forName(JDBC_SPARK_CLASS_NAME);
break;
case CLICKHOUSE :
Class.forName(JDBC_CLICKHOUSE_CLASS_NAME);
break;
case ORACLE :
Class.forName(JDBC_ORACLE_CLASS_NAME);
break;
case SQLSERVER:
Class.forName(JDBC_SQLSERVER_CLASS_NAME);
break;
default:
logger.error("not support sql type: {},can't load class", dbType);
throw new IllegalArgumentException("not support sql type,can't load class");
}
}
}

17
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java

@ -19,6 +19,8 @@ package cn.escheduler.common.utils;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ResUploadType;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,4 +76,19 @@ public class CommonUtils {
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
/**
* load kerberos configuration
* @throws Exception
*/
public static void loadKerberosConf()throws Exception{
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(JAVA_SECURITY_KRB5_CONF, getString(JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS);
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
}
}

14
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.IStoppable;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.model.MasterServer;
import cn.escheduler.common.enums.ServerEnum;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.utils.ResInfo;
@ -568,8 +569,19 @@ public abstract class AbstractZKClient {
}
return path.substring(startIndex, endIndex);
}
/**
* acquire zk lock
* @param zkClient
* @param zNodeLockPath
* @throws Exception
*/
public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{
InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath);
mutex.acquire();
return mutex;
}
@Override
@Override
public String toString() {
return "AbstractZKClient{" +
"zkClient=" + zkClient +

21
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -1220,6 +1220,26 @@ public class ProcessDao extends AbstractBaseDao {
return taskInstanceMapper.queryById(taskId);
}
/**
* package task instanceassociate processInstance and processDefine
* @param taskInstId
* @return
*/
public TaskInstance getTaskInstanceRelationByTaskId(int taskInstId){
// get task instance
TaskInstance taskInstance = findTaskInstanceById(taskInstId);
// get process instance
ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = findProcessDefineById(taskInstance.getProcessDefinitionId());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
return taskInstance;
}
/**
* get id list by task state
* @param instanceId
@ -1324,7 +1344,6 @@ public class ProcessDao extends AbstractBaseDao {
String executePath,
String logPath,
int taskInstId) {
TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId);
taskInstance.setState(state);
taskInstance.setStartTime(startTime);

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java

@ -75,7 +75,6 @@ public interface SessionMapper {
* query by session id and ip
*
* @param sessionId
* @param ip
* @return
*/
@Results(value = {
@ -85,13 +84,12 @@ public interface SessionMapper {
@Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
})
@SelectProvider(type = SessionMapperProvider.class, method = "queryByIdAndIp")
Session queryByIdAndIp(@Param("sessionId") String sessionId, @Param("ip") String ip);
Session queryByIdAndIp(@Param("sessionId") String sessionId);
/**
* query by user id and ip
* @param userId
* @param ip
* @return
*/
@Results(value = {
@ -101,6 +99,6 @@ public interface SessionMapper {
@Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
})
@SelectProvider(type = SessionMapperProvider.class, method = "queryByUserIdAndIp")
Session queryByUserIdAndIp(@Param("userId") int userId, @Param("ip") String ip);
Session queryByUserIdAndIp(@Param("userId") int userId);
}

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java

@ -114,7 +114,6 @@ public class SessionMapperProvider {
FROM(TABLE_NAME);
WHERE("`id` = #{sessionId}");
WHERE("`ip` = #{ip}");
}}.toString();
}
@ -130,7 +129,6 @@ public class SessionMapperProvider {
FROM(TABLE_NAME);
WHERE("`user_id` = #{userId}");
WHERE("`ip` = #{ip}");
}}.toString();
}
}

73
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -529,6 +529,39 @@ public class ProcessInstance {
this.timeout = timeout;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public int getTenantId() {
return this.tenantId ;
}
public String getWorkerGroupName() {
return workerGroupName;
}
public void setWorkerGroupName(String workerGroupName) {
this.workerGroupName = workerGroupName;
}
public String getReceivers() {
return receivers;
}
public void setReceivers(String receivers) {
this.receivers = receivers;
}
public String getReceiversCc() {
return receiversCc;
}
public void setReceiversCc(String receiversCc) {
this.receiversCc = receiversCc;
}
@Override
public String toString() {
return "ProcessInstance{" +
@ -555,7 +588,6 @@ public class ProcessInstance {
", processInstanceJson='" + processInstanceJson + '\'' +
", executorId=" + executorId +
", tenantCode='" + tenantCode + '\'' +
", tenantId='" + tenantId + '\'' +
", queue='" + queue + '\'' +
", isSubProcess=" + isSubProcess +
", locations='" + locations + '\'' +
@ -563,40 +595,13 @@ public class ProcessInstance {
", historyCmd='" + historyCmd + '\'' +
", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' +
", duration=" + duration +
", timeout=" + timeout +
", processInstancePriority=" + processInstancePriority +
", workerGroupId=" + workerGroupId +
", timeout=" + timeout +
", tenantId=" + tenantId +
", workerGroupName='" + workerGroupName + '\'' +
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
'}';
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public int getTenantId() {
return this.tenantId ;
}
public String getWorkerGroupName() {
return workerGroupName;
}
public void setWorkerGroupName(String workerGroupName) {
this.workerGroupName = workerGroupName;
}
public String getReceivers() {
return receivers;
}
public void setReceivers(String receivers) {
this.receivers = receivers;
}
public String getReceiversCc() {
return receiversCc;
}
public void setReceiversCc(String receiversCc) {
this.receiversCc = receiversCc;
}
}

8
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java

@ -189,6 +189,14 @@ public class TaskInstance {
private int workerGroupId;
public void init(String host,Date startTime,String executePath){
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
}
public ProcessInstance getProcessInstance() {
return processInstance;
}

3
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java

@ -314,8 +314,7 @@ public class ProcessUtils {
}
} catch (Exception e) {
logger.error("kill yarn job failed : " + e.getMessage(),e);
// throw new RuntimeException("kill yarn job fail");
logger.error("kill yarn job failure",e);
}
}
}

2
escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java

@ -254,7 +254,7 @@ public class WorkerServer implements IStoppable {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (InterruptedException e) {
logger.error("interrupted exception : " + e.getMessage(),e);
logger.error("interrupted exception",e);
}
// if set is null , return
if (CollectionUtils.isNotEmpty(taskInfoSet)){

2
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java

@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory;
*/
public class TaskLogAppender extends FileAppender<ILoggingEvent> {
private static final Logger logger = LoggerFactory.getLogger(TaskLogAppender.class);
private String currentlyActiveFile;
@Override

196
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
@ -28,7 +29,6 @@ import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -75,8 +75,20 @@ public class FetchTaskThread implements Runnable{
*/
private int workerExecNums;
/**
* conf
*/
private Configuration conf;
/**
* task instance
*/
private TaskInstance taskInstance;
/**
* task instance id
*/
Integer taskInstId;
public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient,
ProcessDao processDao, Configuration conf,
@ -125,116 +137,95 @@ public class FetchTaskThread implements Runnable{
@Override
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();
boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
// task instance id str
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {
if (!checkThreadCount(poolExecutor)) {
break;
}
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[3];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
logger.info("worker fetch taskId : {} from queue ", taskId);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskId);
retryTimes--;
}
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
continue;
}
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
// set execute task worker host
taskInstance.setHost(OSUtils.getHost());
taskInstance.setStartTime(now);
if(!runCheckFlag) {
continue;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if (CollectionUtils.isEmpty(tasksQueueList)){
continue;
}
// creating distributed locks, lock path /escheduler/lock/worker
mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
zkWorkerClient.getWorkerLockPath());
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
// task instance id str
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isEmpty(taskQueueStr)) {
continue;
}
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
if (!checkThreadCount(poolExecutor)) {
break;
}
// get task instance id
taskInstId = Integer.parseInt(taskQueueStr.split(Constants.UNDERLINE)[3]);
// get task instance relation
taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
taskInstance.getProcessDefine().getUserId());
if(tenant == null){
logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
continue;
}
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
processDefine.getId(),
processInstance.getId(),
taskInstance.getId());
logger.info("task instance local execute path : {} ", execLocalPath);
logger.info("worker fetch taskId : {} from queue ", taskInstId);
// mainly to wait for the master insert task to succeed
waitForMasterEnterQueue();
// set task execute path
taskInstance.setExecutePath(execLocalPath);
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskInstId);
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
continue;
}
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant == null){
logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}",
taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId());
continue;
}
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
tenant.getTenantCode(), logger);
// get local execute path
logger.info("task instance local execute path : {} ", getExecLocalPath());
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// init task
taskInstance.init(OSUtils.getHost(),
new Date(),
getExecLocalPath());
}
}
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(getExecLocalPath(),
tenant.getTenantCode(), logger);
}
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// remove node from zk
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
logger.error("fetch task thread failure" ,e);
}finally {
AbstractZKClient.releaseMutex(mutex);
}
@ -242,16 +233,45 @@ public class FetchTaskThread implements Runnable{
}
/**
*
* get execute local path
* @return
*/
private String getExecLocalPath(){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/**
* check
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
logger.info("thread insufficient , activeCount : {} , " +
"workerExecNums : {}, will sleep : {} millis for thread resource",
activeCount,
workerExecNums,
Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
/**
* mainly to wait for the master insert task to succeed
* @throws Exception
*/
private void waitForMasterEnterQueue()throws Exception{
int retryTimes = 30;
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskInstId);
retryTimes--;
}
}
}

283
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -59,13 +59,16 @@ import java.util.stream.Collectors;
/**
* task scheduler thread
*/
public class TaskScheduleThread implements Callable<Boolean> {
public class TaskScheduleThread implements Runnable {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class);
/**
* task prefix
*/
private static final String TASK_PREFIX = "TASK";
/**
@ -79,7 +82,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
private final ProcessDao processDao;
/**
* execute task info
* abstract task
*/
private AbstractTask task;
@ -89,115 +92,55 @@ public class TaskScheduleThread implements Callable<Boolean> {
}
@Override
public Boolean call() throws Exception {
public void run() {
// get task type
String taskType = taskInstance.getTaskType();
// set task state
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
// update task state
if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
processDao.changeTaskState(taskInstance.getState(),
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
System.getProperty("user.dir") + "/logs/" +
taskInstance.getProcessDefinitionId() +"/" +
taskInstance.getProcessInstanceId() +"/" +
taskInstance.getId() + ".log",
taskInstance.getId());
}else{
processDao.changeTaskState(taskInstance.getState(),
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
System.getProperty("user.dir") + "/logs/" +
taskInstance.getProcessDefinitionId() +"/" +
taskInstance.getProcessInstanceId() +"/" +
taskInstance.getId() + ".log",
taskInstance.getId());
}
ExecutionStatus status = ExecutionStatus.SUCCESS;
// update task state is running according to task type
updateTaskState(taskInstance.getTaskType());
try {
logger.info("script path : {}", taskInstance.getExecutePath());
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// custom param str
String customParamStr = taskInstance.getProcessInstance().getGlobalParams();
Map<String,String> allParamMap = new HashMap<>();
if (customParamStr != null) {
List<Property> customParamMap = JSONObject.parseArray(customParamStr, Property.class);
Map<String,String> userDefinedParamMap = customParamMap.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
allParamMap.putAll(userDefinedParamMap);
}
logger.info("script path : {}",taskInstance.getExecutePath());
TaskProps taskProps = new TaskProps();
taskProps.setTaskDir(taskInstance.getExecutePath());
String taskJson = taskInstance.getTaskJson();
TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
List<String> projectRes = createProjectResFiles(taskNode);
// copy hdfs file to local
// copy hdfs/minio file to local
copyHdfsToLocal(processDao,
taskInstance.getExecutePath(),
projectRes,
createProjectResFiles(taskNode),
logger);
// set task params
taskProps.setTaskParams(taskNode.getParams());
// set tenant code , execute task linux user
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
taskProps.setScheduleTime(processInstance.getScheduleTime());
taskProps.setNodeName(taskInstance.getName());
taskProps.setTaskInstId(taskInstance.getId());
taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
// get process define according to tak instance
ProcessDefinition processDefine = taskInstance.getProcessDefine();
// get tenant info
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
processDefine.getUserId());
if(tenant == null){
processInstance.setTenantCode(tenant.getTenantCode());
logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
);
status = ExecutionStatus.FAILURE;
logger.error("cannot find the tenant, process definition id:{}, user id:{}",
processDefine.getId(),
processDefine.getUserId());
task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}else{
taskProps.setTenantCode(tenant.getTenantCode());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
// set queue
if (StringUtils.isEmpty(queue)){
taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
taskProps.setQueue(tenant.getQueueName());
}
taskProps.setTaskStartTime(taskInstance.getStartTime());
taskProps.setDefinedParams(allParamMap);
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInstance.getExecutePath(),
processInstance.getScheduleTime(),
taskInstance.getName(),
taskInstance.getTaskType(),
taskInstance.getId(),
CommonUtils.getSystemEnvPath(),
tenant.getTenantCode(),
tenant.getQueueName(),
taskInstance.getStartTime(),
getGlobalParamsMap(),
taskInstance.getDependency(),
processInstance.getCmdTypeIfComplement());
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setDependence(taskInstance.getDependency());
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
@ -209,72 +152,98 @@ public class TaskScheduleThread implements Callable<Boolean> {
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps,
taskLogger);
// job init
// task init
task.init();
// job handle
// task handle
task.handle();
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
status = ExecutionStatus.SUCCESS;
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
}
}
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL;
}else {
status = ExecutionStatus.FAILURE;
}
// task result process
task.after();
}
}catch (Exception e){
logger.error("task escheduler failure : ", e);
status = ExecutionStatus.FAILURE ;
logger.error(String.format("task process exception, process id : %s , task : %s",
taskInstance.getProcessInstanceId(),
taskInstance.getName()),e);
logger.error("task scheduler failure", e);
task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
kill();
}
logger.info("task instance id : {},task final status : {}",
taskInstance.getId(),
task.getExitStatus());
// update task instance state
processDao.changeTaskState(status,
processDao.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
return task.getExitStatusCode() > Constants.EXIT_CODE_SUCCESS;
}
/**
* set task time out
* get global paras map
* @return
*/
private Map<String, String> getGlobalParamsMap() {
Map<String,String> globalParamsMap = new HashMap<>(16);
// global params string
String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
if (globalParamsStr != null) {
List<Property> globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
}
return globalParamsMap;
}
/**
* update task state according to task type
* @param taskType
*/
private void updateTaskState(String taskType) {
// update task status is running
if(taskType.equals(TaskType.SQL.name()) ||
taskType.equals(TaskType.PROCEDURE.name())){
processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
getTaskLogPath(),
taskInstance.getId());
}else{
processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
taskInstance.getStartTime(),
taskInstance.getHost(),
taskInstance.getExecutePath(),
getTaskLogPath(),
taskInstance.getId());
}
}
/**
* get task log path
* @return
*/
private String getTaskLogPath() {
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
"logs" + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log";
}
/**
* set task timeout
* @param taskProps
* @param taskNode
*/
private void setTaskTimeout(TaskProps taskProps, TaskNode taskNode) {
// the default timeout is the maximum value of the integer
taskProps.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()){
// get timeout strategy
taskProps.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy());
switch (taskTimeoutParameter.getStrategy()){
case WARN:
@ -298,38 +267,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
}
/**
* get current task parameter class
* @return
*/
private Class getCurTaskParamsClass(){
Class paramsClass = null;
TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
switch (taskType){
case SHELL:
paramsClass = ShellParameters.class;
break;
case SQL:
paramsClass = SqlParameters.class;
break;
case PROCEDURE:
paramsClass = ProcedureParameters.class;
break;
case MR:
paramsClass = MapreduceParameters.class;
break;
case SPARK:
paramsClass = SparkParameters.class;
break;
case PYTHON:
paramsClass = PythonParameters.class;
break;
default:
logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type");
}
return paramsClass;
}
/**
* kill task
@ -376,9 +314,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
File resFile = new File(execLocalPath, res);
if (!resFile.exists()) {
try {
/**
* query the tenant code of the resource according to the name of the resource
*/
// query the tenant code of the resource according to the name of the resource
String tentnCode = processDao.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode,res);
@ -388,7 +324,6 @@ public class TaskScheduleThread implements Callable<Boolean> {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
}
} else {
logger.info("file : {} exists ", resFile.getName());
}

8
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java

@ -67,6 +67,11 @@ public abstract class AbstractCommandExecutor {
*/
protected final String taskAppId;
/**
* task appId
*/
protected final int taskInstId;
/**
* tenant code , execute task linux user
*/
@ -99,11 +104,12 @@ public abstract class AbstractCommandExecutor {
public AbstractCommandExecutor(Consumer<List<String>> logHandler,
String taskDir, String taskAppId, String tenantCode, String envFile,
String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile,
Date startTime, int timeout, Logger logger){
this.logHandler = logHandler;
this.taskDir = taskDir;
this.taskAppId = taskAppId;
this.taskInstId = taskInstId;
this.tenantCode = tenantCode;
this.envFile = envFile;
this.startTime = startTime;

113
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java

@ -16,10 +16,26 @@
*/
package cn.escheduler.server.worker.task;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.TaskRecordStatus;
import cn.escheduler.common.enums.TaskType;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.mr.MapreduceParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
import cn.escheduler.common.task.python.PythonParameters;
import cn.escheduler.common.task.shell.ShellParameters;
import cn.escheduler.common.task.spark.SparkParameters;
import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
/**
* executive task
@ -70,7 +86,7 @@ public abstract class AbstractTask {
public void cancelApplication(boolean status) throws Exception {
cancel = true;
this.cancel = status;
}
/**
@ -89,6 +105,9 @@ public abstract class AbstractTask {
return exitStatusCode;
}
public void setExitStatusCode(int exitStatusCode) {
this.exitStatusCode = exitStatusCode;
}
/**
* get task parameters
@ -96,4 +115,96 @@ public abstract class AbstractTask {
public abstract AbstractParameters getParameters();
/**
* result processing
*/
public void after(){
if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
// task recor flat : if true , start up qianfan
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskProps.getTaskType())){
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
}
}
}else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){
setExitStatusCode(Constants.EXIT_CODE_KILL);
}else {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}
}
/**
* get current task parameter class
* @return
*/
private Class getCurTaskParamsClass(){
Class paramsClass = null;
// get task type
TaskType taskType = TaskType.valueOf(taskProps.getTaskType());
switch (taskType){
case SHELL:
paramsClass = ShellParameters.class;
break;
case SQL:
paramsClass = SqlParameters.class;
break;
case PROCEDURE:
paramsClass = ProcedureParameters.class;
break;
case MR:
paramsClass = MapreduceParameters.class;
break;
case SPARK:
paramsClass = SparkParameters.class;
break;
case PYTHON:
paramsClass = PythonParameters.class;
break;
default:
logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type");
}
return paramsClass;
}
/**
* get exit status according to exitCode
* @return
*/
public ExecutionStatus getExitStatus(){
ExecutionStatus status;
switch (getExitStatusCode()){
case Constants.EXIT_CODE_SUCCESS:
status = ExecutionStatus.SUCCESS;
break;
case Constants.EXIT_CODE_KILL:
status = ExecutionStatus.KILL;
break;
default:
status = ExecutionStatus.FAILURE;
break;
}
return status;
}
}

25
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java

@ -38,7 +38,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
/**
* process task
*/
private ShellCommandExecutor processTask;
private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
@ -53,21 +53,25 @@ public abstract class AbstractYarnTask extends AbstractTask {
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
// find process instance by taskId
this.processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
this.processTask = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getTaskAppId(),
taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),
taskProps.getTaskInstId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
}
@Override
public void handle() throws Exception {
try {
// construct process
exitStatusCode = processTask.run(buildCommand(), processDao);
exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
logger.error("yarn process failed : " + e.getMessage(), e);
logger.error("yarn process failure", e);
exitStatusCode = -1;
}
}
@ -76,9 +80,8 @@ public abstract class AbstractYarnTask extends AbstractTask {
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
processTask.cancelApplication();
int taskInstId = taskProps.getTaskInstId();
TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
shellCommandExecutor.cancelApplication();
TaskInstance taskInstance = processDao.findTaskInstanceById(taskProps.getTaskInstId());
if (status && taskInstance != null){
ProcessUtils.killYarnJob(taskInstance);
}

41
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.PropertyUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,9 +42,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
public PythonCommandExecutor(Consumer<List<String>> logHandler,
String taskDir, String taskAppId, String tenantCode, String envFile,
Date startTime, int timeout, Logger logger) {
super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
String taskDir,
String taskAppId,
int taskInstId,
String tenantCode,
String envFile,
Date startTime,
int timeout,
Logger logger) {
super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
}
@ -67,7 +72,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
logger.info("tenantCode :{}, task dir:{}", tenantCode, taskDir);
if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile);
@ -80,16 +85,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
logger.info(sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
FileUtils.writeStringToFile(new File(commandFile),
sb.toString(),
StandardCharsets.UTF_8);
}
}
@Override
protected String commandType() {
String envPath = PropertyUtils.getString(Constants.ESCHEDULER_ENV_PATH);
String pythonHome = getPythonHome(envPath);
String pythonHome = getPythonHome(envFile);
if (StringUtils.isEmpty(pythonHome)){
return PYTHON;
}
@ -108,16 +112,25 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
/**
* get python home
* get the absolute path of the Python command
* note :
* common.properties
* PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself
*
* for example :
* your PYTHON_HOM is /opt/python3.7/
* you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties
* escheduler.env.path file.
*
* @param envPath
* @return
*/
private static String getPythonHome(String envPath){
BufferedReader br = null;
String line = null;
StringBuilder sb = new StringBuilder();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
String line;
while ((line = br.readLine()) != null){
if (line.contains(Constants.PYTHON_HOME)){
sb.append(line);
@ -128,13 +141,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
if (org.apache.commons.lang.StringUtils.isEmpty(result)){
return null;
}
String[] arrs = result.split("=");
String[] arrs = result.split(Constants.EQUAL_SIGN);
if (arrs.length == 2){
return arrs[1];
}
}catch (IOException e){
logger.error("read file failed : " + e.getMessage(),e);
logger.error("read file failure",e);
}finally {
try {
if (br != null){

16
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java

@ -29,9 +29,7 @@ import java.util.List;
import java.util.function.Consumer;
/**
* command executor
*
* 进程真正在worker服务器上执行的任务
* shell command executor
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
@ -39,9 +37,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
public ShellCommandExecutor(Consumer<List<String>> logHandler,
String taskDir, String taskAppId, String tenantCode, String envFile,
Date startTime, int timeout, Logger logger) {
super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
String taskDir,
String taskAppId,
int taskInstId,
String tenantCode,
String envFile,
Date startTime,
int timeout,
Logger logger) {
super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
}

86
escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java

@ -16,6 +16,7 @@
*/
package cn.escheduler.server.worker.task;
import cn.escheduler.common.enums.CommandType;
import cn.escheduler.common.enums.DataType;
import cn.escheduler.common.enums.Direct;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
@ -46,6 +47,8 @@ public class TaskProps {
**/
private String tenantCode;
private String taskType;
/**
* task parameters
**/
@ -101,6 +104,41 @@ public class TaskProps {
*/
private Date scheduleTime;
/**
* command type is complement
*/
private CommandType cmdTypeIfComplement;
public TaskProps(){}
public TaskProps(String taskParams,
String taskDir,
Date scheduleTime,
String nodeName,
String taskType,
int taskInstId,
String envFile,
String tenantCode,
String queue,
Date taskStartTime,
Map<String, String> definedParams,
String dependence,
CommandType cmdTypeIfComplement){
this.taskParams = taskParams;
this.taskDir = taskDir;
this.scheduleTime = scheduleTime;
this.nodeName = nodeName;
this.taskType = taskType;
this.taskInstId = taskInstId;
this.envFile = envFile;
this.tenantCode = tenantCode;
this.queue = queue;
this.taskStartTime = taskStartTime;
this.definedParams = definedParams;
this.dependence = dependence;
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() {
return tenantCode;
@ -200,22 +238,12 @@ public class TaskProps {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}
/**
* get parameters map
* @return
*/
public Map<String,Property> getUserDefParamsMap() {
if (definedParams != null) {
Map<String,Property> userDefParamsMaps = new HashMap<>();
Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<String, String> en = iter.next();
Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
userDefParamsMaps.put(property.getProp(),property);
}
return userDefParamsMaps;
}
return null;
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getDependence() {
@ -233,4 +261,30 @@ public class TaskProps {
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public CommandType getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(CommandType cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
/**
* get parameters map
* @return
*/
public Map<String,Property> getUserDefParamsMap() {
if (definedParams != null) {
Map<String,Property> userDefParamsMaps = new HashMap<>();
Iterator<Map.Entry<String, String>> iter = definedParams.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<String, String> en = iter.next();
Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
userDefParamsMaps.put(property.getProp(),property);
}
return userDefParamsMaps;
}
return null;
}
}

2
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java

@ -208,6 +208,4 @@ public class DependentExecute {
return dependResultMap;
}
}

2
escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java

@ -104,7 +104,7 @@ public class DependentTask extends AbstractTask {
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
}catch (Exception e){
logger.error("Exception " + e);
logger.error(e.getMessage(),e);
exitStatusCode = -1;
}
}

7
escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java

@ -70,8 +70,8 @@ public class MapReduceTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
@ -86,7 +86,8 @@ public class MapReduceTask extends AbstractYarnTask {
protected String buildCommand() throws Exception {
List<String> parameterList = buildParameters(mapreduceParameters);
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), taskProps.getDefinedParams());
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
taskProps.getDefinedParams());
logger.info("mapreduce task command: {}", command);
return command;

337
escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java

@ -21,12 +21,7 @@ import cn.escheduler.common.enums.DataType;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.Direct;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
import cn.escheduler.common.job.db.BaseDataSource;
import cn.escheduler.common.job.db.ClickHouseDataSource;
import cn.escheduler.common.job.db.MySQLDataSource;
import cn.escheduler.common.job.db.OracleDataSource;
import cn.escheduler.common.job.db.PostgreDataSource;
import cn.escheduler.common.job.db.SQLServerDataSource;
import cn.escheduler.common.job.db.*;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
@ -49,6 +44,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static cn.escheduler.common.enums.DataType.*;
/**
* procedure task
*/
@ -64,6 +61,11 @@ public class ProcedureTask extends AbstractTask {
*/
private ProcessDao processDao;
/**
* base datasource
*/
private BaseDataSource baseDataSource;
public ProcedureTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
@ -93,176 +95,181 @@ public class ProcedureTask extends AbstractTask {
// determine whether there is a data source
if (procedureParameters.getDatasource() == 0){
logger.error("datasource is null");
exitStatusCode = 0;
}else {
logger.error("datasource id not exists");
exitStatusCode = -1;
return;
}
DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),dataSource.getType(),dataSource.getNote(),
dataSource.getUserId(),dataSource.getConnectionParams());
DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
dataSource.getNote(),
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection connection = null;
CallableStatement stmt = null;
try {
// load class
DataSourceFactory.loadClass(dataSource.getType());
// get datasource
baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
if (dataSource != null){
Connection connection = null;
CallableStatement stmt = null;
try {
BaseDataSource baseDataSource = null;
if (DbType.MYSQL.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
}else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
}else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
// NOTE: currently, ClickHouse don't support procedure or UDF yet,
// but still load JDBC driver to keep source code sync with other DB
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
}else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), OracleDataSource.class);
Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
}else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), SQLServerDataSource.class);
Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
}
// get jdbc connection
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());
// get jdbc connection
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());
// get process instance by task instance id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
Collection<Property> userDefParamsList = null;
Collection<Property> userDefParamsList = null;
if (procedureParameters.getLocalParametersMap() != null){
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}
if (procedureParameters.getLocalParametersMap() != null){
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}
String method = "";
// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
}
String method = "";
// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
}
logger.info("call method : {}",method);
// call method
stmt = connection.prepareCall(method);
if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
int index = 1;
for (Property property : userDefParamsList){
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
}
index++;
}
logger.info("call method : {}",method);
// call method
stmt = connection.prepareCall(method);
if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
int index = 1;
for (Property property : userDefParamsList){
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
}
index++;
}
}
stmt.executeUpdate();
/**
* print the output parameters to the log
*/
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<Integer, Property> en = iter.next();
int index = en.getKey();
Property property = en.getValue();
String prop = property.getProp();
DataType dataType = property.getType();
if (dataType.equals(DataType.VARCHAR)){
String value = stmt.getString(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.INTEGER)){
int value = stmt.getInt(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.LONG)){
long value = stmt.getLong(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.FLOAT)){
float value = stmt.getFloat(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.DOUBLE)){
double value = stmt.getDouble(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.DATE)){
Date value = stmt.getDate(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.TIME)){
Time value = stmt.getTime(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.TIMESTAMP)){
Timestamp value = stmt.getTimestamp(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}else if (dataType.equals(DataType.BOOLEAN)){
boolean value = stmt.getBoolean(index);
logger.info("out prameter key : {} , value : {}",prop,value);
}
}
stmt.executeUpdate();
/**
* print the output parameters to the log
*/
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
Map.Entry<Integer, Property> en = iter.next();
int index = en.getKey();
Property property = en.getValue();
String prop = property.getProp();
DataType dataType = property.getType();
// get output parameter
getOutputParameter(stmt, index, prop, dataType);
}
exitStatusCode = 0;
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = 0;
}catch (Exception e){
logger.error(e.getMessage(),e);
exitStatusCode = -1;
throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode));
}
finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
exitStatusCode = -1;
throw new RuntimeException("process interrupted. exit status code is : " + exitStatusCode);
logger.error(e.getMessage(),e);
}
finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
exitStatusCode = -1;
logger.error(e.getMessage(),e);
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
exitStatusCode = -1;
logger.error(e.getMessage(), e);
}
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
exitStatusCode = -1;
logger.error(e.getMessage(), e);
}
}
}
}
/**
* get output parameter
* @param stmt
* @param index
* @param prop
* @param dataType
* @throws SQLException
*/
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
switch (dataType){
case VARCHAR:
logger.info("out prameter key : {} , value : {}",prop,stmt.getString(index));
break;
case INTEGER:
logger.info("out prameter key : {} , value : {}", prop, stmt.getInt(index));
break;
case LONG:
logger.info("out prameter key : {} , value : {}",prop,stmt.getLong(index));
break;
case FLOAT:
logger.info("out prameter key : {} , value : {}",prop,stmt.getFloat(index));
break;
case DOUBLE:
logger.info("out prameter key : {} , value : {}",prop,stmt.getDouble(index));
break;
case DATE:
logger.info("out prameter key : {} , value : {}",prop,stmt.getDate(index));
break;
case TIME:
logger.info("out prameter key : {} , value : {}",prop,stmt.getTime(index));
break;
case TIMESTAMP:
logger.info("out prameter key : {} , value : {}",prop,stmt.getTimestamp(index));
break;
case BOOLEAN:
logger.info("out prameter key : {} , value : {}",prop, stmt.getBoolean(index));
break;
default:
break;
}
}
@Override
public AbstractParameters getParameters() {
return procedureParameters;
@ -277,61 +284,61 @@ public class ProcedureTask extends AbstractTask {
* @throws Exception
*/
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{
if (dataType.equals(DataType.VARCHAR)){
if (dataType.equals(VARCHAR)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.VARCHAR);
}else {
stmt.registerOutParameter(index, Types.VARCHAR, value);
}
}else if (dataType.equals(DataType.INTEGER)){
}else if (dataType.equals(INTEGER)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.INTEGER);
}else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
}else if (dataType.equals(DataType.LONG)){
}else if (dataType.equals(LONG)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index,Types.INTEGER);
}else {
stmt.registerOutParameter(index,Types.INTEGER ,value);
}
}else if (dataType.equals(DataType.FLOAT)){
}else if (dataType.equals(FLOAT)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.FLOAT);
}else {
stmt.registerOutParameter(index, Types.FLOAT,value);
}
}else if (dataType.equals(DataType.DOUBLE)){
}else if (dataType.equals(DOUBLE)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.DOUBLE);
}else {
stmt.registerOutParameter(index, Types.DOUBLE , value);
}
}else if (dataType.equals(DataType.DATE)){
}else if (dataType.equals(DATE)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.DATE);
}else {
stmt.registerOutParameter(index, Types.DATE , value);
}
}else if (dataType.equals(DataType.TIME)){
}else if (dataType.equals(TIME)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.TIME);
}else {
stmt.registerOutParameter(index, Types.TIME , value);
}
}else if (dataType.equals(DataType.TIMESTAMP)){
}else if (dataType.equals(TIMESTAMP)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.TIMESTAMP);
}else {
stmt.registerOutParameter(index, Types.TIMESTAMP , value);
}
}else if (dataType.equals(DataType.BOOLEAN)){
}else if (dataType.equals(BOOLEAN)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.BOOLEAN);
}else {

72
escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java

@ -20,27 +20,18 @@ package cn.escheduler.server.worker.task.python;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.python.PythonParameters;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.PythonCommandExecutor;
import cn.escheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Map;
import java.util.Set;
/**
* python task
@ -57,7 +48,10 @@ public class PythonTask extends AbstractTask {
*/
private String taskDir;
private PythonCommandExecutor pythonProcessTask;
/**
* python command executor
*/
private PythonCommandExecutor pythonCommandExecutor;
/**
* process database access
@ -70,10 +64,15 @@ public class PythonTask extends AbstractTask {
this.taskDir = taskProps.getTaskDir();
this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getTaskAppId(),
taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), logger);
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
taskProps.getTaskDir(),
taskProps.getTaskAppId(),
taskProps.getTaskInstId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@ -92,9 +91,9 @@ public class PythonTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
exitStatusCode = pythonProcessTask.run(buildCommand(), processDao);
exitStatusCode = pythonCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
logger.error("python process exception", e);
logger.error("python task failure", e);
exitStatusCode = -1;
}
}
@ -102,7 +101,7 @@ public class PythonTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
pythonProcessTask.cancelApplication();
pythonCommandExecutor.cancelApplication();
}
/**
@ -111,21 +110,7 @@ public class PythonTask extends AbstractTask {
* @throws Exception
*/
private String buildCommand() throws Exception {
// generate scripts
// String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId());
// Path path = new File(fileName).toPath();
// if (Files.exists(path)) {
// return fileName;
// }
String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
// find process instance by task id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
@ -133,27 +118,16 @@ public class PythonTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
rawScript = ParameterUtils.convertParameterPlaceholders(rawScript, ParamUtils.convert(paramsMap));
rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
// pythonParameters.setRawScript(rawScript);
logger.info("raw script : {}", pythonParameters.getRawScript());
logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
// Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-xr-x");
// FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
//
// Files.createFile(path, attr);
//
// Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
//
// return fileName;
return rawScript;
return rawPythonScript;
}
@Override

32
escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java

@ -54,7 +54,7 @@ public class ShellTask extends AbstractTask {
*/
private String taskDir;
private ShellCommandExecutor processTask;
private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
@ -62,15 +62,19 @@ public class ShellTask extends AbstractTask {
private ProcessDao processDao;
public ShellTask(TaskProps props, Logger logger) {
super(props, logger);
public ShellTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.taskDir = props.getTaskDir();
this.taskDir = taskProps.getTaskDir();
this.processTask = new ShellCommandExecutor(this::logHandle,
props.getTaskDir(), props.getTaskAppId(),
props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
props.getTaskTimeout(), logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(),
taskProps.getTaskAppId(),
taskProps.getTaskInstId(),
taskProps.getTenantCode(),
taskProps.getEnvFile(),
taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(),
logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@ -89,9 +93,9 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
exitStatusCode = processTask.run(buildCommand(), processDao);
exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("shell task failure", e);
exitStatusCode = -1;
}
}
@ -99,7 +103,7 @@ public class ShellTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
processTask.cancelApplication();
shellCommandExecutor.cancelApplication();
}
/**
@ -118,8 +122,6 @@ public class ShellTask extends AbstractTask {
String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
// find process instance by task id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
/**
* combining local and global parameters
@ -127,8 +129,8 @@ public class ShellTask extends AbstractTask {
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
shellParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null){
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}

6
escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java

@ -66,8 +66,6 @@ public class SparkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
// get process instance by task instance id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
/**
* combining local and global parameters
@ -75,8 +73,8 @@ public class SparkTask extends AbstractYarnTask {
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}

259
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task.sql;
import cn.escheduler.alert.utils.MailUtils;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ShowType;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
import cn.escheduler.common.enums.UdfType;
@ -44,8 +43,6 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import java.sql.*;
@ -54,7 +51,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static cn.escheduler.common.utils.PropertyUtils.getString;
import static cn.escheduler.common.Constants.*;
import static cn.escheduler.common.enums.DbType.*;
/**
* sql task
@ -76,12 +74,22 @@ public class SqlTask extends AbstractTask {
*/
private AlertDao alertDao;
/**
* datasource
*/
private DataSource dataSource;
/**
* base datasource
*/
private BaseDataSource baseDataSource;
public SqlTask(TaskProps props, Logger logger) {
super(props, logger);
public SqlTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
logger.info("sql task params {}", taskProps.getTaskParams());
this.sqlParameters = JSONObject.parseObject(props.getTaskParams(), SqlParameters.class);
this.sqlParameters = JSONObject.parseObject(taskProps.getTaskParams(), SqlParameters.class);
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
@ -97,75 +105,73 @@ public class SqlTask extends AbstractTask {
Thread.currentThread().setName(threadLoggerInfoName);
logger.info(sqlParameters.toString());
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(),
sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams());
// determine whether there is a data source
sqlParameters.getType(),
sqlParameters.getDatasource(),
sqlParameters.getSql(),
sqlParameters.getLocalParams(),
sqlParameters.getUdfs(),
sqlParameters.getShowType(),
sqlParameters.getConnParams());
// not set data source
if (sqlParameters.getDatasource() == 0){
logger.error("datasource is null");
logger.error("datasource id not exists");
exitStatusCode = -1;
}else {
List<String> createFuncs = null;
DataSource dataSource = processDao.findDataSourceById(sqlParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),dataSource.getType(),dataSource.getNote(),
dataSource.getUserId(),dataSource.getConnectionParams());
if (dataSource != null){
Connection con = null;
try {
BaseDataSource baseDataSource = null;
if (DbType.MYSQL.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
}else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
}else if (DbType.HIVE.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),HiveDataSource.class);
Class.forName(Constants.JDBC_HIVE_CLASS_NAME);
}else if (DbType.SPARK.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class);
Class.forName(Constants.JDBC_SPARK_CLASS_NAME);
}else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
}else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),OracleDataSource.class);
Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
}else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SQLServerDataSource.class);
Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
}
return;
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
dataSource.getNote(),
dataSource.getUserId(),
dataSource.getConnectionParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()).orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()).orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
if(EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && StringUtils.isNotEmpty(sqlParameters.getUdfs())){
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
// execute sql task
con = executeFuncAndSql(baseDataSource, mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
Connection con = null;
List<String> createFuncs = null;
try {
// load class
DataSourceFactory.loadClass(dataSource.getType());
// get datasource
baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
.orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
// determine if it is UDF
boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
&& StringUtils.isNotEmpty(sqlParameters.getUdfs());
if(udfTypeFlag){
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
throw e;
}
}
// execute sql task
con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
throw e;
}
}
}
@ -180,13 +186,13 @@ public class SqlTask extends AbstractTask {
StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
taskProps.getCmdTypeIfComplement(),
taskProps.getScheduleTime());
// spell SQL according to the final user-defined variable
if(paramsMap == null){
@ -195,14 +201,15 @@ public class SqlTask extends AbstractTask {
}
if (StringUtils.isNotEmpty(sqlParameters.getTitle())){
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), ParamUtils.convert(paramsMap));
logger.info(title);
String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
ParamUtils.convert(paramsMap));
logger.info("SQL tile : {}",title);
sqlParameters.setTitle(title);
}
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap);
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex,"?");
@ -219,47 +226,45 @@ public class SqlTask extends AbstractTask {
}
/**
* execute sql
* @param baseDataSource
* execute sql
* @param mainSqlBinds
* @param preStatementsBinds
* @param postStatementsBinds
* @param createFuncs
* @return
*/
public Connection executeFuncAndSql(BaseDataSource baseDataSource,
SqlBinds mainSqlBinds,
public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs){
Connection connection = null;
try {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (DbType.HIVE.name().equals(sqlParameters.getType())) {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
// if hive , load connection params if exists
if (HIVE == dataSource.getType()) {
Properties paramProp = new Properties();
paramProp.setProperty("user", baseDataSource.getUser());
paramProp.setProperty("password", baseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), Constants.SEMICOLON,"hiveconf:");
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
SEMICOLON,
HIVE_CONF);
if(connParamMap != null){
paramProp.putAll(connParamMap);
}
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),paramProp);
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
paramProp);
}else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(), baseDataSource.getPassword());
baseDataSource.getUser(),
baseDataSource.getPassword());
}
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
try (Statement funcStmt = connection.createStatement()) {
try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
@ -270,7 +275,7 @@ public class SqlTask extends AbstractTask {
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
int result = stmt.executeUpdate();
logger.info("pre statement execute result: " + result + ", for sql: " + sqlBind.getSql());
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
@ -278,7 +283,7 @@ public class SqlTask extends AbstractTask {
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
JSONArray array = new JSONArray();
JSONArray resultJSONArray = new JSONArray();
ResultSet resultSet = stmt.executeQuery();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
@ -288,21 +293,19 @@ public class SqlTask extends AbstractTask {
for (int i = 1; i <= num; i++) {
mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
}
array.add(mapOfColValues);
resultJSONArray.add(mapOfColValues);
}
logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
// send as an attachment
if (StringUtils.isEmpty(sqlParameters.getShowType())) {
logger.info("showType is empty,don't need send email");
} else {
if (array.size() > 0) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
}
logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
// if there is a result set
if (resultJSONArray.size() > 0) {
if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}else{
sendAttachment(taskProps.getNodeName() + " query resultsets ",
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}
}
@ -310,7 +313,7 @@ public class SqlTask extends AbstractTask {
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
int result = stmt.executeUpdate();
stmt.executeUpdate();
exitStatusCode = 0;
}
}
@ -318,7 +321,7 @@ public class SqlTask extends AbstractTask {
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
int result = stmt.executeUpdate();
logger.info("post statement execute result: " + result + ", for sql: " + sqlBind.getSql());
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
}
}
} catch (Exception e) {
@ -328,9 +331,19 @@ public class SqlTask extends AbstractTask {
return connection;
}
/**
* preparedStatement bind
* @param connection
* @param sqlBinds
* @return
* @throws Exception
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
// is the timeout set
boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map<Integer, Property> params = sqlBinds.getParamsMap();
@ -340,7 +353,7 @@ public class SqlTask extends AbstractTask {
ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue());
}
}
logger.info("prepare statement replace sql:{}",stmt.toString());
logger.info("prepare statement replace sql : {} ",stmt.toString());
return stmt;
}
@ -354,9 +367,6 @@ public class SqlTask extends AbstractTask {
// process instance
ProcessInstance instance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
// process define
ProcessDefinition processDefine = processDao.findProcessDefineById(instance.getProcessDefinitionId());
List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
// receiving group list
@ -367,7 +377,7 @@ public class SqlTask extends AbstractTask {
// custom receiver
String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){
String[] splits = receivers.split(Constants.COMMA);
String[] splits = receivers.split(COMMA);
for (String receiver : splits){
receviersList.add(receiver.trim());
}
@ -378,16 +388,17 @@ public class SqlTask extends AbstractTask {
// Custom Copier
String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){
String[] splits = receiversCc.split(Constants.COMMA);
String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits){
receviersCcList.add(receiverCc.trim());
}
}
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
Map<String, Object> mailResult = MailUtils.sendMails(receviersList,
receviersCcList, title, content, ShowType.valueOf(showTypeName));
if(!(Boolean) mailResult.get(STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{
@ -425,7 +436,7 @@ public class SqlTask extends AbstractTask {
public void printReplacedSql(String content, String formatSql,String rgex, Map<Integer,Property> sqlParamsMap){
//parameter print style
logger.info("after replace sql , preparing : {}" , formatSql);
StringBuffer logPrint = new StringBuffer("replaced sql , parameters:");
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}

Loading…
Cancel
Save