diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
new file mode 100644
index 0000000000..be3c583f19
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -0,0 +1,36 @@
+---
+name: Bug report
+about: Create a report to help us improve
+title: "[BUG] bug title"
+labels: bug
+assignees: ''
+
+---
+
+*For better global communication, please give priority to using an English description, thanks!*
+
+**Describe the bug**
+A clear and concise description of what the bug is.
+
+**To Reproduce**
+Steps to reproduce the behavior, for example:
+1. Go to '...'
+2. Click on '....'
+3. Scroll down to '....'
+4. See error
+
+**Expected behavior**
+A clear and concise description of what you expected to happen.
+
+**Screenshots**
+If applicable, add screenshots to help explain your problem.
+
+
+**Which version of Easy Scheduler:**
+ - [1.1.0-preview]
+
+**Additional context**
+Add any other context about the problem here.
+
+**Requirement or improvement**
+- Please describe about your requirements or improvement suggestions.
diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md
new file mode 100644
index 0000000000..8cd481d442
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_request.md
@@ -0,0 +1,20 @@
+---
+name: Feature request
+about: Suggest an idea for this project
+title: "[Feature]"
+labels: new feature
+assignees: ''
+
+---
+
+**Is your feature request related to a problem? Please describe.**
+A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
+
+**Describe the solution you'd like**
+A clear and concise description of what you want to happen.
+
+**Describe alternatives you've considered**
+A clear and concise description of any alternative solutions or features you've considered.
+
+**Additional context**
+Add any other context or screenshots about the feature request here.
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 6352e7f0eb..be32e77143 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -1,5 +1,70 @@
-EasyScheduler提交代码流程
-=====
+* First, fork the code from the remote repository *https://github.com/analysys/EasyScheduler.git* to your own repository
+
+* there are three branches in the remote repository currently:
+ * master normal delivery branch
+ After the stable version is released, the code for the stable version branch is merged into the master branch.
+
+ * dev daily development branch
+ The daily development branch, the newly submitted code can pull requests to this branch.
+
+ * branch-1.0.0 release version branch
+   Release version branch; there will be 2.0 and other version branches later. A version
+   branch only receives bug fixes and does not add new features.
+
+* Clone your own repository to your local machine
+
+  `git clone https://github.com/your-username/EasyScheduler.git`
+
+* Add remote repository address, named upstream
+
+ `git remote add upstream https://github.com/analysys/EasyScheduler.git`
+
+* View repository:
+
+ `git remote -v`
+
+> There will be two repositories at this time: origin (your own repository) and upstream (the remote repository)
+
+* Get/update the remote repository code (if it is already the latest code, skip this step)
+
+ `git fetch upstream`
+
+
+* Synchronize remote repository code to local repository
+
+```
+git checkout origin/dev
+git merge --no-ff upstream/dev
+```
+
+If remote branch has a new branch `DEV-1.0`, you need to synchronize this branch to the local repository.
+
+```
+git checkout -b dev-1.0 upstream/dev-1.0
+git push --set-upstream origin dev-1.0
+```
+
+* After modifying the code locally, submit it to your own repository:
+
+`git commit -m 'test commit'`
+`git push`
+
+* Submit changes to the remote repository
+
+* On the github page, click on the new pull request.
+
+
+ p>
+
+* Select the modified local branch and the target branch to merge into, then create a pull request.
+
+
+ p>
+
+* Next, the administrator is responsible for **merging** to complete the pull request
+
+---
+
* 首先从远端仓库*https://github.com/analysys/EasyScheduler.git* fork一份代码到自己的仓库中
* 远端仓库中目前有三个分支:
@@ -14,7 +79,7 @@ EasyScheduler提交代码流程
* 把自己仓库clone到本地
- `git clone https://github.com/**/EasyScheduler.git`
+  `git clone https://github.com/your-username/EasyScheduler.git`
* 添加远端仓库地址,命名为upstream
@@ -26,17 +91,10 @@ EasyScheduler提交代码流程
> 此时会有两个仓库:origin(自己的仓库)和upstream(远端仓库)
-* 获取远端仓库代码(已经是最新代码,就跳过)
+* 获取/更新远端仓库代码(已经是最新代码,就跳过)
`git fetch upstream `
-* 更新远端仓库代码
-
-```
-git checkout upstream/dev
-
-git pull upstream dev
-```
* 同步远端仓库代码到本地仓库
@@ -54,7 +112,7 @@ git push --set-upstream origin dev1.0
* 在本地修改代码以后,提交到自己仓库:
- `git ca -m 'test commit'`
+ `git commit -m 'test commit'`
`git push`
* 将修改提交到远端仓库
@@ -68,6 +126,15 @@ git push --set-upstream origin dev1.0
- * 接下来由管理员负责将**Merge**完成此次pull request
+
+* 接下来由管理员负责将**Merge**完成此次pull request
+
+
+
+
+
+
+
+
diff --git a/README.md b/README.md
index b36606966c..182ece4c38 100644
--- a/README.md
+++ b/README.md
@@ -81,7 +81,7 @@ Work plan of Easy Scheduler: [R&D plan](https://github.com/analysys/EasySchedule
### How to contribute code
Welcome to participate in contributing code, please refer to the process of submitting the code:
-https://github.com/analysys/EasyScheduler/blob/master/CONTRIBUTING.md
+[How to contribute code](https://github.com/analysys/EasyScheduler/issues/310)
### Thanks
diff --git a/README_zh_CN.md b/README_zh_CN.md
index fa94152f4b..7c8b7d53fd 100644
--- a/README_zh_CN.md
+++ b/README_zh_CN.md
@@ -57,7 +57,7 @@ EasyScheduler的工作计划: {
- private static final Logger logger = LoggerFactory.getLogger(TaskLogAppender.class);
-
private String currentlyActiveFile;
@Override
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
index 8f23895547..2208cf5204 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
@@ -20,6 +20,7 @@ import cn.escheduler.common.Constants;
import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.thread.Stopper;
import cn.escheduler.common.thread.ThreadUtils;
+import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
@@ -28,7 +29,6 @@ import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,8 +75,20 @@ public class FetchTaskThread implements Runnable{
*/
private int workerExecNums;
+ /**
+ * conf
+ */
private Configuration conf;
+ /**
+ * task instance
+ */
+ private TaskInstance taskInstance;
+
+ /**
+ * task instance id
+ */
+ Integer taskInstId;
public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient,
ProcessDao processDao, Configuration conf,
@@ -125,116 +137,95 @@ public class FetchTaskThread implements Runnable{
@Override
public void run() {
-
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
-
//check memory and cpu usage and threads
- if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
-
- //whether have tasks, if no tasks , no need lock //get all tasks
- List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
- if(tasksQueueList.size() > 0){
- // creating distributed locks, lock path /escheduler/lock/worker
- String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
- mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
- mutex.acquire();
+ boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
- // task instance id str
- List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
-
- for(String taskQueueStr : taskQueueStrArr){
- if (StringUtils.isNotBlank(taskQueueStr )) {
-
- if (!checkThreadCount(poolExecutor)) {
- break;
- }
-
- String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
- String taskInstIdStr = taskStringArray[3];
- Date now = new Date();
- Integer taskId = Integer.parseInt(taskInstIdStr);
-
- // find task instance by task id
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
-
- logger.info("worker fetch taskId : {} from queue ", taskId);
-
- int retryTimes = 30;
- // mainly to wait for the master insert task to succeed
- while (taskInstance == null && retryTimes > 0) {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- taskInstance = processDao.findTaskInstanceById(taskId);
- retryTimes--;
- }
-
- if (taskInstance == null ) {
- logger.error("task instance is null. task id : {} ", taskId);
- continue;
- }
-
- if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
- continue;
- }
- taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
- logger.info("remove task:{} from queue", taskQueueStr);
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- // set execute task worker host
- taskInstance.setHost(OSUtils.getHost());
- taskInstance.setStartTime(now);
+ if(!runCheckFlag) {
+ continue;
+ }
+ //whether have tasks, if no tasks , no need lock //get all tasks
+ List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
+ if (CollectionUtils.isEmpty(tasksQueueList)){
+ continue;
+ }
+ // creating distributed locks, lock path /escheduler/lock/worker
+ mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
+ zkWorkerClient.getWorkerLockPath());
- // get process instance
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- // get process define
- ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
+ // task instance id str
+ List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
+ for(String taskQueueStr : taskQueueStrArr){
+ if (StringUtils.isEmpty(taskQueueStr)) {
+ continue;
+ }
- taskInstance.setProcessInstance(processInstance);
- taskInstance.setProcessDefine(processDefine);
+ if (!checkThreadCount(poolExecutor)) {
+ break;
+ }
+ // get task instance id
+ taskInstId = Integer.parseInt(taskQueueStr.split(Constants.UNDERLINE)[3]);
+
+ // get task instance relation
+ taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId);
+
+ Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
+ taskInstance.getProcessDefine().getUserId());
+ if(tenant == null){
+ logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}",
+ taskInstance.getProcessDefine().getId(),
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
+ continue;
+ }
- // get local execute path
- String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
- processDefine.getId(),
- processInstance.getId(),
- taskInstance.getId());
- logger.info("task instance local execute path : {} ", execLocalPath);
+ logger.info("worker fetch taskId : {} from queue ", taskInstId);
+ // mainly to wait for the master insert task to succeed
+ waitForMasterEnterQueue();
- // set task execute path
- taskInstance.setExecutePath(execLocalPath);
+ if (taskInstance == null ) {
+ logger.error("task instance is null. task id : {} ", taskInstId);
+ taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
+ continue;
+ }
- Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
- processDefine.getUserId());
- if(tenant == null){
- logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}",
- taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId());
- continue;
- }
+ if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
+ continue;
+ }
- // check and create Linux users
- FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
- tenant.getTenantCode(), logger);
+ // get local execute path
+ logger.info("task instance local execute path : {} ", getExecLocalPath());
- logger.info("task : {} ready to submit to task scheduler thread",taskId);
- // submit task
- workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+ // init task
+ taskInstance.init(OSUtils.getHost(),
+ new Date(),
+ getExecLocalPath());
- }
- }
+ // check and create Linux users
+ FileUtils.createWorkDirAndUserIfAbsent(getExecLocalPath(),
+ tenant.getTenantCode(), logger);
- }
+ logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
+ // submit task
+ workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+ // remove node from zk
+ taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
}
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
-
}catch (Exception e){
- logger.error("fetch task thread exception : " + e.getMessage(),e);
+ logger.error("fetch task thread failure" ,e);
}finally {
AbstractZKClient.releaseMutex(mutex);
}
@@ -242,16 +233,45 @@ public class FetchTaskThread implements Runnable{
}
/**
- *
+ * get execute local path
+ * @return
+ */
+ private String getExecLocalPath(){
+ return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
+ taskInstance.getProcessDefine().getId(),
+ taskInstance.getProcessInstance().getId(),
+ taskInstance.getId());
+ }
+
+ /**
+ * check
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
- logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
+ logger.info("thread insufficient , activeCount : {} , " +
+ "workerExecNums : {}, will sleep : {} millis for thread resource",
+ activeCount,
+ workerExecNums,
+ Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
+
+ /**
+ * mainly to wait for the master insert task to succeed
+ * @throws Exception
+ */
+ private void waitForMasterEnterQueue()throws Exception{
+ int retryTimes = 30;
+
+ while (taskInstance == null && retryTimes > 0) {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ taskInstance = processDao.findTaskInstanceById(taskInstId);
+ retryTimes--;
+ }
+ }
}
\ No newline at end of file
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
index f1b4265e30..6265097930 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
@@ -59,13 +59,16 @@ import java.util.stream.Collectors;
/**
* task scheduler thread
*/
-public class TaskScheduleThread implements Callable {
+public class TaskScheduleThread implements Runnable {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class);
+ /**
+ * task prefix
+ */
private static final String TASK_PREFIX = "TASK";
/**
@@ -79,7 +82,7 @@ public class TaskScheduleThread implements Callable {
private final ProcessDao processDao;
/**
- * execute task info
+ * abstract task
*/
private AbstractTask task;
@@ -89,115 +92,55 @@ public class TaskScheduleThread implements Callable {
}
@Override
- public Boolean call() throws Exception {
+ public void run() {
- // get task type
- String taskType = taskInstance.getTaskType();
- // set task state
- taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
-
- // update task state
- if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){
- processDao.changeTaskState(taskInstance.getState(),
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- null,
- System.getProperty("user.dir") + "/logs/" +
- taskInstance.getProcessDefinitionId() +"/" +
- taskInstance.getProcessInstanceId() +"/" +
- taskInstance.getId() + ".log",
- taskInstance.getId());
- }else{
- processDao.changeTaskState(taskInstance.getState(),
- taskInstance.getStartTime(),
- taskInstance.getHost(),
- taskInstance.getExecutePath(),
- System.getProperty("user.dir") + "/logs/" +
- taskInstance.getProcessDefinitionId() +"/" +
- taskInstance.getProcessInstanceId() +"/" +
- taskInstance.getId() + ".log",
- taskInstance.getId());
- }
-
- ExecutionStatus status = ExecutionStatus.SUCCESS;
+ // update task state is running according to task type
+ updateTaskState(taskInstance.getTaskType());
try {
+ logger.info("script path : {}", taskInstance.getExecutePath());
+ // task node
+ TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
-
- // custom param str
- String customParamStr = taskInstance.getProcessInstance().getGlobalParams();
-
-
- Map allParamMap = new HashMap<>();
-
-
- if (customParamStr != null) {
- List customParamMap = JSONObject.parseArray(customParamStr, Property.class);
-
- Map userDefinedParamMap = customParamMap.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
-
- allParamMap.putAll(userDefinedParamMap);
- }
-
- logger.info("script path : {}",taskInstance.getExecutePath());
-
- TaskProps taskProps = new TaskProps();
-
- taskProps.setTaskDir(taskInstance.getExecutePath());
-
- String taskJson = taskInstance.getTaskJson();
-
-
- TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
-
-
- List projectRes = createProjectResFiles(taskNode);
-
- // copy hdfs file to local
+ // copy hdfs/minio file to local
copyHdfsToLocal(processDao,
taskInstance.getExecutePath(),
- projectRes,
+ createProjectResFiles(taskNode),
logger);
- // set task params
- taskProps.setTaskParams(taskNode.getParams());
- // set tenant code , execute task linux user
-
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
-
- taskProps.setScheduleTime(processInstance.getScheduleTime());
- taskProps.setNodeName(taskInstance.getName());
- taskProps.setTaskInstId(taskInstance.getId());
- taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
-
- ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
+ // get process instance according to tak instance
+ ProcessInstance processInstance = taskInstance.getProcessInstance();
+ // get process define according to tak instance
+ ProcessDefinition processDefine = taskInstance.getProcessDefine();
+ // get tenant info
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
- processDefine.getUserId());
+ processDefine.getUserId());
if(tenant == null){
- processInstance.setTenantCode(tenant.getTenantCode());
- logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
- processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
- );
- status = ExecutionStatus.FAILURE;
+ logger.error("cannot find the tenant, process definition id:{}, user id:{}",
+ processDefine.getId(),
+ processDefine.getUserId());
+ task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}else{
- taskProps.setTenantCode(tenant.getTenantCode());
- String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
- // set queue
- if (StringUtils.isEmpty(queue)){
- taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
- }else {
- taskProps.setQueue(tenant.getQueueName());
- }
- taskProps.setTaskStartTime(taskInstance.getStartTime());
- taskProps.setDefinedParams(allParamMap);
+ // set task props
+ TaskProps taskProps = new TaskProps(taskNode.getParams(),
+ taskInstance.getExecutePath(),
+ processInstance.getScheduleTime(),
+ taskInstance.getName(),
+ taskInstance.getTaskType(),
+ taskInstance.getId(),
+ CommonUtils.getSystemEnvPath(),
+ tenant.getTenantCode(),
+ tenant.getQueueName(),
+ taskInstance.getStartTime(),
+ getGlobalParamsMap(),
+ taskInstance.getDependency(),
+ processInstance.getCmdTypeIfComplement());
// set task timeout
setTaskTimeout(taskProps, taskNode);
- taskProps.setDependence(taskInstance.getDependency());
-
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
@@ -209,72 +152,98 @@ public class TaskScheduleThread implements Callable {
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
- task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
+ task = TaskManager.newTask(taskInstance.getTaskType(),
+ taskProps,
+ taskLogger);
- // job init
+ // task init
task.init();
- // job handle
+ // task handle
task.handle();
- logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
-
- if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
- status = ExecutionStatus.SUCCESS;
- // task recor flat : if true , start up qianfan
- if (TaskRecordDao.getTaskRecordFlag()
- && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
-
- AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
-
- // replace placeholder
- Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
- taskProps.getDefinedParams(),
- params.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
- if (paramsMap != null && !paramsMap.isEmpty()
- && paramsMap.containsKey("v_proc_date")){
- String vProcDate = paramsMap.get("v_proc_date").getValue();
- if (!StringUtils.isEmpty(vProcDate)){
- TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
- logger.info("task record status : {}",taskRecordState);
- if (taskRecordState == TaskRecordStatus.FAILURE){
- status = ExecutionStatus.FAILURE;
- }
- }
- }
- }
- }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
- status = ExecutionStatus.KILL;
- }else {
- status = ExecutionStatus.FAILURE;
- }
+ // task result process
+ task.after();
}
}catch (Exception e){
- logger.error("task escheduler failure : ", e);
- status = ExecutionStatus.FAILURE ;
- logger.error(String.format("task process exception, process id : %s , task : %s",
- taskInstance.getProcessInstanceId(),
- taskInstance.getName()),e);
+ logger.error("task scheduler failure", e);
+ task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
kill();
}
+
+ logger.info("task instance id : {},task final status : {}",
+ taskInstance.getId(),
+ task.getExitStatus());
// update task instance state
- processDao.changeTaskState(status,
+ processDao.changeTaskState(task.getExitStatus(),
new Date(),
taskInstance.getId());
- return task.getExitStatusCode() > Constants.EXIT_CODE_SUCCESS;
}
/**
- * set task time out
+ * get global paras map
+ * @return
+ */
+ private Map getGlobalParamsMap() {
+ Map globalParamsMap = new HashMap<>(16);
+
+ // global params string
+ String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams();
+
+ if (globalParamsStr != null) {
+ List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class);
+ globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)));
+ }
+ return globalParamsMap;
+ }
+
+ /**
+ * update task state according to task type
+ * @param taskType
+ */
+ private void updateTaskState(String taskType) {
+ // update task status is running
+ if(taskType.equals(TaskType.SQL.name()) ||
+ taskType.equals(TaskType.PROCEDURE.name())){
+ processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+ taskInstance.getStartTime(),
+ taskInstance.getHost(),
+ null,
+ getTaskLogPath(),
+ taskInstance.getId());
+ }else{
+ processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION,
+ taskInstance.getStartTime(),
+ taskInstance.getHost(),
+ taskInstance.getExecutePath(),
+ getTaskLogPath(),
+ taskInstance.getId());
+ }
+ }
+
+ /**
+ * get task log path
+ * @return
+ */
+ private String getTaskLogPath() {
+ return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
+ "logs" + Constants.SINGLE_SLASH +
+ taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
+ taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
+ taskInstance.getId() + ".log";
+ }
+
+ /**
+ * set task timeout
* @param taskProps
* @param taskNode
*/
private void setTaskTimeout(TaskProps taskProps, TaskNode taskNode) {
+ // the default timeout is the maximum value of the integer
taskProps.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()){
+ // get timeout strategy
taskProps.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy());
switch (taskTimeoutParameter.getStrategy()){
case WARN:
@@ -298,38 +267,7 @@ public class TaskScheduleThread implements Callable {
}
- /**
- * get current task parameter class
- * @return
- */
- private Class getCurTaskParamsClass(){
- Class paramsClass = null;
- TaskType taskType = TaskType.valueOf(taskInstance.getTaskType());
- switch (taskType){
- case SHELL:
- paramsClass = ShellParameters.class;
- break;
- case SQL:
- paramsClass = SqlParameters.class;
- break;
- case PROCEDURE:
- paramsClass = ProcedureParameters.class;
- break;
- case MR:
- paramsClass = MapreduceParameters.class;
- break;
- case SPARK:
- paramsClass = SparkParameters.class;
- break;
- case PYTHON:
- paramsClass = PythonParameters.class;
- break;
- default:
- logger.error("not support this task type: {}", taskType);
- throw new IllegalArgumentException("not support this task type");
- }
- return paramsClass;
- }
+
/**
* kill task
@@ -376,9 +314,7 @@ public class TaskScheduleThread implements Callable {
File resFile = new File(execLocalPath, res);
if (!resFile.exists()) {
try {
- /**
- * query the tenant code of the resource according to the name of the resource
- */
+ // query the tenant code of the resource according to the name of the resource
String tentnCode = processDao.queryTenantCodeByResName(res);
String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode,res);
@@ -388,7 +324,6 @@ public class TaskScheduleThread implements Callable {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
}
-
} else {
logger.info("file : {} exists ", resFile.getName());
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
index 82c9be9f0e..e0aef326fd 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
@@ -67,6 +67,11 @@ public abstract class AbstractCommandExecutor {
*/
protected final String taskAppId;
+ /**
+ * task appId
+ */
+ protected final int taskInstId;
+
/**
* tenant code , execute task linux user
*/
@@ -99,11 +104,12 @@ public abstract class AbstractCommandExecutor {
public AbstractCommandExecutor(Consumer> logHandler,
- String taskDir, String taskAppId, String tenantCode, String envFile,
+ String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile,
Date startTime, int timeout, Logger logger){
this.logHandler = logHandler;
this.taskDir = taskDir;
this.taskAppId = taskAppId;
+ this.taskInstId = taskInstId;
this.tenantCode = tenantCode;
this.envFile = envFile;
this.startTime = startTime;
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
index 645a314a4c..213f4fd3f9 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java
@@ -16,10 +16,26 @@
*/
package cn.escheduler.server.worker.task;
+import cn.escheduler.common.Constants;
+import cn.escheduler.common.enums.ExecutionStatus;
+import cn.escheduler.common.enums.TaskRecordStatus;
+import cn.escheduler.common.enums.TaskType;
+import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
+import cn.escheduler.common.task.mr.MapreduceParameters;
+import cn.escheduler.common.task.procedure.ProcedureParameters;
+import cn.escheduler.common.task.python.PythonParameters;
+import cn.escheduler.common.task.shell.ShellParameters;
+import cn.escheduler.common.task.spark.SparkParameters;
+import cn.escheduler.common.task.sql.SqlParameters;
+import cn.escheduler.common.utils.JSONUtils;
+import cn.escheduler.dao.TaskRecordDao;
+import cn.escheduler.server.utils.ParamUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import java.util.List;
+import java.util.Map;
/**
* executive task
@@ -70,7 +86,7 @@ public abstract class AbstractTask {
public void cancelApplication(boolean status) throws Exception {
- cancel = true;
+ this.cancel = status;
}
/**
@@ -89,6 +105,9 @@ public abstract class AbstractTask {
return exitStatusCode;
}
+ public void setExitStatusCode(int exitStatusCode) {
+ this.exitStatusCode = exitStatusCode;
+ }
/**
* get task parameters
@@ -96,4 +115,96 @@ public abstract class AbstractTask {
public abstract AbstractParameters getParameters();
+ /**
+ * result processing
+ */
+ public void after(){
+ if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
+ // task recor flat : if true , start up qianfan
+ if (TaskRecordDao.getTaskRecordFlag()
+ && TaskType.typeIsNormalTask(taskProps.getTaskType())){
+ AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
+
+ // replace placeholder
+ Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+ taskProps.getDefinedParams(),
+ params.getLocalParametersMap(),
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
+ if (paramsMap != null && !paramsMap.isEmpty()
+ && paramsMap.containsKey("v_proc_date")){
+ String vProcDate = paramsMap.get("v_proc_date").getValue();
+ if (!StringUtils.isEmpty(vProcDate)){
+ TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate);
+ logger.info("task record status : {}",taskRecordState);
+ if (taskRecordState == TaskRecordStatus.FAILURE){
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+ }
+ }
+ }
+ }
+
+ }else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){
+ setExitStatusCode(Constants.EXIT_CODE_KILL);
+ }else {
+ setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+ }
+ }
+
+
+
+
+ /**
+ * get current task parameter class
+ * @return
+ */
+ private Class getCurTaskParamsClass(){
+ Class paramsClass = null;
+ // get task type
+ TaskType taskType = TaskType.valueOf(taskProps.getTaskType());
+ switch (taskType){
+ case SHELL:
+ paramsClass = ShellParameters.class;
+ break;
+ case SQL:
+ paramsClass = SqlParameters.class;
+ break;
+ case PROCEDURE:
+ paramsClass = ProcedureParameters.class;
+ break;
+ case MR:
+ paramsClass = MapreduceParameters.class;
+ break;
+ case SPARK:
+ paramsClass = SparkParameters.class;
+ break;
+ case PYTHON:
+ paramsClass = PythonParameters.class;
+ break;
+ default:
+ logger.error("not support this task type: {}", taskType);
+ throw new IllegalArgumentException("not support this task type");
+ }
+ return paramsClass;
+ }
+
+ /**
+ * get exit status according to exitCode
+ * @return
+ */
+ public ExecutionStatus getExitStatus(){
+ ExecutionStatus status;
+ switch (getExitStatusCode()){
+ case Constants.EXIT_CODE_SUCCESS:
+ status = ExecutionStatus.SUCCESS;
+ break;
+ case Constants.EXIT_CODE_KILL:
+ status = ExecutionStatus.KILL;
+ break;
+ default:
+ status = ExecutionStatus.FAILURE;
+ break;
+ }
+ return status;
+ }
}
\ No newline at end of file
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
index 7559864bab..a981307c48 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java
@@ -38,7 +38,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
/**
* process task
*/
- private ShellCommandExecutor processTask;
+ private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
@@ -53,21 +53,25 @@ public abstract class AbstractYarnTask extends AbstractTask {
public AbstractYarnTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
- // find process instance by taskId
this.processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
- this.processTask = new ShellCommandExecutor(this::logHandle,
- taskProps.getTaskDir(), taskProps.getTaskAppId(),
- taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(),
- taskProps.getTaskTimeout(), logger);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
+ taskProps.getTaskDir(),
+ taskProps.getTaskAppId(),
+ taskProps.getTaskInstId(),
+ taskProps.getTenantCode(),
+ taskProps.getEnvFile(),
+ taskProps.getTaskStartTime(),
+ taskProps.getTaskTimeout(),
+ logger);
}
@Override
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = processTask.run(buildCommand(), processDao);
+ exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
- logger.error("yarn process failed : " + e.getMessage(), e);
+ logger.error("yarn process failure", e);
exitStatusCode = -1;
}
}
@@ -76,9 +80,8 @@ public abstract class AbstractYarnTask extends AbstractTask {
public void cancelApplication(boolean status) throws Exception {
cancel = true;
// cancel process
- processTask.cancelApplication();
- int taskInstId = taskProps.getTaskInstId();
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
+ shellCommandExecutor.cancelApplication();
+ TaskInstance taskInstance = processDao.findTaskInstanceById(taskProps.getTaskInstId());
if (status && taskInstance != null){
ProcessUtils.killYarnJob(taskInstance);
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
index e1df0b71a8..09a6add48f 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
@@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.FileUtils;
-import cn.escheduler.common.utils.PropertyUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +42,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
public PythonCommandExecutor(Consumer> logHandler,
- String taskDir, String taskAppId, String tenantCode, String envFile,
- Date startTime, int timeout, Logger logger) {
- super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
+ String taskDir,
+ String taskAppId,
+ int taskInstId,
+ String tenantCode,
+ String envFile,
+ Date startTime,
+ int timeout,
+ Logger logger) {
+ super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
}
@@ -67,7 +72,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
- logger.info("tenant :{}, work dir:{}", tenantCode, taskDir);
+ logger.info("tenantCode :{}, task dir:{}", tenantCode, taskDir);
if (!Files.exists(Paths.get(commandFile))) {
logger.info("generate command file:{}", commandFile);
@@ -80,16 +85,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
logger.info(sb.toString());
// write data to file
- FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
+ FileUtils.writeStringToFile(new File(commandFile),
+ sb.toString(),
+ StandardCharsets.UTF_8);
}
}
@Override
protected String commandType() {
-
- String envPath = PropertyUtils.getString(Constants.ESCHEDULER_ENV_PATH);
-
- String pythonHome = getPythonHome(envPath);
+ String pythonHome = getPythonHome(envFile);
if (StringUtils.isEmpty(pythonHome)){
return PYTHON;
}
@@ -108,16 +112,25 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
/**
- * get python home
+ * get the absolute path of the Python command
+ * note :
+ * in common.properties,
+ * the PYTHON_HOME value must be the absolute path of the Python executable, not the PYTHON_HOME directory itself
+ *
+ * for example :
+ * if your PYTHON_HOME directory is /opt/python3.7/
+ * then you must set PYTHON_HOME to /opt/python3.7/python in common.properties
+ * (or in the escheduler.env.path file).
+ *
* @param envPath
* @return
*/
private static String getPythonHome(String envPath){
BufferedReader br = null;
- String line = null;
StringBuilder sb = new StringBuilder();
try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath)));
+ String line;
while ((line = br.readLine()) != null){
if (line.contains(Constants.PYTHON_HOME)){
sb.append(line);
@@ -128,13 +141,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
if (org.apache.commons.lang.StringUtils.isEmpty(result)){
return null;
}
- String[] arrs = result.split("=");
+ String[] arrs = result.split(Constants.EQUAL_SIGN);
if (arrs.length == 2){
return arrs[1];
}
}catch (IOException e){
- logger.error("read file failed : " + e.getMessage(),e);
+ logger.error("read file failure",e);
}finally {
try {
if (br != null){
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
index b5e803ae80..68e36b704a 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
@@ -29,9 +29,7 @@ import java.util.List;
import java.util.function.Consumer;
/**
- * command executor
- *
- * 进程,真正在worker服务器上执行的任务
+ * shell command executor
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
@@ -39,9 +37,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
public ShellCommandExecutor(Consumer> logHandler,
- String taskDir, String taskAppId, String tenantCode, String envFile,
- Date startTime, int timeout, Logger logger) {
- super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger);
+ String taskDir,
+ String taskAppId,
+ int taskInstId,
+ String tenantCode,
+ String envFile,
+ Date startTime,
+ int timeout,
+ Logger logger) {
+ super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger);
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
index 053b5bed24..0db35e8d0a 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java
@@ -16,6 +16,7 @@
*/
package cn.escheduler.server.worker.task;
+import cn.escheduler.common.enums.CommandType;
import cn.escheduler.common.enums.DataType;
import cn.escheduler.common.enums.Direct;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
@@ -46,6 +47,8 @@ public class TaskProps {
**/
private String tenantCode;
+ private String taskType;
+
/**
* task parameters
**/
@@ -101,6 +104,41 @@ public class TaskProps {
*/
private Date scheduleTime;
+ /**
+ * command type if the command is a complement (backfill) execution
+ */
+ private CommandType cmdTypeIfComplement;
+
+
+ public TaskProps(){}
+ public TaskProps(String taskParams,
+ String taskDir,
+ Date scheduleTime,
+ String nodeName,
+ String taskType,
+ int taskInstId,
+ String envFile,
+ String tenantCode,
+ String queue,
+ Date taskStartTime,
+ Map definedParams,
+ String dependence,
+ CommandType cmdTypeIfComplement){
+ this.taskParams = taskParams;
+ this.taskDir = taskDir;
+ this.scheduleTime = scheduleTime;
+ this.nodeName = nodeName;
+ this.taskType = taskType;
+ this.taskInstId = taskInstId;
+ this.envFile = envFile;
+ this.tenantCode = tenantCode;
+ this.queue = queue;
+ this.taskStartTime = taskStartTime;
+ this.definedParams = definedParams;
+ this.dependence = dependence;
+ this.cmdTypeIfComplement = cmdTypeIfComplement;
+
+ }
public String getTenantCode() {
return tenantCode;
@@ -200,22 +238,12 @@ public class TaskProps {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}
- /**
- * get parameters map
- * @return
- */
- public Map getUserDefParamsMap() {
- if (definedParams != null) {
- Map userDefParamsMaps = new HashMap<>();
- Iterator> iter = definedParams.entrySet().iterator();
- while (iter.hasNext()){
- Map.Entry en = iter.next();
- Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
- userDefParamsMaps.put(property.getProp(),property);
- }
- return userDefParamsMaps;
- }
- return null;
+ public String getTaskType() {
+ return taskType;
+ }
+
+ public void setTaskType(String taskType) {
+ this.taskType = taskType;
}
public String getDependence() {
@@ -233,4 +261,30 @@ public class TaskProps {
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
+
+ public CommandType getCmdTypeIfComplement() {
+ return cmdTypeIfComplement;
+ }
+
+ public void setCmdTypeIfComplement(CommandType cmdTypeIfComplement) {
+ this.cmdTypeIfComplement = cmdTypeIfComplement;
+ }
+
+ /**
+ * get parameters map
+ * @return
+ */
+ public Map getUserDefParamsMap() {
+ if (definedParams != null) {
+ Map userDefParamsMaps = new HashMap<>();
+ Iterator> iter = definedParams.entrySet().iterator();
+ while (iter.hasNext()){
+ Map.Entry en = iter.next();
+ Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue());
+ userDefParamsMaps.put(property.getProp(),property);
+ }
+ return userDefParamsMaps;
+ }
+ return null;
+ }
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
index 1e46e3e38d..7d115add8c 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java
@@ -208,6 +208,4 @@ public class DependentExecute {
return dependResultMap;
}
-
-
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
index 21e596f55f..13f00f9264 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java
@@ -104,7 +104,7 @@ public class DependentTask extends AbstractTask {
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
}
}catch (Exception e){
- logger.error("Exception " + e);
+ logger.error(e.getMessage(),e);
exitStatusCode = -1;
}
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
index 971ecd0b6d..9650e55e26 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java
@@ -70,8 +70,8 @@ public class MapReduceTask extends AbstractYarnTask {
Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
mapreduceParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
if (paramsMap != null){
String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap));
mapreduceParameters.setMainArgs(args);
@@ -86,7 +86,8 @@ public class MapReduceTask extends AbstractYarnTask {
protected String buildCommand() throws Exception {
List parameterList = buildParameters(mapreduceParameters);
- String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), taskProps.getDefinedParams());
+ String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList),
+ taskProps.getDefinedParams());
logger.info("mapreduce task command: {}", command);
return command;
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
index 98428c5389..c4803d91c0 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java
@@ -21,12 +21,7 @@ import cn.escheduler.common.enums.DataType;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.Direct;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
-import cn.escheduler.common.job.db.BaseDataSource;
-import cn.escheduler.common.job.db.ClickHouseDataSource;
-import cn.escheduler.common.job.db.MySQLDataSource;
-import cn.escheduler.common.job.db.OracleDataSource;
-import cn.escheduler.common.job.db.PostgreDataSource;
-import cn.escheduler.common.job.db.SQLServerDataSource;
+import cn.escheduler.common.job.db.*;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.procedure.ProcedureParameters;
@@ -49,6 +44,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import static cn.escheduler.common.enums.DataType.*;
+
/**
* procedure task
*/
@@ -64,6 +61,11 @@ public class ProcedureTask extends AbstractTask {
*/
private ProcessDao processDao;
+ /**
+ * base datasource
+ */
+ private BaseDataSource baseDataSource;
+
public ProcedureTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger);
@@ -93,176 +95,181 @@ public class ProcedureTask extends AbstractTask {
// determine whether there is a data source
if (procedureParameters.getDatasource() == 0){
- logger.error("datasource is null");
- exitStatusCode = 0;
- }else {
+ logger.error("datasource id not exists");
+ exitStatusCode = -1;
+ return;
+ }
- DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
- logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
- dataSource.getName(),dataSource.getType(),dataSource.getNote(),
- dataSource.getUserId(),dataSource.getConnectionParams());
+ DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource());
+ logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
+ dataSource.getName(),
+ dataSource.getType(),
+ dataSource.getNote(),
+ dataSource.getUserId(),
+ dataSource.getConnectionParams());
+
+ if (dataSource == null){
+ logger.error("datasource not exists");
+ exitStatusCode = -1;
+ return;
+ }
+ Connection connection = null;
+ CallableStatement stmt = null;
+ try {
+ // load class
+ DataSourceFactory.loadClass(dataSource.getType());
+ // get datasource
+ baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
+ dataSource.getConnectionParams());
- if (dataSource != null){
- Connection connection = null;
- CallableStatement stmt = null;
- try {
- BaseDataSource baseDataSource = null;
-
- if (DbType.MYSQL.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
- Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
- }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
- Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
- }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
- // NOTE: currently, ClickHouse don't support procedure or UDF yet,
- // but still load JDBC driver to keep source code sync with other DB
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
- Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
- }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), OracleDataSource.class);
- Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
- }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), SQLServerDataSource.class);
- Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
- }
+ // get jdbc connection
+ connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+ baseDataSource.getUser(),
+ baseDataSource.getPassword());
- // get jdbc connection
- connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
- baseDataSource.getUser(),
- baseDataSource.getPassword());
- // get process instance by task instance id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
- // combining local and global parameters
- Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
- taskProps.getDefinedParams(),
- procedureParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ // combining local and global parameters
+ Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
+ taskProps.getDefinedParams(),
+ procedureParameters.getLocalParametersMap(),
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
- Collection userDefParamsList = null;
+ Collection userDefParamsList = null;
- if (procedureParameters.getLocalParametersMap() != null){
- userDefParamsList = procedureParameters.getLocalParametersMap().values();
- }
+ if (procedureParameters.getLocalParametersMap() != null){
+ userDefParamsList = procedureParameters.getLocalParametersMap().values();
+ }
- String method = "";
- // no parameters
- if (CollectionUtils.isEmpty(userDefParamsList)){
- method = "{call " + procedureParameters.getMethod() + "}";
- }else { // exists parameters
- int size = userDefParamsList.size();
- StringBuilder parameter = new StringBuilder();
- parameter.append("(");
- for (int i = 0 ;i < size - 1; i++){
- parameter.append("?,");
- }
- parameter.append("?)");
- method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
- }
+ String method = "";
+ // no parameters
+ if (CollectionUtils.isEmpty(userDefParamsList)){
+ method = "{call " + procedureParameters.getMethod() + "}";
+ }else { // exists parameters
+ int size = userDefParamsList.size();
+ StringBuilder parameter = new StringBuilder();
+ parameter.append("(");
+ for (int i = 0 ;i < size - 1; i++){
+ parameter.append("?,");
+ }
+ parameter.append("?)");
+ method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
+ }
- logger.info("call method : {}",method);
- // call method
- stmt = connection.prepareCall(method);
- if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
- stmt.setQueryTimeout(taskProps.getTaskTimeout());
- }
- Map outParameterMap = new HashMap<>();
- if (userDefParamsList != null && userDefParamsList.size() > 0){
- int index = 1;
- for (Property property : userDefParamsList){
- logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
- ,property.getProp(),
- property.getDirect(),
- property.getType(),
- property.getValue());
- // set parameters
- if (property.getDirect().equals(Direct.IN)){
- ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
- }else if (property.getDirect().equals(Direct.OUT)){
- setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
- property.setValue(paramsMap.get(property.getProp()).getValue());
- outParameterMap.put(index,property);
- }
- index++;
- }
+ logger.info("call method : {}",method);
+ // call method
+ stmt = connection.prepareCall(method);
+ if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
+ stmt.setQueryTimeout(taskProps.getTaskTimeout());
+ }
+ Map outParameterMap = new HashMap<>();
+ if (userDefParamsList != null && userDefParamsList.size() > 0){
+ int index = 1;
+ for (Property property : userDefParamsList){
+ logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
+ ,property.getProp(),
+ property.getDirect(),
+ property.getType(),
+ property.getValue());
+ // set parameters
+ if (property.getDirect().equals(Direct.IN)){
+ ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
+ }else if (property.getDirect().equals(Direct.OUT)){
+ setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
+ property.setValue(paramsMap.get(property.getProp()).getValue());
+ outParameterMap.put(index,property);
}
+ index++;
+ }
+ }
- stmt.executeUpdate();
-
- /**
- * print the output parameters to the log
- */
- Iterator> iter = outParameterMap.entrySet().iterator();
- while (iter.hasNext()){
- Map.Entry en = iter.next();
-
- int index = en.getKey();
- Property property = en.getValue();
- String prop = property.getProp();
- DataType dataType = property.getType();
-
- if (dataType.equals(DataType.VARCHAR)){
- String value = stmt.getString(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.INTEGER)){
- int value = stmt.getInt(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.LONG)){
- long value = stmt.getLong(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.FLOAT)){
- float value = stmt.getFloat(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.DOUBLE)){
- double value = stmt.getDouble(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.DATE)){
- Date value = stmt.getDate(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.TIME)){
- Time value = stmt.getTime(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.TIMESTAMP)){
- Timestamp value = stmt.getTimestamp(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }else if (dataType.equals(DataType.BOOLEAN)){
- boolean value = stmt.getBoolean(index);
- logger.info("out prameter key : {} , value : {}",prop,value);
- }
- }
+ stmt.executeUpdate();
+
+ /**
+ * print the output parameters to the log
+ */
+ Iterator> iter = outParameterMap.entrySet().iterator();
+ while (iter.hasNext()){
+ Map.Entry en = iter.next();
+
+ int index = en.getKey();
+ Property property = en.getValue();
+ String prop = property.getProp();
+ DataType dataType = property.getType();
+ // get output parameter
+ getOutputParameter(stmt, index, prop, dataType);
+ }
- exitStatusCode = 0;
- }catch (Exception e){
- logger.error(e.getMessage(),e);
+ exitStatusCode = 0;
+ }catch (Exception e){
+ logger.error(e.getMessage(),e);
+ exitStatusCode = -1;
+ throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode));
+ }
+ finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
exitStatusCode = -1;
- throw new RuntimeException("process interrupted. exit status code is : " + exitStatusCode);
+ logger.error(e.getMessage(),e);
}
- finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- exitStatusCode = -1;
- logger.error(e.getMessage(),e);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (SQLException e) {
- exitStatusCode = -1;
- logger.error(e.getMessage(), e);
- }
- }
+ }
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ exitStatusCode = -1;
+ logger.error(e.getMessage(), e);
}
}
}
}
+ /**
+ * get output parameter
+ * @param stmt
+ * @param index
+ * @param prop
+ * @param dataType
+ * @throws SQLException
+ */
+ private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
+ switch (dataType){
+ case VARCHAR:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getString(index));
+ break;
+ case INTEGER:
+ logger.info("out prameter key : {} , value : {}", prop, stmt.getInt(index));
+ break;
+ case LONG:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getLong(index));
+ break;
+ case FLOAT:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getFloat(index));
+ break;
+ case DOUBLE:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getDouble(index));
+ break;
+ case DATE:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getDate(index));
+ break;
+ case TIME:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getTime(index));
+ break;
+ case TIMESTAMP:
+ logger.info("out prameter key : {} , value : {}",prop,stmt.getTimestamp(index));
+ break;
+ case BOOLEAN:
+ logger.info("out prameter key : {} , value : {}",prop, stmt.getBoolean(index));
+ break;
+ default:
+ break;
+ }
+ }
+
@Override
public AbstractParameters getParameters() {
return procedureParameters;
@@ -277,61 +284,61 @@ public class ProcedureTask extends AbstractTask {
* @throws Exception
*/
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{
- if (dataType.equals(DataType.VARCHAR)){
+ if (dataType.equals(VARCHAR)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.VARCHAR);
}else {
stmt.registerOutParameter(index, Types.VARCHAR, value);
}
- }else if (dataType.equals(DataType.INTEGER)){
+ }else if (dataType.equals(INTEGER)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.INTEGER);
}else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}
- }else if (dataType.equals(DataType.LONG)){
+ }else if (dataType.equals(LONG)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index,Types.INTEGER);
}else {
stmt.registerOutParameter(index,Types.INTEGER ,value);
}
- }else if (dataType.equals(DataType.FLOAT)){
+ }else if (dataType.equals(FLOAT)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.FLOAT);
}else {
stmt.registerOutParameter(index, Types.FLOAT,value);
}
- }else if (dataType.equals(DataType.DOUBLE)){
+ }else if (dataType.equals(DOUBLE)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.DOUBLE);
}else {
stmt.registerOutParameter(index, Types.DOUBLE , value);
}
- }else if (dataType.equals(DataType.DATE)){
+ }else if (dataType.equals(DATE)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.DATE);
}else {
stmt.registerOutParameter(index, Types.DATE , value);
}
- }else if (dataType.equals(DataType.TIME)){
+ }else if (dataType.equals(TIME)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.TIME);
}else {
stmt.registerOutParameter(index, Types.TIME , value);
}
- }else if (dataType.equals(DataType.TIMESTAMP)){
+ }else if (dataType.equals(TIMESTAMP)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.TIMESTAMP);
}else {
stmt.registerOutParameter(index, Types.TIMESTAMP , value);
}
- }else if (dataType.equals(DataType.BOOLEAN)){
+ }else if (dataType.equals(BOOLEAN)){
if (StringUtils.isEmpty(value)){
stmt.registerOutParameter(index, Types.BOOLEAN);
}else {
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
index c446215a38..cb2bf7d27e 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java
@@ -20,27 +20,18 @@ package cn.escheduler.server.worker.task.python;
import cn.escheduler.common.process.Property;
import cn.escheduler.common.task.AbstractParameters;
import cn.escheduler.common.task.python.PythonParameters;
-import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.ProcessDao;
-import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.PythonCommandExecutor;
import cn.escheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
-import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
+
import java.util.Map;
-import java.util.Set;
/**
* python task
@@ -57,7 +48,10 @@ public class PythonTask extends AbstractTask {
*/
private String taskDir;
- private PythonCommandExecutor pythonProcessTask;
+ /**
+ * python command executor
+ */
+ private PythonCommandExecutor pythonCommandExecutor;
/**
* process database access
@@ -70,10 +64,15 @@ public class PythonTask extends AbstractTask {
this.taskDir = taskProps.getTaskDir();
- this.pythonProcessTask = new PythonCommandExecutor(this::logHandle,
- taskProps.getTaskDir(), taskProps.getTaskAppId(),
- taskProps.getTenantCode(), null, taskProps.getTaskStartTime(),
- taskProps.getTaskTimeout(), logger);
+ this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+ taskProps.getTaskDir(),
+ taskProps.getTaskAppId(),
+ taskProps.getTaskInstId(),
+ taskProps.getTenantCode(),
+ taskProps.getEnvFile(),
+ taskProps.getTaskStartTime(),
+ taskProps.getTaskTimeout(),
+ logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@@ -92,9 +91,9 @@ public class PythonTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = pythonProcessTask.run(buildCommand(), processDao);
+ exitStatusCode = pythonCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
- logger.error("python process exception", e);
+ logger.error("python task failure", e);
exitStatusCode = -1;
}
}
@@ -102,7 +101,7 @@ public class PythonTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
- pythonProcessTask.cancelApplication();
+ pythonCommandExecutor.cancelApplication();
}
/**
@@ -111,21 +110,7 @@ public class PythonTask extends AbstractTask {
* @throws Exception
*/
private String buildCommand() throws Exception {
- // generate scripts
-// String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId());
-// Path path = new File(fileName).toPath();
-
-
-
-// if (Files.exists(path)) {
-// return fileName;
-// }
-
- String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
-
- // find process instance by task id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+ String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
@@ -133,27 +118,16 @@ public class PythonTask extends AbstractTask {
Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
pythonParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
if (paramsMap != null){
- rawScript = ParameterUtils.convertParameterPlaceholders(rawScript, ParamUtils.convert(paramsMap));
+ rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
}
-
-// pythonParameters.setRawScript(rawScript);
-
- logger.info("raw script : {}", pythonParameters.getRawScript());
+ logger.info("raw python script : {}", pythonParameters.getRawScript());
logger.info("task dir : {}", taskDir);
-// Set perms = PosixFilePermissions.fromString("rwxr-xr-x");
-// FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms);
-//
-// Files.createFile(path, attr);
-//
-// Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
-//
-// return fileName;
- return rawScript;
+ return rawPythonScript;
}
@Override
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
index b8564e8f95..12ff021d03 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java
@@ -54,7 +54,7 @@ public class ShellTask extends AbstractTask {
*/
private String taskDir;
- private ShellCommandExecutor processTask;
+ private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
@@ -62,15 +62,19 @@ public class ShellTask extends AbstractTask {
private ProcessDao processDao;
- public ShellTask(TaskProps props, Logger logger) {
- super(props, logger);
+ public ShellTask(TaskProps taskProps, Logger logger) {
+ super(taskProps, logger);
- this.taskDir = props.getTaskDir();
+ this.taskDir = taskProps.getTaskDir();
- this.processTask = new ShellCommandExecutor(this::logHandle,
- props.getTaskDir(), props.getTaskAppId(),
- props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
- props.getTaskTimeout(), logger);
+ this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(),
+ taskProps.getTaskAppId(),
+ taskProps.getTaskInstId(),
+ taskProps.getTenantCode(),
+ taskProps.getEnvFile(),
+ taskProps.getTaskStartTime(),
+ taskProps.getTaskTimeout(),
+ logger);
this.processDao = DaoFactory.getDaoInstance(ProcessDao.class);
}
@@ -89,9 +93,9 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- exitStatusCode = processTask.run(buildCommand(), processDao);
+ exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao);
} catch (Exception e) {
- logger.error(e.getMessage(), e);
+ logger.error("shell task failure", e);
exitStatusCode = -1;
}
}
@@ -99,7 +103,7 @@ public class ShellTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
- processTask.cancelApplication();
+ shellCommandExecutor.cancelApplication();
}
/**
@@ -118,8 +122,6 @@ public class ShellTask extends AbstractTask {
String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n");
- // find process instance by task id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
/**
* combining local and global parameters
@@ -127,8 +129,8 @@ public class ShellTask extends AbstractTask {
Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
shellParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
if (paramsMap != null){
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
index 4764a96a0c..d4acf70b2c 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java
@@ -66,8 +66,6 @@ public class SparkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) {
String args = sparkParameters.getMainArgs();
- // get process instance by task instance id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
/**
* combining local and global parameters
@@ -75,8 +73,8 @@ public class SparkTask extends AbstractYarnTask {
Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sparkParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
if (paramsMap != null ){
args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap));
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
index 09f6467aad..d9a8274520 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
@@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task.sql;
import cn.escheduler.alert.utils.MailUtils;
import cn.escheduler.common.Constants;
-import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ShowType;
import cn.escheduler.common.enums.TaskTimeoutStrategy;
import cn.escheduler.common.enums.UdfType;
@@ -44,8 +43,6 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import java.sql.*;
@@ -54,7 +51,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static cn.escheduler.common.utils.PropertyUtils.getString;
+import static cn.escheduler.common.Constants.*;
+import static cn.escheduler.common.enums.DbType.*;
/**
* sql task
@@ -76,12 +74,22 @@ public class SqlTask extends AbstractTask {
*/
private AlertDao alertDao;
+ /**
+ * datasource
+ */
+ private DataSource dataSource;
+
+ /**
+ * base datasource
+ */
+ private BaseDataSource baseDataSource;
+
- public SqlTask(TaskProps props, Logger logger) {
- super(props, logger);
+ public SqlTask(TaskProps taskProps, Logger logger) {
+ super(taskProps, logger);
logger.info("sql task params {}", taskProps.getTaskParams());
- this.sqlParameters = JSONObject.parseObject(props.getTaskParams(), SqlParameters.class);
+ this.sqlParameters = JSONObject.parseObject(taskProps.getTaskParams(), SqlParameters.class);
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
@@ -97,75 +105,73 @@ public class SqlTask extends AbstractTask {
Thread.currentThread().setName(threadLoggerInfoName);
logger.info(sqlParameters.toString());
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
- sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(),
- sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams());
-
- // determine whether there is a data source
+ sqlParameters.getType(),
+ sqlParameters.getDatasource(),
+ sqlParameters.getSql(),
+ sqlParameters.getLocalParams(),
+ sqlParameters.getUdfs(),
+ sqlParameters.getShowType(),
+ sqlParameters.getConnParams());
+
+ // not set data source
if (sqlParameters.getDatasource() == 0){
- logger.error("datasource is null");
+            logger.error("datasource id does not exist");
exitStatusCode = -1;
- }else {
- List createFuncs = null;
- DataSource dataSource = processDao.findDataSourceById(sqlParameters.getDatasource());
- logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
- dataSource.getName(),dataSource.getType(),dataSource.getNote(),
- dataSource.getUserId(),dataSource.getConnectionParams());
-
- if (dataSource != null){
- Connection con = null;
- try {
- BaseDataSource baseDataSource = null;
- if (DbType.MYSQL.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class);
- Class.forName(Constants.JDBC_MYSQL_CLASS_NAME);
- }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class);
- Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME);
- }else if (DbType.HIVE.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),HiveDataSource.class);
- Class.forName(Constants.JDBC_HIVE_CLASS_NAME);
- }else if (DbType.SPARK.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class);
- Class.forName(Constants.JDBC_SPARK_CLASS_NAME);
- }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class);
- Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME);
- }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),OracleDataSource.class);
- Class.forName(Constants.JDBC_ORACLE_CLASS_NAME);
- }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){
- baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SQLServerDataSource.class);
- Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME);
- }
+ return;
+ }
+ dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
+ logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
+ dataSource.getName(),
+ dataSource.getType(),
+ dataSource.getNote(),
+ dataSource.getUserId(),
+ dataSource.getConnectionParams());
- // ready to execute SQL and parameter entity Map
- SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
- List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()).orElse(new ArrayList<>())
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
- List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()).orElse(new ArrayList<>())
- .stream()
- .map(this::getSqlAndSqlParamsMap)
- .collect(Collectors.toList());
-
- if(EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && StringUtils.isNotEmpty(sqlParameters.getUdfs())){
- List udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
- createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
- }
+ if (dataSource == null){
+ logger.error("datasource not exists");
+ exitStatusCode = -1;
+ return;
+ }
- // execute sql task
- con = executeFuncAndSql(baseDataSource, mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+ Connection con = null;
+ List createFuncs = null;
+ try {
+ // load class
+ DataSourceFactory.loadClass(dataSource.getType());
+ // get datasource
+ baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
+ dataSource.getConnectionParams());
+
+ // ready to execute SQL and parameter entity Map
+ SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
+ List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
+ .orElse(new ArrayList<>())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
+ List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements())
+ .orElse(new ArrayList<>())
+ .stream()
+ .map(this::getSqlAndSqlParamsMap)
+ .collect(Collectors.toList());
+
+ // determine if it is UDF
+ boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType())
+ && StringUtils.isNotEmpty(sqlParameters.getUdfs());
+ if(udfTypeFlag){
+ List udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs());
+ createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
+ }
- } finally {
- if (con != null) {
- try {
- con.close();
- } catch (SQLException e) {
- throw e;
- }
- }
+ // execute sql task
+ con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
+ } finally {
+ if (con != null) {
+ try {
+ con.close();
+ } catch (SQLException e) {
+ throw e;
}
}
}
@@ -180,13 +186,13 @@ public class SqlTask extends AbstractTask {
StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
+
Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
- processInstance.getCmdTypeIfComplement(),
- processInstance.getScheduleTime());
+ taskProps.getCmdTypeIfComplement(),
+ taskProps.getScheduleTime());
// spell SQL according to the final user-defined variable
if(paramsMap == null){
@@ -195,14 +201,15 @@ public class SqlTask extends AbstractTask {
}
if (StringUtils.isNotEmpty(sqlParameters.getTitle())){
- String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), ParamUtils.convert(paramsMap));
- logger.info(title);
+ String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(),
+ ParamUtils.convert(paramsMap));
+            logger.info("SQL title : {}",title);
sqlParameters.setTitle(title);
}
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
- setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap);
+ setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex,"?");
@@ -219,47 +226,45 @@ public class SqlTask extends AbstractTask {
}
/**
- * execute sql
- * @param baseDataSource
+ * execute sql
* @param mainSqlBinds
* @param preStatementsBinds
* @param postStatementsBinds
* @param createFuncs
+ * @return
*/
- public Connection executeFuncAndSql(BaseDataSource baseDataSource,
- SqlBinds mainSqlBinds,
+ public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
List preStatementsBinds,
List postStatementsBinds,
List createFuncs){
Connection connection = null;
try {
- if (CommonUtils.getKerberosStartupState()) {
- System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
- getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
- Configuration configuration = new Configuration();
- configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- UserGroupInformation.setConfiguration(configuration);
- UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
- getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
- }
- if (DbType.HIVE.name().equals(sqlParameters.getType())) {
+ // if upload resource is HDFS and kerberos startup
+ CommonUtils.loadKerberosConf();
+
+ // if hive , load connection params if exists
+ if (HIVE == dataSource.getType()) {
Properties paramProp = new Properties();
- paramProp.setProperty("user", baseDataSource.getUser());
- paramProp.setProperty("password", baseDataSource.getPassword());
- Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), Constants.SEMICOLON,"hiveconf:");
+ paramProp.setProperty(USER, baseDataSource.getUser());
+ paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
+ Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(),
+ SEMICOLON,
+ HIVE_CONF);
if(connParamMap != null){
paramProp.putAll(connParamMap);
}
- connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),paramProp);
+ connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
+ paramProp);
}else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
- baseDataSource.getUser(), baseDataSource.getPassword());
+ baseDataSource.getUser(),
+ baseDataSource.getPassword());
}
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
- try (Statement funcStmt = connection.createStatement()) {
+ try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
@@ -270,7 +275,7 @@ public class SqlTask extends AbstractTask {
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
int result = stmt.executeUpdate();
- logger.info("pre statement execute result: " + result + ", for sql: " + sqlBind.getSql());
+ logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
@@ -278,7 +283,7 @@ public class SqlTask extends AbstractTask {
// decide whether to executeQuery or executeUpdate based on sqlType
if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
- JSONArray array = new JSONArray();
+ JSONArray resultJSONArray = new JSONArray();
ResultSet resultSet = stmt.executeQuery();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
@@ -288,21 +293,19 @@ public class SqlTask extends AbstractTask {
for (int i = 1; i <= num; i++) {
mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i));
}
- array.add(mapOfColValues);
+ resultJSONArray.add(mapOfColValues);
}
- logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
-
- // send as an attachment
- if (StringUtils.isEmpty(sqlParameters.getShowType())) {
- logger.info("showType is empty,don't need send email");
- } else {
- if (array.size() > 0) {
- if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
- sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
- }else{
- sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
- }
+ logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+
+ // if there is a result set
+ if (resultJSONArray.size() > 0) {
+ if (StringUtils.isNotEmpty(sqlParameters.getTitle())) {
+ sendAttachment(sqlParameters.getTitle(),
+ JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
+ }else{
+ sendAttachment(taskProps.getNodeName() + " query resultsets ",
+ JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}
}
@@ -310,7 +313,7 @@ public class SqlTask extends AbstractTask {
} else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) {
// non query statement
- int result = stmt.executeUpdate();
+ stmt.executeUpdate();
exitStatusCode = 0;
}
}
@@ -318,7 +321,7 @@ public class SqlTask extends AbstractTask {
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) {
int result = stmt.executeUpdate();
- logger.info("post statement execute result: " + result + ", for sql: " + sqlBind.getSql());
+ logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
}
}
} catch (Exception e) {
@@ -328,9 +331,19 @@ public class SqlTask extends AbstractTask {
return connection;
}
+ /**
+ * preparedStatement bind
+ * @param connection
+ * @param sqlBinds
+ * @return
+ * @throws Exception
+ */
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
- if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){
+ // is the timeout set
+ boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED ||
+ taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED;
+ if(timeoutFlag){
stmt.setQueryTimeout(taskProps.getTaskTimeout());
}
Map params = sqlBinds.getParamsMap();
@@ -340,7 +353,7 @@ public class SqlTask extends AbstractTask {
ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue());
}
}
- logger.info("prepare statement replace sql:{}",stmt.toString());
+ logger.info("prepare statement replace sql : {} ",stmt.toString());
return stmt;
}
@@ -354,9 +367,6 @@ public class SqlTask extends AbstractTask {
// process instance
ProcessInstance instance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
- // process define
- ProcessDefinition processDefine = processDao.findProcessDefineById(instance.getProcessDefinitionId());
-
List users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
// receiving group list
@@ -367,7 +377,7 @@ public class SqlTask extends AbstractTask {
// custom receiver
String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){
- String[] splits = receivers.split(Constants.COMMA);
+ String[] splits = receivers.split(COMMA);
for (String receiver : splits){
receviersList.add(receiver.trim());
}
@@ -378,16 +388,17 @@ public class SqlTask extends AbstractTask {
// Custom Copier
String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){
- String[] splits = receiversCc.split(Constants.COMMA);
+ String[] splits = receiversCc.split(COMMA);
for (String receiverCc : splits){
receviersCcList.add(receiverCc.trim());
}
}
- String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
+ String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
- Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
- if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
+ Map mailResult = MailUtils.sendMails(receviersList,
+ receviersCcList, title, content, ShowType.valueOf(showTypeName));
+ if(!(Boolean) mailResult.get(STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{
@@ -425,7 +436,7 @@ public class SqlTask extends AbstractTask {
public void printReplacedSql(String content, String formatSql,String rgex, Map sqlParamsMap){
//parameter print style
logger.info("after replace sql , preparing : {}" , formatSql);
- StringBuffer logPrint = new StringBuffer("replaced sql , parameters:");
+ StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}