From 67c28c76a04e393ee7baa5450096308ab3739e18 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Thu, 25 Jul 2019 18:33:08 +0800 Subject: [PATCH] worker code optimization --- .../api/controller/LoginController.java | 2 +- .../api/service/SessionService.java | 6 +- .../java/cn/escheduler/common/Constants.java | 25 +- .../escheduler/common/enums/ServerEnum.java | 29 ++ .../common/job/db/DataSourceFactory.java | 39 +- .../escheduler/common/utils/CommonUtils.java | 17 + .../common/zk/AbstractZKClient.java | 13 + .../java/cn/escheduler/dao/ProcessDao.java | 21 +- .../escheduler/dao/mapper/SessionMapper.java | 6 +- .../dao/mapper/SessionMapperProvider.java | 2 - .../escheduler/dao/model/ProcessInstance.java | 73 ++-- .../cn/escheduler/dao/model/TaskInstance.java | 8 + .../escheduler/server/utils/ProcessUtils.java | 3 +- .../server/worker/WorkerServer.java | 2 +- .../server/worker/log/TaskLogAppender.java | 2 - .../server/worker/runner/FetchTaskThread.java | 201 ++++++----- .../worker/runner/TaskScheduleThread.java | 283 ++++++--------- .../worker/task/AbstractCommandExecutor.java | 8 +- .../server/worker/task/AbstractTask.java | 113 +++++- .../server/worker/task/AbstractYarnTask.java | 25 +- .../worker/task/PythonCommandExecutor.java | 41 ++- .../worker/task/ShellCommandExecutor.java | 16 +- .../server/worker/task/TaskProps.java | 86 ++++- .../task/dependent/DependentExecute.java | 2 - .../worker/task/dependent/DependentTask.java | 2 +- .../server/worker/task/mr/MapReduceTask.java | 7 +- .../task/processdure/ProcedureTask.java | 337 +++++++++--------- .../server/worker/task/python/PythonTask.java | 72 ++-- .../server/worker/task/shell/ShellTask.java | 32 +- .../server/worker/task/spark/SparkTask.java | 6 +- .../server/worker/task/sql/SqlTask.java | 259 +++++++------- 31 files changed, 998 insertions(+), 740 deletions(-) create mode 100644 escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java index f5521c6293..0118add35d 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/LoginController.java @@ -116,7 +116,7 @@ public class LoginController extends BaseController { response.setStatus(HttpStatus.SC_OK); response.addCookie(new Cookie(Constants.SESSION_ID, sessionId)); - logger.info("sessionId = " + sessionId); + logger.info("sessionId : {}" , sessionId); return success(LOGIN_SUCCESS.getMsg(), sessionId); } catch (Exception e) { logger.error(USER_LOGIN_FAILURE.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java index a1bf74395a..e57535cf7a 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/SessionService.java @@ -68,7 +68,7 @@ public class SessionService extends BaseService{ String ip = BaseController.getClientIpAddress(request); logger.info("get session: {}, ip: {}", sessionId, ip); - return sessionMapper.queryByIdAndIp(sessionId, ip); + return sessionMapper.queryByIdAndIp(sessionId); } /** @@ -80,7 +80,7 @@ public class SessionService extends BaseService{ */ public String createSession(User user, String ip) { // logined - Session session = sessionMapper.queryByUserIdAndIp(user.getId(), ip); + Session session = sessionMapper.queryByUserIdAndIp(user.getId()); Date now = new Date(); /** @@ -126,7 +126,7 @@ public class SessionService extends BaseService{ /** * query session by user id and ip */ - Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId(), ip); + Session session = sessionMapper.queryByUserIdAndIp(loginUser.getId()); //delete session sessionMapper.deleteById(session.getId()); } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 6c39e0a264..10a4460678 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -119,10 +119,6 @@ public final class Constants { */ public static final String ESCHEDULER_ENV_PATH = "escheduler.env.path"; - /** - * escheduler.env.sh - */ - public static final String ESCHEDULER_ENV_SH = ".escheduler_env.sh"; /** * python home @@ -220,9 +216,9 @@ public final class Constants { public static final String SEMICOLON = ";"; /** - * DOT . + * EQUAL SIGN */ - public static final String DOT = "."; + public static final String EQUAL_SIGN = "="; /** * ZOOKEEPER_SESSION_TIMEOUT @@ -283,10 +279,6 @@ public final class Constants { */ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss"; - /** - * date format of yyyyMMdd - */ - public static final String YYYYMMDD = "yyyyMMdd"; /** * date format of yyyyMMddHHmmss @@ -489,6 +481,7 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; public static final String DEFAULT = "Default"; + public static final String USER = "user"; public static final String PASSWORD = "password"; public static final String XXXXXX = "******"; @@ -499,6 +492,7 @@ public final class Constants { public static final String STATUS = "status"; + /** * command parameter keys */ @@ -866,6 +860,11 @@ public final class Constants { */ public static final int PREVIEW_SCHEDULE_EXECUTE_COUNT = 5; + /** + * kerberos + */ + public static final String KERBEROS = "kerberos"; + /** * java.security.krb5.conf */ @@ -901,4 +900,10 @@ public final class Constants { * loginUserFromKeytab path */ public static final String LOGIN_USER_KEY_TAB_PATH = "login.user.keytab.path"; + + + /** + * hive conf + */ + public static final String HIVE_CONF = "hiveconf:"; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java b/escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java new file mode 100644 index 0000000000..5c27f6567b --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/enums/ServerEnum.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.enums; + +/** + * cycle enums + */ +public enum ServerEnum { + + /** + * master server , worker server + */ + MASTER_SERVER,WORKER_SERVER + +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java index 316be26d89..c710944bd2 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/job/db/DataSourceFactory.java @@ -21,6 +21,8 @@ import cn.escheduler.common.utils.JSONUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static cn.escheduler.common.Constants.*; + /** * produce datasource in this custom defined datasource factory. */ @@ -49,8 +51,43 @@ public class DataSourceFactory { return null; } } catch (Exception e) { - logger.error("Get datasource object error", e); + logger.error("get datasource object error", e); return null; } } + + /** + * load class + * @param dbType + * @throws Exception + */ + public static void loadClass(DbType dbType) throws Exception{ + switch (dbType){ + case MYSQL : + Class.forName(JDBC_MYSQL_CLASS_NAME); + break; + case POSTGRESQL : + Class.forName(JDBC_POSTGRESQL_CLASS_NAME); + break; + case HIVE : + Class.forName(JDBC_HIVE_CLASS_NAME); + break; + case SPARK : + Class.forName(JDBC_SPARK_CLASS_NAME); + break; + case CLICKHOUSE : + Class.forName(JDBC_CLICKHOUSE_CLASS_NAME); + break; + case ORACLE : + Class.forName(JDBC_ORACLE_CLASS_NAME); + break; + case SQLSERVER: + Class.forName(JDBC_SQLSERVER_CLASS_NAME); + break; + default: + logger.error("not support sql type: {},can't load class", dbType); + throw new IllegalArgumentException("not support sql type,can't load class"); + + } + } } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java index 43087fbd9c..b1d084f446 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java @@ -19,6 +19,8 @@ package cn.escheduler.common.utils; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ResUploadType; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,4 +76,19 @@ public class CommonUtils { Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); return resUploadType == ResUploadType.HDFS && kerberosStartupState; } + + /** + * load kerberos configuration + * @throws Exception + */ + public static void loadKerberosConf()throws Exception{ + if (CommonUtils.getKerberosStartupState()) { + System.setProperty(JAVA_SECURITY_KRB5_CONF, getString(JAVA_SECURITY_KRB5_CONF_PATH)); + Configuration configuration = new Configuration(); + configuration.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS); + UserGroupInformation.setConfiguration(configuration); + UserGroupInformation.loginUserFromKeytab(getString(LOGIN_USER_KEY_TAB_USERNAME), + getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH)); + } + } } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 3c58996298..468efb326d 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -18,6 +18,7 @@ package cn.escheduler.common.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.IStoppable; +import cn.escheduler.common.enums.ServerEnum; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; import org.apache.commons.configuration.Configuration; @@ -27,6 +28,7 @@ import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -415,6 +417,17 @@ public abstract class AbstractZKClient { return conf.getString(Constants.ZOOKEEPER_ESCHEDULER_LOCK_FAILOVER_WORKERS); } + /** + * acquire zk lock + * @param zkClient + * @param zNodeLockPath + * @throws Exception + */ + public InterProcessMutex acquireZkLock(CuratorFramework zkClient,String zNodeLockPath)throws Exception{ + InterProcessMutex mutex = new InterProcessMutex(zkClient, zNodeLockPath); + mutex.acquire(); + return mutex; + } @Override public String toString() { diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 6de893379d..b07cd92d83 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -1220,6 +1220,26 @@ public class ProcessDao extends AbstractBaseDao { return taskInstanceMapper.queryById(taskId); } + + /** + * package task instance,associate processInstance and processDefine + * @param taskInstId + * @return + */ + public TaskInstance getTaskInstanceRelationByTaskId(int taskInstId){ + // get task instance + TaskInstance taskInstance = findTaskInstanceById(taskInstId); + // get process instance + ProcessInstance processInstance = findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + // get process define + ProcessDefinition processDefine = findProcessDefineById(taskInstance.getProcessDefinitionId()); + + taskInstance.setProcessInstance(processInstance); + taskInstance.setProcessDefine(processDefine); + return taskInstance; + } + + /** * get id list by task state * @param instanceId @@ -1324,7 +1344,6 @@ public class ProcessDao extends AbstractBaseDao { String executePath, String logPath, int taskInstId) { - TaskInstance taskInstance = taskInstanceMapper.queryById(taskInstId); taskInstance.setState(state); taskInstance.setStartTime(startTime); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java index bd67458d5f..748ee474f7 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapper.java @@ -75,7 +75,6 @@ public interface SessionMapper { * query by session id and ip * * @param sessionId - * @param ip * @return */ @Results(value = { @@ -85,13 +84,12 @@ public interface SessionMapper { @Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) }) @SelectProvider(type = SessionMapperProvider.class, method = "queryByIdAndIp") - Session queryByIdAndIp(@Param("sessionId") String sessionId, @Param("ip") String ip); + Session queryByIdAndIp(@Param("sessionId") String sessionId); /** * query by user id and ip * @param userId - * @param ip * @return */ @Results(value = { @@ -101,6 +99,6 @@ public interface SessionMapper { @Result(property = "lastLoginTime", column = "last_login_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) }) @SelectProvider(type = SessionMapperProvider.class, method = "queryByUserIdAndIp") - Session queryByUserIdAndIp(@Param("userId") int userId, @Param("ip") String ip); + Session queryByUserIdAndIp(@Param("userId") int userId); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java index 9bf9f8f943..07567fd9ea 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/SessionMapperProvider.java @@ -114,7 +114,6 @@ public class SessionMapperProvider { FROM(TABLE_NAME); WHERE("`id` = #{sessionId}"); - WHERE("`ip` = #{ip}"); }}.toString(); } @@ -130,7 +129,6 @@ public class SessionMapperProvider { FROM(TABLE_NAME); WHERE("`user_id` = #{userId}"); - WHERE("`ip` = #{ip}"); }}.toString(); } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 5c9418ca72..95b42acd58 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -529,6 +529,39 @@ public class ProcessInstance { this.timeout = timeout; } + + public void setTenantId(int tenantId) { + this.tenantId = tenantId; + } + + public int getTenantId() { + return this.tenantId ; + } + + public String getWorkerGroupName() { + return workerGroupName; + } + + public void setWorkerGroupName(String workerGroupName) { + this.workerGroupName = workerGroupName; + } + + public String getReceivers() { + return receivers; + } + + public void setReceivers(String receivers) { + this.receivers = receivers; + } + + public String getReceiversCc() { + return receiversCc; + } + + public void setReceiversCc(String receiversCc) { + this.receiversCc = receiversCc; + } + @Override public String toString() { return "ProcessInstance{" + @@ -555,7 +588,6 @@ public class ProcessInstance { ", processInstanceJson='" + processInstanceJson + '\'' + ", executorId=" + executorId + ", tenantCode='" + tenantCode + '\'' + - ", tenantId='" + tenantId + '\'' + ", queue='" + queue + '\'' + ", isSubProcess=" + isSubProcess + ", locations='" + locations + '\'' + @@ -563,40 +595,13 @@ public class ProcessInstance { ", historyCmd='" + historyCmd + '\'' + ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + ", duration=" + duration + - ", timeout=" + timeout + ", processInstancePriority=" + processInstancePriority + + ", workerGroupId=" + workerGroupId + + ", timeout=" + timeout + + ", tenantId=" + tenantId + + ", workerGroupName='" + workerGroupName + '\'' + + ", receivers='" + receivers + '\'' + + ", receiversCc='" + receiversCc + '\'' + '}'; } - - public void setTenantId(int tenantId) { - this.tenantId = tenantId; - } - - public int getTenantId() { - return this.tenantId ; - } - - public String getWorkerGroupName() { - return workerGroupName; - } - - public void setWorkerGroupName(String workerGroupName) { - this.workerGroupName = workerGroupName; - } - - public String getReceivers() { - return receivers; - } - - public void setReceivers(String receivers) { - this.receivers = receivers; - } - - public String getReceiversCc() { - return receiversCc; - } - - public void setReceiversCc(String receiversCc) { - this.receiversCc = receiversCc; - } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java index 7f7981ef7c..7edce1c21d 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java @@ -189,6 +189,14 @@ public class TaskInstance { private int workerGroupId; + + public void init(String host,Date startTime,String executePath){ + this.host = host; + this.startTime = startTime; + this.executePath = executePath; + } + + public ProcessInstance getProcessInstance() { return processInstance; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java index 2a7643844f..3a07f69d9a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java @@ -314,8 +314,7 @@ public class ProcessUtils { } } catch (Exception e) { - logger.error("kill yarn job failed : " + e.getMessage(),e); -// throw new RuntimeException("kill yarn job fail"); + logger.error("kill yarn job failure",e); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java index 05e3d21c45..0c6b5adb54 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/WorkerServer.java @@ -254,7 +254,7 @@ public class WorkerServer implements IStoppable { try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); } catch (InterruptedException e) { - logger.error("interrupted exception : " + e.getMessage(),e); + logger.error("interrupted exception",e); } // if set is null , return if (CollectionUtils.isNotEmpty(taskInfoSet)){ diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java index 9e557127b5..6ee8f6af9e 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java @@ -26,8 +26,6 @@ import org.slf4j.LoggerFactory; */ public class TaskLogAppender extends FileAppender { - private static final Logger logger = LoggerFactory.getLogger(TaskLogAppender.class); - private String currentlyActiveFile; @Override diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 5495096161..fb96763add 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -20,6 +20,7 @@ import cn.escheduler.common.Constants; import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.thread.Stopper; import cn.escheduler.common.thread.ThreadUtils; +import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.FileUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.dao.ProcessDao; @@ -27,7 +28,6 @@ import cn.escheduler.dao.model.*; import cn.escheduler.server.zk.ZKWorkerClient; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,8 +74,20 @@ public class FetchTaskThread implements Runnable{ */ private int workerExecNums; + /** + * conf + */ private Configuration conf; + /** + * task instance + */ + private TaskInstance taskInstance; + + /** + * task instance id + */ + Integer taskInstId; public FetchTaskThread(int taskNum, ZKWorkerClient zkWorkerClient, ProcessDao processDao, Configuration conf, @@ -124,126 +136,101 @@ public class FetchTaskThread implements Runnable{ @Override public void run() { - while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; - //check memory and cpu usage and threads - if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) { - - //whether have tasks, if no tasks , no need lock //get all tasks - List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); - if(tasksQueueList.size() > 0){ - // creating distributed locks, lock path /escheduler/lock/worker - String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); - mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); - mutex.acquire(); + boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor); - // task instance id str - List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); - - for(String taskQueueStr : taskQueueStrArr){ - if (StringUtils.isNotBlank(taskQueueStr )) { - - if (!checkThreadCount(poolExecutor)) { - break; - } - - String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[3]; - Date now = new Date(); - Integer taskId = Integer.parseInt(taskInstIdStr); - - // find task instance by task id - TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); - - logger.info("worker fetch taskId : {} from queue ", taskId); - - int retryTimes = 30; - // mainly to wait for the master insert task to succeed - while (taskInstance == null && retryTimes > 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - taskInstance = processDao.findTaskInstanceById(taskId); - retryTimes--; - } - - if (taskInstance == null ) { - logger.error("task instance is null. task id : {} ", taskId); - continue; - } - - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ - continue; - } - taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); - logger.info("remove task:{} from queue", taskQueueStr); + Thread.sleep(Constants.SLEEP_TIME_MILLIS); - // set execute task worker host - taskInstance.setHost(OSUtils.getHost()); - taskInstance.setStartTime(now); + if(!runCheckFlag) { + continue; + } + //whether have tasks, if no tasks , no need lock //get all tasks + List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); + if (CollectionUtils.isEmpty(tasksQueueList)){ + continue; + } + // creating distributed locks, lock path /escheduler/lock/worker + mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(), + zkWorkerClient.getWorkerLockPath()); - // get process instance - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); + // task instance id str + List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); + for(String taskQueueStr : taskQueueStrArr){ + if (StringUtils.isEmpty(taskQueueStr)) { + continue; + } - taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); + if (!checkThreadCount(poolExecutor)) { + break; + } + // get task instance id + taskInstId = Integer.parseInt(taskQueueStr.split(Constants.UNDERLINE)[3]); + + // get task instance relation + TaskInstance taskInstance = processDao.getTaskInstanceRelationByTaskId(taskInstId); + + Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), + taskInstance.getProcessDefine().getUserId()); + if(tenant == null){ + logger.error("tenant not exists,process define id : {},process instance id : {},task instance id : {}", + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId()); + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + continue; + } - // get local execute path - String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), - processDefine.getId(), - processInstance.getId(), - taskInstance.getId()); - logger.info("task instance local execute path : {} ", execLocalPath); + logger.info("worker fetch taskId : {} from queue ", taskInstId); + // mainly to wait for the master insert task to succeed + waitForMasterEnterQueue(); - // set task execute path - taskInstance.setExecutePath(execLocalPath); + if (taskInstance == null ) { + logger.error("task instance is null. task id : {} ", taskInstId); + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + continue; + } - Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), - processDefine.getUserId()); - if(tenant == null){ - logger.error("cannot find suitable tenant for the task:{}, process instance tenant:{}, process definition tenant:{}", - taskInstance.getName(),processInstance.getTenantId(), processDefine.getTenantId()); - continue; - } + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } - // check and create Linux users - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, - tenant.getTenantCode(), logger); + // get local execute path + logger.info("task instance local execute path : {} ", getExecLocalPath()); - logger.info("task : {} ready to submit to task scheduler thread",taskId); - // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + // init task + taskInstance.init(OSUtils.getHost(), + new Date(), + getExecLocalPath()); - } - } + // check and create Linux users + FileUtils.createWorkDirAndUserIfAbsent(getExecLocalPath(), + tenant.getTenantCode(), logger); - } + logger.info("task : {} ready to submit to task scheduler thread",taskInstId); + // submit task + workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + // remove node from zk + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); } - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - }catch (Exception e){ - logger.error("fetch task thread exception : " + e.getMessage(),e); + logger.error("fetch task thread failure" ,e); }finally { if (mutex != null){ try { mutex.release(); } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("fetch task lock release"); - }else{ - logger.error("fetch task lock release failed : " + e.getMessage(),e); - } + logger.error("fetch task lock release failure ",e); } } } @@ -251,16 +238,44 @@ public class FetchTaskThread implements Runnable{ } /** - * + * get execute local path + * @return + */ + private String getExecLocalPath(){ + return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(), + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessDefine().getId(), + taskInstance.getId()); + } + /** + * check * @param poolExecutor * @return */ private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { int activeCount = poolExecutor.getActiveCount(); if (activeCount >= workerExecNums) { - logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS); + logger.info("thread insufficient , activeCount : {} , " + + "workerExecNums : {}, will sleep : {} millis for thread resource", + activeCount, + workerExecNums, + Constants.SLEEP_TIME_MILLIS); return false; } return true; } + + /** + * mainly to wait for the master insert task to succeed + * @throws Exception + */ + private void waitForMasterEnterQueue()throws Exception{ + int retryTimes = 30; + + while (taskInstance == null && retryTimes > 0) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + taskInstance = processDao.findTaskInstanceById(taskInstId); + retryTimes--; + } + } } \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index f1b4265e30..6265097930 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -59,13 +59,16 @@ import java.util.stream.Collectors; /** * task scheduler thread */ -public class TaskScheduleThread implements Callable { +public class TaskScheduleThread implements Runnable { /** * logger */ private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class); + /** + * task prefix + */ private static final String TASK_PREFIX = "TASK"; /** @@ -79,7 +82,7 @@ public class TaskScheduleThread implements Callable { private final ProcessDao processDao; /** - * execute task info + * abstract task */ private AbstractTask task; @@ -89,115 +92,55 @@ public class TaskScheduleThread implements Callable { } @Override - public Boolean call() throws Exception { + public void run() { - // get task type - String taskType = taskInstance.getTaskType(); - // set task state - taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION); - - // update task state - if(taskType.equals(TaskType.SQL.name()) || taskType.equals(TaskType.PROCEDURE.name())){ - processDao.changeTaskState(taskInstance.getState(), - taskInstance.getStartTime(), - taskInstance.getHost(), - null, - System.getProperty("user.dir") + "/logs/" + - taskInstance.getProcessDefinitionId() +"/" + - taskInstance.getProcessInstanceId() +"/" + - taskInstance.getId() + ".log", - taskInstance.getId()); - }else{ - processDao.changeTaskState(taskInstance.getState(), - taskInstance.getStartTime(), - taskInstance.getHost(), - taskInstance.getExecutePath(), - System.getProperty("user.dir") + "/logs/" + - taskInstance.getProcessDefinitionId() +"/" + - taskInstance.getProcessInstanceId() +"/" + - taskInstance.getId() + ".log", - taskInstance.getId()); - } - - ExecutionStatus status = ExecutionStatus.SUCCESS; + // update task state is running according to task type + updateTaskState(taskInstance.getTaskType()); try { + logger.info("script path : {}", taskInstance.getExecutePath()); + // task node + TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class); - - // custom param str - String customParamStr = taskInstance.getProcessInstance().getGlobalParams(); - - - Map allParamMap = new HashMap<>(); - - - if (customParamStr != null) { - List customParamMap = JSONObject.parseArray(customParamStr, Property.class); - - Map userDefinedParamMap = customParamMap.stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); - - allParamMap.putAll(userDefinedParamMap); - } - - logger.info("script path : {}",taskInstance.getExecutePath()); - - TaskProps taskProps = new TaskProps(); - - taskProps.setTaskDir(taskInstance.getExecutePath()); - - String taskJson = taskInstance.getTaskJson(); - - - TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class); - - - List projectRes = createProjectResFiles(taskNode); - - // copy hdfs file to local + // copy hdfs/minio file to local copyHdfsToLocal(processDao, taskInstance.getExecutePath(), - projectRes, + createProjectResFiles(taskNode), logger); - // set task params - taskProps.setTaskParams(taskNode.getParams()); - // set tenant code , execute task linux user - - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - - taskProps.setScheduleTime(processInstance.getScheduleTime()); - taskProps.setNodeName(taskInstance.getName()); - taskProps.setTaskInstId(taskInstance.getId()); - taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); - - ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + // get process instance according to tak instance + ProcessInstance processInstance = taskInstance.getProcessInstance(); + // get process define according to tak instance + ProcessDefinition processDefine = taskInstance.getProcessDefine(); + // get tenant info Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), - processDefine.getUserId()); + processDefine.getUserId()); if(tenant == null){ - processInstance.setTenantCode(tenant.getTenantCode()); - logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}", - processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId() - ); - status = ExecutionStatus.FAILURE; + logger.error("cannot find the tenant, process definition id:{}, user id:{}", + processDefine.getId(), + processDefine.getUserId()); + task.setExitStatusCode(Constants.EXIT_CODE_FAILURE); }else{ - taskProps.setTenantCode(tenant.getTenantCode()); - String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); - // set queue - if (StringUtils.isEmpty(queue)){ - taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); - }else { - taskProps.setQueue(tenant.getQueueName()); - } - taskProps.setTaskStartTime(taskInstance.getStartTime()); - taskProps.setDefinedParams(allParamMap); + // set task props + TaskProps taskProps = new TaskProps(taskNode.getParams(), + taskInstance.getExecutePath(), + processInstance.getScheduleTime(), + taskInstance.getName(), + taskInstance.getTaskType(), + taskInstance.getId(), + CommonUtils.getSystemEnvPath(), + tenant.getTenantCode(), + tenant.getQueueName(), + taskInstance.getStartTime(), + getGlobalParamsMap(), + taskInstance.getDependency(), + processInstance.getCmdTypeIfComplement()); // set task timeout setTaskTimeout(taskProps, taskNode); - taskProps.setDependence(taskInstance.getDependency()); - taskProps.setTaskAppId(String.format("%s_%s_%s", taskInstance.getProcessDefine().getId(), taskInstance.getProcessInstance().getId(), @@ -209,72 +152,98 @@ public class TaskScheduleThread implements Callable { taskInstance.getProcessInstance().getId(), taskInstance.getId())); - task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); + task = TaskManager.newTask(taskInstance.getTaskType(), + taskProps, + taskLogger); - // job init + // task init task.init(); - // job handle + // task handle task.handle(); - logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); - - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ - status = ExecutionStatus.SUCCESS; - // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - - AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); - - // replace placeholder - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - params.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); - if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")){ - String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)){ - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; - } - } - } - } - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ - status = ExecutionStatus.KILL; - }else { - status = ExecutionStatus.FAILURE; - } + // task result process + task.after(); } }catch (Exception e){ - logger.error("task escheduler failure : ", e); - status = ExecutionStatus.FAILURE ; - logger.error(String.format("task process exception, process id : %s , task : %s", - taskInstance.getProcessInstanceId(), - taskInstance.getName()),e); + logger.error("task scheduler failure", e); + task.setExitStatusCode(Constants.EXIT_CODE_FAILURE); kill(); } + + logger.info("task instance id : {},task final status : {}", + taskInstance.getId(), + task.getExitStatus()); // update task instance state - processDao.changeTaskState(status, + processDao.changeTaskState(task.getExitStatus(), new Date(), taskInstance.getId()); - return task.getExitStatusCode() > Constants.EXIT_CODE_SUCCESS; } /** - * set task time out + * get global paras map + * @return + */ + private Map getGlobalParamsMap() { + Map globalParamsMap = new HashMap<>(16); + + // global params string + String globalParamsStr = taskInstance.getProcessInstance().getGlobalParams(); + + if (globalParamsStr != null) { + List globalParamsList = JSONObject.parseArray(globalParamsStr, Property.class); + globalParamsMap.putAll(globalParamsList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue))); + } + return globalParamsMap; + } + + /** + * update task state according to task type + * @param taskType + */ + private void updateTaskState(String taskType) { + // update task status is running + if(taskType.equals(TaskType.SQL.name()) || + taskType.equals(TaskType.PROCEDURE.name())){ + processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + null, + getTaskLogPath(), + taskInstance.getId()); + }else{ + processDao.changeTaskState(ExecutionStatus.RUNNING_EXEUTION, + taskInstance.getStartTime(), + taskInstance.getHost(), + taskInstance.getExecutePath(), + getTaskLogPath(), + taskInstance.getId()); + } + } + + /** + * get task log path + * @return + */ + private String getTaskLogPath() { + return System.getProperty("user.dir") + Constants.SINGLE_SLASH + + "logs" + Constants.SINGLE_SLASH + + taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH + + taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH + + taskInstance.getId() + ".log"; + } + + /** + * set task timeout * @param taskProps * @param taskNode */ private void setTaskTimeout(TaskProps taskProps, TaskNode taskNode) { + // the default timeout is the maximum value of the integer taskProps.setTaskTimeout(Integer.MAX_VALUE); TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter(); if (taskTimeoutParameter.getEnable()){ + // get timeout strategy taskProps.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy()); switch (taskTimeoutParameter.getStrategy()){ case WARN: @@ -298,38 +267,7 @@ public class TaskScheduleThread implements Callable { } - /** - * get current task parameter class - * @return - */ - private Class getCurTaskParamsClass(){ - Class paramsClass = null; - TaskType taskType = TaskType.valueOf(taskInstance.getTaskType()); - switch (taskType){ - case SHELL: - paramsClass = ShellParameters.class; - break; - case SQL: - paramsClass = SqlParameters.class; - break; - case PROCEDURE: - paramsClass = ProcedureParameters.class; - break; - case MR: - paramsClass = MapreduceParameters.class; - break; - case SPARK: - paramsClass = SparkParameters.class; - break; - case PYTHON: - paramsClass = PythonParameters.class; - break; - default: - logger.error("not support this task type: {}", taskType); - throw new IllegalArgumentException("not support this task type"); - } - return paramsClass; - } + /** * kill task @@ -376,9 +314,7 @@ public class TaskScheduleThread implements Callable { File resFile = new File(execLocalPath, res); if (!resFile.exists()) { try { - /** - * query the tenant code of the resource according to the name of the resource - */ + // query the tenant code of the resource according to the name of the resource String tentnCode = processDao.queryTenantCodeByResName(res); String resHdfsPath = HadoopUtils.getHdfsFilename(tentnCode,res); @@ -388,7 +324,6 @@ public class TaskScheduleThread implements Callable { logger.error(e.getMessage(),e); throw new RuntimeException(e.getMessage()); } - } else { logger.info("file : {} exists ", resFile.getName()); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java index 82c9be9f0e..e0aef326fd 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java @@ -67,6 +67,11 @@ public abstract class AbstractCommandExecutor { */ protected final String taskAppId; + /** + * task appId + */ + protected final int taskInstId; + /** * tenant code , execute task linux user */ @@ -99,11 +104,12 @@ public abstract class AbstractCommandExecutor { public AbstractCommandExecutor(Consumer> logHandler, - String taskDir, String taskAppId, String tenantCode, String envFile, + String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile, Date startTime, int timeout, Logger logger){ this.logHandler = logHandler; this.taskDir = taskDir; this.taskAppId = taskAppId; + this.taskInstId = taskInstId; this.tenantCode = tenantCode; this.envFile = envFile; this.startTime = startTime; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java index 645a314a4c..213f4fd3f9 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractTask.java @@ -16,10 +16,26 @@ */ package cn.escheduler.server.worker.task; +import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ExecutionStatus; +import cn.escheduler.common.enums.TaskRecordStatus; +import cn.escheduler.common.enums.TaskType; +import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; +import cn.escheduler.common.task.mr.MapreduceParameters; +import cn.escheduler.common.task.procedure.ProcedureParameters; +import cn.escheduler.common.task.python.PythonParameters; +import cn.escheduler.common.task.shell.ShellParameters; +import cn.escheduler.common.task.spark.SparkParameters; +import cn.escheduler.common.task.sql.SqlParameters; +import cn.escheduler.common.utils.JSONUtils; +import cn.escheduler.dao.TaskRecordDao; +import cn.escheduler.server.utils.ParamUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import java.util.List; +import java.util.Map; /** * executive task @@ -70,7 +86,7 @@ public abstract class AbstractTask { public void cancelApplication(boolean status) throws Exception { - cancel = true; + this.cancel = status; } /** @@ -89,6 +105,9 @@ public abstract class AbstractTask { return exitStatusCode; } + public void setExitStatusCode(int exitStatusCode) { + this.exitStatusCode = exitStatusCode; + } /** * get task parameters @@ -96,4 +115,96 @@ public abstract class AbstractTask { public abstract AbstractParameters getParameters(); + /** + * result processing + */ + public void after(){ + if (getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskProps.getTaskType())){ + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + } + } + } + } + + }else if (getExitStatusCode() == Constants.EXIT_CODE_KILL){ + setExitStatusCode(Constants.EXIT_CODE_KILL); + }else { + setExitStatusCode(Constants.EXIT_CODE_FAILURE); + } + } + + + + + /** + * get current task parameter class + * @return + */ + private Class getCurTaskParamsClass(){ + Class paramsClass = null; + // get task type + TaskType taskType = TaskType.valueOf(taskProps.getTaskType()); + switch (taskType){ + case SHELL: + paramsClass = ShellParameters.class; + break; + case SQL: + paramsClass = SqlParameters.class; + break; + case PROCEDURE: + paramsClass = ProcedureParameters.class; + break; + case MR: + paramsClass = MapreduceParameters.class; + break; + case SPARK: + paramsClass = SparkParameters.class; + break; + case PYTHON: + paramsClass = PythonParameters.class; + break; + default: + logger.error("not support this task type: {}", taskType); + throw new IllegalArgumentException("not support this task type"); + } + return paramsClass; + } + + /** + * get exit status according to exitCode + * @return + */ + public ExecutionStatus getExitStatus(){ + ExecutionStatus status; + switch (getExitStatusCode()){ + case Constants.EXIT_CODE_SUCCESS: + status = ExecutionStatus.SUCCESS; + break; + case Constants.EXIT_CODE_KILL: + status = ExecutionStatus.KILL; + break; + default: + status = ExecutionStatus.FAILURE; + break; + } + return status; + } } \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java index 7559864bab..a981307c48 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractYarnTask.java @@ -38,7 +38,7 @@ public abstract class AbstractYarnTask extends AbstractTask { /** * process task */ - private ShellCommandExecutor processTask; + private ShellCommandExecutor shellCommandExecutor; /** * process database access @@ -53,21 +53,25 @@ public abstract class AbstractYarnTask extends AbstractTask { public AbstractYarnTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); - // find process instance by taskId this.processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); - this.processTask = new ShellCommandExecutor(this::logHandle, - taskProps.getTaskDir(), taskProps.getTaskAppId(), - taskProps.getTenantCode(), taskProps.getEnvFile(), taskProps.getTaskStartTime(), - taskProps.getTaskTimeout(), logger); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, + taskProps.getTaskDir(), + taskProps.getTaskAppId(), + taskProps.getTaskInstId(), + taskProps.getTenantCode(), + taskProps.getEnvFile(), + taskProps.getTaskStartTime(), + taskProps.getTaskTimeout(), + logger); } @Override public void handle() throws Exception { try { // construct process - exitStatusCode = processTask.run(buildCommand(), processDao); + exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao); } catch (Exception e) { - logger.error("yarn process failed : " + e.getMessage(), e); + logger.error("yarn process failure", e); exitStatusCode = -1; } } @@ -76,9 +80,8 @@ public abstract class AbstractYarnTask extends AbstractTask { public void cancelApplication(boolean status) throws Exception { cancel = true; // cancel process - processTask.cancelApplication(); - int taskInstId = taskProps.getTaskInstId(); - TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId); + shellCommandExecutor.cancelApplication(); + TaskInstance taskInstance = processDao.findTaskInstanceById(taskProps.getTaskInstId()); if (status && taskInstance != null){ ProcessUtils.killYarnJob(taskInstance); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java index e1df0b71a8..09a6add48f 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java @@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task; import cn.escheduler.common.Constants; import cn.escheduler.common.utils.FileUtils; -import cn.escheduler.common.utils.PropertyUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +42,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { public PythonCommandExecutor(Consumer> logHandler, - String taskDir, String taskAppId, String tenantCode, String envFile, - Date startTime, int timeout, Logger logger) { - super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger); + String taskDir, + String taskAppId, + int taskInstId, + String tenantCode, + String envFile, + Date startTime, + int timeout, + Logger logger) { + super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); } @@ -67,7 +72,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { */ @Override protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { - logger.info("tenant :{}, work dir:{}", tenantCode, taskDir); + logger.info("tenantCode :{}, task dir:{}", tenantCode, taskDir); if (!Files.exists(Paths.get(commandFile))) { logger.info("generate command file:{}", commandFile); @@ -80,16 +85,15 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { logger.info(sb.toString()); // write data to file - FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8); + FileUtils.writeStringToFile(new File(commandFile), + sb.toString(), + StandardCharsets.UTF_8); } } @Override protected String commandType() { - - String envPath = PropertyUtils.getString(Constants.ESCHEDULER_ENV_PATH); - - String pythonHome = getPythonHome(envPath); + String pythonHome = getPythonHome(envFile); if (StringUtils.isEmpty(pythonHome)){ return PYTHON; } @@ -108,16 +112,25 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { /** - * get python home + * get the absolute path of the Python command + * note : + * common.properties + * PYTHON_HOME configured under common.properties is Python absolute path, not PYTHON_HOME itself + * + * for example : + * your PYTHON_HOM is /opt/python3.7/ + * you must set PYTHON_HOME is /opt/python3.7/python under nder common.properties + * escheduler.env.path file. + * * @param envPath * @return */ private static String getPythonHome(String envPath){ BufferedReader br = null; - String line = null; StringBuilder sb = new StringBuilder(); try { br = new BufferedReader(new InputStreamReader(new FileInputStream(envPath))); + String line; while ((line = br.readLine()) != null){ if (line.contains(Constants.PYTHON_HOME)){ sb.append(line); @@ -128,13 +141,13 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { if (org.apache.commons.lang.StringUtils.isEmpty(result)){ return null; } - String[] arrs = result.split("="); + String[] arrs = result.split(Constants.EQUAL_SIGN); if (arrs.length == 2){ return arrs[1]; } }catch (IOException e){ - logger.error("read file failed : " + e.getMessage(),e); + logger.error("read file failure",e); }finally { try { if (br != null){ diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java index b5e803ae80..68e36b704a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java @@ -29,9 +29,7 @@ import java.util.List; import java.util.function.Consumer; /** - * command executor - * - * 进程,真正在worker服务器上执行的任务 + * shell command executor */ public class ShellCommandExecutor extends AbstractCommandExecutor { @@ -39,9 +37,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { public ShellCommandExecutor(Consumer> logHandler, - String taskDir, String taskAppId, String tenantCode, String envFile, - Date startTime, int timeout, Logger logger) { - super(logHandler,taskDir,taskAppId, tenantCode, envFile, startTime, timeout, logger); + String taskDir, + String taskAppId, + int taskInstId, + String tenantCode, + String envFile, + Date startTime, + int timeout, + Logger logger) { + super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java index 053b5bed24..0db35e8d0a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/TaskProps.java @@ -16,6 +16,7 @@ */ package cn.escheduler.server.worker.task; +import cn.escheduler.common.enums.CommandType; import cn.escheduler.common.enums.DataType; import cn.escheduler.common.enums.Direct; import cn.escheduler.common.enums.TaskTimeoutStrategy; @@ -46,6 +47,8 @@ public class TaskProps { **/ private String tenantCode; + private String taskType; + /** * task parameters **/ @@ -101,6 +104,41 @@ public class TaskProps { */ private Date scheduleTime; + /** + * command type is complement + */ + private CommandType cmdTypeIfComplement; + + + public TaskProps(){} + public TaskProps(String taskParams, + String taskDir, + Date scheduleTime, + String nodeName, + String taskType, + int taskInstId, + String envFile, + String tenantCode, + String queue, + Date taskStartTime, + Map definedParams, + String dependence, + CommandType cmdTypeIfComplement){ + this.taskParams = taskParams; + this.taskDir = taskDir; + this.scheduleTime = scheduleTime; + this.nodeName = nodeName; + this.taskType = taskType; + this.taskInstId = taskInstId; + this.envFile = envFile; + this.tenantCode = tenantCode; + this.queue = queue; + this.taskStartTime = taskStartTime; + this.definedParams = definedParams; + this.dependence = dependence; + this.cmdTypeIfComplement = cmdTypeIfComplement; + + } public String getTenantCode() { return tenantCode; @@ -200,22 +238,12 @@ public class TaskProps { this.taskTimeoutStrategy = taskTimeoutStrategy; } - /** - * get parameters map - * @return - */ - public Map getUserDefParamsMap() { - if (definedParams != null) { - Map userDefParamsMaps = new HashMap<>(); - Iterator> iter = definedParams.entrySet().iterator(); - while (iter.hasNext()){ - Map.Entry en = iter.next(); - Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue()); - userDefParamsMaps.put(property.getProp(),property); - } - return userDefParamsMaps; - } - return null; + public String getTaskType() { + return taskType; + } + + public void setTaskType(String taskType) { + this.taskType = taskType; } public String getDependence() { @@ -233,4 +261,30 @@ public class TaskProps { public void setScheduleTime(Date scheduleTime) { this.scheduleTime = scheduleTime; } + + public CommandType getCmdTypeIfComplement() { + return cmdTypeIfComplement; + } + + public void setCmdTypeIfComplement(CommandType cmdTypeIfComplement) { + this.cmdTypeIfComplement = cmdTypeIfComplement; + } + + /** + * get parameters map + * @return + */ + public Map getUserDefParamsMap() { + if (definedParams != null) { + Map userDefParamsMaps = new HashMap<>(); + Iterator> iter = definedParams.entrySet().iterator(); + while (iter.hasNext()){ + Map.Entry en = iter.next(); + Property property = new Property(en.getKey(), Direct.IN, DataType.VARCHAR , en.getValue()); + userDefParamsMaps.put(property.getProp(),property); + } + return userDefParamsMaps; + } + return null; + } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java index 1e46e3e38d..7d115add8c 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentExecute.java @@ -208,6 +208,4 @@ public class DependentExecute { return dependResultMap; } - - } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java index 21e596f55f..13f00f9264 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/dependent/DependentTask.java @@ -104,7 +104,7 @@ public class DependentTask extends AbstractTask { Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE; } }catch (Exception e){ - logger.error("Exception " + e); + logger.error(e.getMessage(),e); exitStatusCode = -1; } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java index 971ecd0b6d..9650e55e26 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/mr/MapReduceTask.java @@ -70,8 +70,8 @@ public class MapReduceTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), mapreduceParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); if (paramsMap != null){ String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); mapreduceParameters.setMainArgs(args); @@ -86,7 +86,8 @@ public class MapReduceTask extends AbstractYarnTask { protected String buildCommand() throws Exception { List parameterList = buildParameters(mapreduceParameters); - String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), taskProps.getDefinedParams()); + String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", parameterList), + taskProps.getDefinedParams()); logger.info("mapreduce task command: {}", command); return command; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java index 98428c5389..c4803d91c0 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/processdure/ProcedureTask.java @@ -21,12 +21,7 @@ import cn.escheduler.common.enums.DataType; import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.Direct; import cn.escheduler.common.enums.TaskTimeoutStrategy; -import cn.escheduler.common.job.db.BaseDataSource; -import cn.escheduler.common.job.db.ClickHouseDataSource; -import cn.escheduler.common.job.db.MySQLDataSource; -import cn.escheduler.common.job.db.OracleDataSource; -import cn.escheduler.common.job.db.PostgreDataSource; -import cn.escheduler.common.job.db.SQLServerDataSource; +import cn.escheduler.common.job.db.*; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.procedure.ProcedureParameters; @@ -49,6 +44,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import static cn.escheduler.common.enums.DataType.*; + /** * procedure task */ @@ -64,6 +61,11 @@ public class ProcedureTask extends AbstractTask { */ private ProcessDao processDao; + /** + * base datasource + */ + private BaseDataSource baseDataSource; + public ProcedureTask(TaskProps taskProps, Logger logger) { super(taskProps, logger); @@ -93,176 +95,181 @@ public class ProcedureTask extends AbstractTask { // determine whether there is a data source if (procedureParameters.getDatasource() == 0){ - logger.error("datasource is null"); - exitStatusCode = 0; - }else { + logger.error("datasource id not exists"); + exitStatusCode = -1; + return; + } - DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(),dataSource.getType(),dataSource.getNote(), - dataSource.getUserId(),dataSource.getConnectionParams()); + DataSource dataSource = processDao.findDataSourceById(procedureParameters.getDatasource()); + logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", + dataSource.getName(), + dataSource.getType(), + dataSource.getNote(), + dataSource.getUserId(), + dataSource.getConnectionParams()); + + if (dataSource == null){ + logger.error("datasource not exists"); + exitStatusCode = -1; + return; + } + Connection connection = null; + CallableStatement stmt = null; + try { + // load class + DataSourceFactory.loadClass(dataSource.getType()); + // get datasource + baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); - if (dataSource != null){ - Connection connection = null; - CallableStatement stmt = null; - try { - BaseDataSource baseDataSource = null; - - if (DbType.MYSQL.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class); - Class.forName(Constants.JDBC_MYSQL_CLASS_NAME); - }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class); - Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME); - }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ - // NOTE: currently, ClickHouse don't support procedure or UDF yet, - // but still load JDBC driver to keep source code sync with other DB - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); - Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); - }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), OracleDataSource.class); - Class.forName(Constants.JDBC_ORACLE_CLASS_NAME); - }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(), SQLServerDataSource.class); - Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME); - } + // get jdbc connection + connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), + baseDataSource.getUser(), + baseDataSource.getPassword()); - // get jdbc connection - connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), - baseDataSource.getUser(), - baseDataSource.getPassword()); - // get process instance by task instance id - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); - // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - procedureParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + procedureParameters.getLocalParametersMap(), + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); - Collection userDefParamsList = null; + Collection userDefParamsList = null; - if (procedureParameters.getLocalParametersMap() != null){ - userDefParamsList = procedureParameters.getLocalParametersMap().values(); - } + if (procedureParameters.getLocalParametersMap() != null){ + userDefParamsList = procedureParameters.getLocalParametersMap().values(); + } - String method = ""; - // no parameters - if (CollectionUtils.isEmpty(userDefParamsList)){ - method = "{call " + procedureParameters.getMethod() + "}"; - }else { // exists parameters - int size = userDefParamsList.size(); - StringBuilder parameter = new StringBuilder(); - parameter.append("("); - for (int i = 0 ;i < size - 1; i++){ - parameter.append("?,"); - } - parameter.append("?)"); - method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}"; - } + String method = ""; + // no parameters + if (CollectionUtils.isEmpty(userDefParamsList)){ + method = "{call " + procedureParameters.getMethod() + "}"; + }else { // exists parameters + int size = userDefParamsList.size(); + StringBuilder parameter = new StringBuilder(); + parameter.append("("); + for (int i = 0 ;i < size - 1; i++){ + parameter.append("?,"); + } + parameter.append("?)"); + method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}"; + } - logger.info("call method : {}",method); - // call method - stmt = connection.prepareCall(method); - if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){ - stmt.setQueryTimeout(taskProps.getTaskTimeout()); - } - Map outParameterMap = new HashMap<>(); - if (userDefParamsList != null && userDefParamsList.size() > 0){ - int index = 1; - for (Property property : userDefParamsList){ - logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" - ,property.getProp(), - property.getDirect(), - property.getType(), - property.getValue()); - // set parameters - if (property.getDirect().equals(Direct.IN)){ - ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); - }else if (property.getDirect().equals(Direct.OUT)){ - setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); - property.setValue(paramsMap.get(property.getProp()).getValue()); - outParameterMap.put(index,property); - } - index++; - } + logger.info("call method : {}",method); + // call method + stmt = connection.prepareCall(method); + if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){ + stmt.setQueryTimeout(taskProps.getTaskTimeout()); + } + Map outParameterMap = new HashMap<>(); + if (userDefParamsList != null && userDefParamsList.size() > 0){ + int index = 1; + for (Property property : userDefParamsList){ + logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}" + ,property.getProp(), + property.getDirect(), + property.getType(), + property.getValue()); + // set parameters + if (property.getDirect().equals(Direct.IN)){ + ParameterUtils.setInParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + }else if (property.getDirect().equals(Direct.OUT)){ + setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue()); + property.setValue(paramsMap.get(property.getProp()).getValue()); + outParameterMap.put(index,property); } + index++; + } + } - stmt.executeUpdate(); - - /** - * print the output parameters to the log - */ - Iterator> iter = outParameterMap.entrySet().iterator(); - while (iter.hasNext()){ - Map.Entry en = iter.next(); - - int index = en.getKey(); - Property property = en.getValue(); - String prop = property.getProp(); - DataType dataType = property.getType(); - - if (dataType.equals(DataType.VARCHAR)){ - String value = stmt.getString(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.INTEGER)){ - int value = stmt.getInt(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.LONG)){ - long value = stmt.getLong(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.FLOAT)){ - float value = stmt.getFloat(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.DOUBLE)){ - double value = stmt.getDouble(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.DATE)){ - Date value = stmt.getDate(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.TIME)){ - Time value = stmt.getTime(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.TIMESTAMP)){ - Timestamp value = stmt.getTimestamp(index); - logger.info("out prameter key : {} , value : {}",prop,value); - }else if (dataType.equals(DataType.BOOLEAN)){ - boolean value = stmt.getBoolean(index); - logger.info("out prameter key : {} , value : {}",prop,value); - } - } + stmt.executeUpdate(); + + /** + * print the output parameters to the log + */ + Iterator> iter = outParameterMap.entrySet().iterator(); + while (iter.hasNext()){ + Map.Entry en = iter.next(); + + int index = en.getKey(); + Property property = en.getValue(); + String prop = property.getProp(); + DataType dataType = property.getType(); + // get output parameter + getOutputParameter(stmt, index, prop, dataType); + } - exitStatusCode = 0; - }catch (Exception e){ - logger.error(e.getMessage(),e); + exitStatusCode = 0; + }catch (Exception e){ + logger.error(e.getMessage(),e); + exitStatusCode = -1; + throw new RuntimeException(String.format("process interrupted. exit status code is %d",exitStatusCode)); + } + finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { exitStatusCode = -1; - throw new RuntimeException("process interrupted. exit status code is : " + exitStatusCode); + logger.error(e.getMessage(),e); } - finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - exitStatusCode = -1; - logger.error(e.getMessage(),e); - } - } - if (connection != null) { - try { - connection.close(); - } catch (SQLException e) { - exitStatusCode = -1; - logger.error(e.getMessage(), e); - } - } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + exitStatusCode = -1; + logger.error(e.getMessage(), e); } } } } + /** + * get output parameter + * @param stmt + * @param index + * @param prop + * @param dataType + * @throws SQLException + */ + private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException { + switch (dataType){ + case VARCHAR: + logger.info("out prameter key : {} , value : {}",prop,stmt.getString(index)); + break; + case INTEGER: + logger.info("out prameter key : {} , value : {}", prop, stmt.getInt(index)); + break; + case LONG: + logger.info("out prameter key : {} , value : {}",prop,stmt.getLong(index)); + break; + case FLOAT: + logger.info("out prameter key : {} , value : {}",prop,stmt.getFloat(index)); + break; + case DOUBLE: + logger.info("out prameter key : {} , value : {}",prop,stmt.getDouble(index)); + break; + case DATE: + logger.info("out prameter key : {} , value : {}",prop,stmt.getDate(index)); + break; + case TIME: + logger.info("out prameter key : {} , value : {}",prop,stmt.getTime(index)); + break; + case TIMESTAMP: + logger.info("out prameter key : {} , value : {}",prop,stmt.getTimestamp(index)); + break; + case BOOLEAN: + logger.info("out prameter key : {} , value : {}",prop, stmt.getBoolean(index)); + break; + default: + break; + } + } + @Override public AbstractParameters getParameters() { return procedureParameters; @@ -277,61 +284,61 @@ public class ProcedureTask extends AbstractTask { * @throws Exception */ private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{ - if (dataType.equals(DataType.VARCHAR)){ + if (dataType.equals(VARCHAR)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.VARCHAR); }else { stmt.registerOutParameter(index, Types.VARCHAR, value); } - }else if (dataType.equals(DataType.INTEGER)){ + }else if (dataType.equals(INTEGER)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.INTEGER); }else { stmt.registerOutParameter(index, Types.INTEGER, value); } - }else if (dataType.equals(DataType.LONG)){ + }else if (dataType.equals(LONG)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index,Types.INTEGER); }else { stmt.registerOutParameter(index,Types.INTEGER ,value); } - }else if (dataType.equals(DataType.FLOAT)){ + }else if (dataType.equals(FLOAT)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.FLOAT); }else { stmt.registerOutParameter(index, Types.FLOAT,value); } - }else if (dataType.equals(DataType.DOUBLE)){ + }else if (dataType.equals(DOUBLE)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.DOUBLE); }else { stmt.registerOutParameter(index, Types.DOUBLE , value); } - }else if (dataType.equals(DataType.DATE)){ + }else if (dataType.equals(DATE)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.DATE); }else { stmt.registerOutParameter(index, Types.DATE , value); } - }else if (dataType.equals(DataType.TIME)){ + }else if (dataType.equals(TIME)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.TIME); }else { stmt.registerOutParameter(index, Types.TIME , value); } - }else if (dataType.equals(DataType.TIMESTAMP)){ + }else if (dataType.equals(TIMESTAMP)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.TIMESTAMP); }else { stmt.registerOutParameter(index, Types.TIMESTAMP , value); } - }else if (dataType.equals(DataType.BOOLEAN)){ + }else if (dataType.equals(BOOLEAN)){ if (StringUtils.isEmpty(value)){ stmt.registerOutParameter(index, Types.BOOLEAN); }else { diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java index c446215a38..cb2bf7d27e 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/python/PythonTask.java @@ -20,27 +20,18 @@ package cn.escheduler.server.worker.task.python; import cn.escheduler.common.process.Property; import cn.escheduler.common.task.AbstractParameters; import cn.escheduler.common.task.python.PythonParameters; -import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ProcessDao; -import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.PythonCommandExecutor; import cn.escheduler.server.worker.task.TaskProps; import org.slf4j.Logger; -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; + import java.util.Map; -import java.util.Set; /** * python task @@ -57,7 +48,10 @@ public class PythonTask extends AbstractTask { */ private String taskDir; - private PythonCommandExecutor pythonProcessTask; + /** + * python command executor + */ + private PythonCommandExecutor pythonCommandExecutor; /** * process database access @@ -70,10 +64,15 @@ public class PythonTask extends AbstractTask { this.taskDir = taskProps.getTaskDir(); - this.pythonProcessTask = new PythonCommandExecutor(this::logHandle, - taskProps.getTaskDir(), taskProps.getTaskAppId(), - taskProps.getTenantCode(), null, taskProps.getTaskStartTime(), - taskProps.getTaskTimeout(), logger); + this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, + taskProps.getTaskDir(), + taskProps.getTaskAppId(), + taskProps.getTaskInstId(), + taskProps.getTenantCode(), + taskProps.getEnvFile(), + taskProps.getTaskStartTime(), + taskProps.getTaskTimeout(), + logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @@ -92,9 +91,9 @@ public class PythonTask extends AbstractTask { public void handle() throws Exception { try { // construct process - exitStatusCode = pythonProcessTask.run(buildCommand(), processDao); + exitStatusCode = pythonCommandExecutor.run(buildCommand(), processDao); } catch (Exception e) { - logger.error("python process exception", e); + logger.error("python task failure", e); exitStatusCode = -1; } } @@ -102,7 +101,7 @@ public class PythonTask extends AbstractTask { @Override public void cancelApplication(boolean cancelApplication) throws Exception { // cancel process - pythonProcessTask.cancelApplication(); + pythonCommandExecutor.cancelApplication(); } /** @@ -111,21 +110,7 @@ public class PythonTask extends AbstractTask { * @throws Exception */ private String buildCommand() throws Exception { - // generate scripts -// String fileName = String.format("%s/py_%s_node.py", taskDir, taskProps.getTaskAppId()); -// Path path = new File(fileName).toPath(); - - - -// if (Files.exists(path)) { -// return fileName; -// } - - String rawScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); - - - // find process instance by task id - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); /** * combining local and global parameters @@ -133,27 +118,16 @@ public class PythonTask extends AbstractTask { Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), pythonParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); if (paramsMap != null){ - rawScript = ParameterUtils.convertParameterPlaceholders(rawScript, ParamUtils.convert(paramsMap)); + rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap)); } - -// pythonParameters.setRawScript(rawScript); - - logger.info("raw script : {}", pythonParameters.getRawScript()); + logger.info("raw python script : {}", pythonParameters.getRawScript()); logger.info("task dir : {}", taskDir); -// Set perms = PosixFilePermissions.fromString("rwxr-xr-x"); -// FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); -// -// Files.createFile(path, attr); -// -// Files.write(path, pythonParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); -// -// return fileName; - return rawScript; + return rawPythonScript; } @Override diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java index b8564e8f95..12ff021d03 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/shell/ShellTask.java @@ -54,7 +54,7 @@ public class ShellTask extends AbstractTask { */ private String taskDir; - private ShellCommandExecutor processTask; + private ShellCommandExecutor shellCommandExecutor; /** * process database access @@ -62,15 +62,19 @@ public class ShellTask extends AbstractTask { private ProcessDao processDao; - public ShellTask(TaskProps props, Logger logger) { - super(props, logger); + public ShellTask(TaskProps taskProps, Logger logger) { + super(taskProps, logger); - this.taskDir = props.getTaskDir(); + this.taskDir = taskProps.getTaskDir(); - this.processTask = new ShellCommandExecutor(this::logHandle, - props.getTaskDir(), props.getTaskAppId(), - props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), - props.getTaskTimeout(), logger); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), + taskProps.getTaskAppId(), + taskProps.getTaskInstId(), + taskProps.getTenantCode(), + taskProps.getEnvFile(), + taskProps.getTaskStartTime(), + taskProps.getTaskTimeout(), + logger); this.processDao = DaoFactory.getDaoInstance(ProcessDao.class); } @@ -89,9 +93,9 @@ public class ShellTask extends AbstractTask { public void handle() throws Exception { try { // construct process - exitStatusCode = processTask.run(buildCommand(), processDao); + exitStatusCode = shellCommandExecutor.run(buildCommand(), processDao); } catch (Exception e) { - logger.error(e.getMessage(), e); + logger.error("shell task failure", e); exitStatusCode = -1; } } @@ -99,7 +103,7 @@ public class ShellTask extends AbstractTask { @Override public void cancelApplication(boolean cancelApplication) throws Exception { // cancel process - processTask.cancelApplication(); + shellCommandExecutor.cancelApplication(); } /** @@ -118,8 +122,6 @@ public class ShellTask extends AbstractTask { String script = shellParameters.getRawScript().replaceAll("\\r\\n", "\n"); - // find process instance by task id - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); /** * combining local and global parameters @@ -127,8 +129,8 @@ public class ShellTask extends AbstractTask { Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), shellParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); if (paramsMap != null){ script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java index 4764a96a0c..d4acf70b2c 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/spark/SparkTask.java @@ -66,8 +66,6 @@ public class SparkTask extends AbstractYarnTask { if (StringUtils.isNotEmpty(sparkParameters.getMainArgs())) { String args = sparkParameters.getMainArgs(); - // get process instance by task instance id - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); /** * combining local and global parameters @@ -75,8 +73,8 @@ public class SparkTask extends AbstractYarnTask { Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), sparkParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); if (paramsMap != null ){ args = ParameterUtils.convertParameterPlaceholders(args, ParamUtils.convert(paramsMap)); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 09f6467aad..d9a8274520 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -18,7 +18,6 @@ package cn.escheduler.server.worker.task.sql; import cn.escheduler.alert.utils.MailUtils; import cn.escheduler.common.Constants; -import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.ShowType; import cn.escheduler.common.enums.TaskTimeoutStrategy; import cn.escheduler.common.enums.UdfType; @@ -44,8 +43,6 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.EnumUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import java.sql.*; @@ -54,7 +51,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static cn.escheduler.common.utils.PropertyUtils.getString; +import static cn.escheduler.common.Constants.*; +import static cn.escheduler.common.enums.DbType.*; /** * sql task @@ -76,12 +74,22 @@ public class SqlTask extends AbstractTask { */ private AlertDao alertDao; + /** + * datasource + */ + private DataSource dataSource; + + /** + * base datasource + */ + private BaseDataSource baseDataSource; + - public SqlTask(TaskProps props, Logger logger) { - super(props, logger); + public SqlTask(TaskProps taskProps, Logger logger) { + super(taskProps, logger); logger.info("sql task params {}", taskProps.getTaskParams()); - this.sqlParameters = JSONObject.parseObject(props.getTaskParams(), SqlParameters.class); + this.sqlParameters = JSONObject.parseObject(taskProps.getTaskParams(), SqlParameters.class); if (!sqlParameters.checkParameters()) { throw new RuntimeException("sql task params is not valid"); @@ -97,75 +105,73 @@ public class SqlTask extends AbstractTask { Thread.currentThread().setName(threadLoggerInfoName); logger.info(sqlParameters.toString()); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}", - sqlParameters.getType(), sqlParameters.getDatasource(), sqlParameters.getSql(), - sqlParameters.getLocalParams(), sqlParameters.getUdfs(), sqlParameters.getShowType(), sqlParameters.getConnParams()); - - // determine whether there is a data source + sqlParameters.getType(), + sqlParameters.getDatasource(), + sqlParameters.getSql(), + sqlParameters.getLocalParams(), + sqlParameters.getUdfs(), + sqlParameters.getShowType(), + sqlParameters.getConnParams()); + + // not set data source if (sqlParameters.getDatasource() == 0){ - logger.error("datasource is null"); + logger.error("datasource id not exists"); exitStatusCode = -1; - }else { - List createFuncs = null; - DataSource dataSource = processDao.findDataSourceById(sqlParameters.getDatasource()); - logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", - dataSource.getName(),dataSource.getType(),dataSource.getNote(), - dataSource.getUserId(),dataSource.getConnectionParams()); - - if (dataSource != null){ - Connection con = null; - try { - BaseDataSource baseDataSource = null; - if (DbType.MYSQL.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),MySQLDataSource.class); - Class.forName(Constants.JDBC_MYSQL_CLASS_NAME); - }else if (DbType.POSTGRESQL.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),PostgreDataSource.class); - Class.forName(Constants.JDBC_POSTGRESQL_CLASS_NAME); - }else if (DbType.HIVE.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),HiveDataSource.class); - Class.forName(Constants.JDBC_HIVE_CLASS_NAME); - }else if (DbType.SPARK.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SparkDataSource.class); - Class.forName(Constants.JDBC_SPARK_CLASS_NAME); - }else if (DbType.CLICKHOUSE.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),ClickHouseDataSource.class); - Class.forName(Constants.JDBC_CLICKHOUSE_CLASS_NAME); - }else if (DbType.ORACLE.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),OracleDataSource.class); - Class.forName(Constants.JDBC_ORACLE_CLASS_NAME); - }else if (DbType.SQLSERVER.name().equals(dataSource.getType().name())){ - baseDataSource = JSONObject.parseObject(dataSource.getConnectionParams(),SQLServerDataSource.class); - Class.forName(Constants.JDBC_SQLSERVER_CLASS_NAME); - } + return; + } + dataSource= processDao.findDataSourceById(sqlParameters.getDatasource()); + logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}", + dataSource.getName(), + dataSource.getType(), + dataSource.getNote(), + dataSource.getUserId(), + dataSource.getConnectionParams()); - // ready to execute SQL and parameter entity Map - SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); - List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()).orElse(new ArrayList<>()) - .stream() - .map(this::getSqlAndSqlParamsMap) - .collect(Collectors.toList()); - List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()).orElse(new ArrayList<>()) - .stream() - .map(this::getSqlAndSqlParamsMap) - .collect(Collectors.toList()); - - if(EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) && StringUtils.isNotEmpty(sqlParameters.getUdfs())){ - List udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs()); - createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger); - } + if (dataSource == null){ + logger.error("datasource not exists"); + exitStatusCode = -1; + return; + } - // execute sql task - con = executeFuncAndSql(baseDataSource, mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + Connection con = null; + List createFuncs = null; + try { + // load class + DataSourceFactory.loadClass(dataSource.getType()); + // get datasource + baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + + // ready to execute SQL and parameter entity Map + SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql()); + List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()) + .orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + List postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPostStatements()) + .orElse(new ArrayList<>()) + .stream() + .map(this::getSqlAndSqlParamsMap) + .collect(Collectors.toList()); + + // determine if it is UDF + boolean udfTypeFlag = EnumUtils.isValidEnum(UdfType.class, sqlParameters.getType()) + && StringUtils.isNotEmpty(sqlParameters.getUdfs()); + if(udfTypeFlag){ + List udfFuncList = processDao.queryUdfFunListByids(sqlParameters.getUdfs()); + createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger); + } - } finally { - if (con != null) { - try { - con.close(); - } catch (SQLException e) { - throw e; - } - } + // execute sql task + con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); + } finally { + if (con != null) { + try { + con.close(); + } catch (SQLException e) { + throw e; } } } @@ -180,13 +186,13 @@ public class SqlTask extends AbstractTask { StringBuilder sqlBuilder = new StringBuilder(); // find process instance by task id - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), taskProps.getDefinedParams(), sqlParameters.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); // spell SQL according to the final user-defined variable if(paramsMap == null){ @@ -195,14 +201,15 @@ public class SqlTask extends AbstractTask { } if (StringUtils.isNotEmpty(sqlParameters.getTitle())){ - String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), ParamUtils.convert(paramsMap)); - logger.info(title); + String title = ParameterUtils.convertParameterPlaceholders(sqlParameters.getTitle(), + ParamUtils.convert(paramsMap)); + logger.info("SQL tile : {}",title); sqlParameters.setTitle(title); } // special characters need to be escaped, ${} needs to be escaped String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; - setSqlParamsMap(sql,rgex,sqlParamsMap,paramsMap); + setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap); // replace the ${} of the SQL statement with the Placeholder String formatSql = sql.replaceAll(rgex,"?"); @@ -219,47 +226,45 @@ public class SqlTask extends AbstractTask { } /** - * execute sql - * @param baseDataSource + * execute sql * @param mainSqlBinds * @param preStatementsBinds * @param postStatementsBinds * @param createFuncs + * @return */ - public Connection executeFuncAndSql(BaseDataSource baseDataSource, - SqlBinds mainSqlBinds, + public Connection executeFuncAndSql(SqlBinds mainSqlBinds, List preStatementsBinds, List postStatementsBinds, List createFuncs){ Connection connection = null; try { - if (CommonUtils.getKerberosStartupState()) { - System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, - getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); - Configuration configuration = new Configuration(); - configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); - UserGroupInformation.setConfiguration(configuration); - UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME), - getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH)); - } - if (DbType.HIVE.name().equals(sqlParameters.getType())) { + // if upload resource is HDFS and kerberos startup + CommonUtils.loadKerberosConf(); + + // if hive , load connection params if exists + if (HIVE == dataSource.getType()) { Properties paramProp = new Properties(); - paramProp.setProperty("user", baseDataSource.getUser()); - paramProp.setProperty("password", baseDataSource.getPassword()); - Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), Constants.SEMICOLON,"hiveconf:"); + paramProp.setProperty(USER, baseDataSource.getUser()); + paramProp.setProperty(PASSWORD, baseDataSource.getPassword()); + Map connParamMap = CollectionUtils.stringToMap(sqlParameters.getConnParams(), + SEMICOLON, + HIVE_CONF); if(connParamMap != null){ paramProp.putAll(connParamMap); } - connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),paramProp); + connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), + paramProp); }else{ connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), - baseDataSource.getUser(), baseDataSource.getPassword()); + baseDataSource.getUser(), + baseDataSource.getPassword()); } // create temp function if (CollectionUtils.isNotEmpty(createFuncs)) { - try (Statement funcStmt = connection.createStatement()) { + try (Statement funcStmt = connection.createStatement()) { for (String createFunc : createFuncs) { logger.info("hive create function sql: {}", createFunc); funcStmt.execute(createFunc); @@ -270,7 +275,7 @@ public class SqlTask extends AbstractTask { for (SqlBinds sqlBind: preStatementsBinds) { try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { int result = stmt.executeUpdate(); - logger.info("pre statement execute result: " + result + ", for sql: " + sqlBind.getSql()); + logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql()); } } @@ -278,7 +283,7 @@ public class SqlTask extends AbstractTask { // decide whether to executeQuery or executeUpdate based on sqlType if (sqlParameters.getSqlType() == SqlType.QUERY.ordinal()) { // query statements need to be convert to JsonArray and inserted into Alert to send - JSONArray array = new JSONArray(); + JSONArray resultJSONArray = new JSONArray(); ResultSet resultSet = stmt.executeQuery(); ResultSetMetaData md = resultSet.getMetaData(); int num = md.getColumnCount(); @@ -288,21 +293,19 @@ public class SqlTask extends AbstractTask { for (int i = 1; i <= num; i++) { mapOfColValues.put(md.getColumnName(i), resultSet.getObject(i)); } - array.add(mapOfColValues); + resultJSONArray.add(mapOfColValues); } - logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); - - // send as an attachment - if (StringUtils.isEmpty(sqlParameters.getShowType())) { - logger.info("showType is empty,don't need send email"); - } else { - if (array.size() > 0) { - if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { - sendAttachment(sqlParameters.getTitle(), JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); - }else{ - sendAttachment(taskProps.getNodeName() + " query resultsets ", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); - } + logger.debug("execute sql : {}", JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); + + // if there is a result set + if (resultJSONArray.size() > 0) { + if (StringUtils.isNotEmpty(sqlParameters.getTitle())) { + sendAttachment(sqlParameters.getTitle(), + JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); + }else{ + sendAttachment(taskProps.getNodeName() + " query resultsets ", + JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); } } @@ -310,7 +313,7 @@ public class SqlTask extends AbstractTask { } else if (sqlParameters.getSqlType() == SqlType.NON_QUERY.ordinal()) { // non query statement - int result = stmt.executeUpdate(); + stmt.executeUpdate(); exitStatusCode = 0; } } @@ -318,7 +321,7 @@ public class SqlTask extends AbstractTask { for (SqlBinds sqlBind: postStatementsBinds) { try (PreparedStatement stmt = prepareStatementAndBind(connection, sqlBind)) { int result = stmt.executeUpdate(); - logger.info("post statement execute result: " + result + ", for sql: " + sqlBind.getSql()); + logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql()); } } } catch (Exception e) { @@ -328,9 +331,19 @@ public class SqlTask extends AbstractTask { return connection; } + /** + * preparedStatement bind + * @param connection + * @param sqlBinds + * @return + * @throws Exception + */ private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception { PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql()); - if(taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED){ + // is the timeout set + boolean timeoutFlag = taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.FAILED || + taskProps.getTaskTimeoutStrategy() == TaskTimeoutStrategy.WARNFAILED; + if(timeoutFlag){ stmt.setQueryTimeout(taskProps.getTaskTimeout()); } Map params = sqlBinds.getParamsMap(); @@ -340,7 +353,7 @@ public class SqlTask extends AbstractTask { ParameterUtils.setInParameter(key,stmt,prop.getType(),prop.getValue()); } } - logger.info("prepare statement replace sql:{}",stmt.toString()); + logger.info("prepare statement replace sql : {} ",stmt.toString()); return stmt; } @@ -354,9 +367,6 @@ public class SqlTask extends AbstractTask { // process instance ProcessInstance instance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); - // process define - ProcessDefinition processDefine = processDao.findProcessDefineById(instance.getProcessDefinitionId()); - List users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId()); // receiving group list @@ -367,7 +377,7 @@ public class SqlTask extends AbstractTask { // custom receiver String receivers = sqlParameters.getReceivers(); if (StringUtils.isNotEmpty(receivers)){ - String[] splits = receivers.split(Constants.COMMA); + String[] splits = receivers.split(COMMA); for (String receiver : splits){ receviersList.add(receiver.trim()); } @@ -378,16 +388,17 @@ public class SqlTask extends AbstractTask { // Custom Copier String receiversCc = sqlParameters.getReceiversCc(); if (StringUtils.isNotEmpty(receiversCc)){ - String[] splits = receiversCc.split(Constants.COMMA); + String[] splits = receiversCc.split(COMMA); for (String receiverCc : splits){ receviersCcList.add(receiverCc.trim()); } } - String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); + String showTypeName = sqlParameters.getShowType().replace(COMMA,"").trim(); if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ - Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName)); - if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){ + Map mailResult = MailUtils.sendMails(receviersList, + receviersCcList, title, content, ShowType.valueOf(showTypeName)); + if(!(Boolean) mailResult.get(STATUS)){ throw new RuntimeException("send mail failed!"); } }else{ @@ -425,7 +436,7 @@ public class SqlTask extends AbstractTask { public void printReplacedSql(String content, String formatSql,String rgex, Map sqlParamsMap){ //parameter print style logger.info("after replace sql , preparing : {}" , formatSql); - StringBuffer logPrint = new StringBuffer("replaced sql , parameters:"); + StringBuilder logPrint = new StringBuilder("replaced sql , parameters:"); for(int i=1;i<=sqlParamsMap.size();i++){ logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")"); }