From 9c9c940f4858782700e79e587009ec8497bab1d7 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Tue, 2 Jul 2019 11:10:44 +0800 Subject: [PATCH 1/8] sql task add kerberos auth --- .../api/controller/DataSourceController.java | 3 ++- .../api/service/DataSourceService.java | 5 +++-- .../cn/escheduler/api/utils/CheckUtils.java | 12 ------------ .../cn/escheduler/common/utils/CommonUtils.java | 11 +++++++++++ .../server/worker/task/sql/SqlTask.java | 17 +++++++++++++++-- 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java index 51e0c93bd6..7301f419cc 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java @@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.ResUploadType; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.dao.model.User; @@ -455,7 +456,7 @@ public class DataSourceController extends BaseController { logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName()); try{ // if upload resource is HDFS and kerberos startup is true , else false - return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState()); + return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState()); }catch (Exception e){ logger.error(KERBEROS_STARTUP_STATE.getMsg(),e); return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg()); } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index 02164f971b..b11e34913f 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.UserType; import cn.escheduler.common.job.db.*; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.dao.mapper.DataSourceMapper; import cn.escheduler.dao.mapper.DatasourceUserMapper; @@ -374,7 +375,7 @@ public class DataSourceService extends BaseService{ break; case HIVE: case SPARK: - if (CheckUtils.getKerberosStartupState()) { + if (CommonUtils.getKerberosStartupState()) { System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); Configuration configuration = new Configuration(); @@ -470,7 +471,7 @@ public class DataSourceService extends BaseService{ String address = buildAddress(type, host, port); String jdbcUrl = address + "/" + database; - if (CheckUtils.getKerberosStartupState() && + if (CommonUtils.getKerberosStartupState() && (type == DbType.HIVE || type == DbType.SPARK)){ jdbcUrl += ";principal=" + principal; } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java index 
f6330b79de..00c50f8263 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java @@ -160,16 +160,4 @@ public class CheckUtils { return pattern.matcher(str).matches(); } - - /** - * if upload resource is HDFS and kerberos startup is true , else false - * @return - */ - public static boolean getKerberosStartupState(){ - String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE); - ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); - Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); - return resUploadType == ResUploadType.HDFS && kerberosStartupState; - } - } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java index d0164791d2..43087fbd9c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java @@ -17,6 +17,7 @@ package cn.escheduler.common.utils; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ResUploadType; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,4 +64,14 @@ public class CommonUtils { + /** + * if upload resource is HDFS and kerberos startup is true , else false + * @return + */ + public static boolean getKerberosStartupState(){ + String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE); + ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); + Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); + return resUploadType == ResUploadType.HDFS && kerberosStartupState; + } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index dd10d05ddf..26d682f132 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds; import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.task.sql.SqlType; import cn.escheduler.common.utils.CollectionUtils; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; @@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.EnumUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import java.sql.*; @@ -51,6 +54,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static cn.escheduler.common.utils.PropertyUtils.getString; + /** * sql task */ @@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask { List createFuncs){ Connection connection = null; try { - + if (CommonUtils.getKerberosStartupState()) { + 
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, + getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); + Configuration configuration = new Configuration(); + configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(configuration); + UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME), + getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH)); + } if (DbType.HIVE.name().equals(sqlParameters.getType())) { Properties paramProp = new Properties(); paramProp.setProperty("user", baseDataSource.getUser()); @@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask { array.add(mapOfColValues); } - logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); + logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); // send as an attachment if (StringUtils.isEmpty(sqlParameters.getShowType())) { From 9429e24a122547eb38be18b147e70f45389cfcdb Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Tue, 2 Jul 2019 15:57:03 +0800 Subject: [PATCH 2/8] Added the ability for administrators to view all data sources --- .../api/service/DataSourceService.java | 9 ++++++++- .../dao/mapper/DataSourceMapper.java | 13 +++++++++++++ .../dao/mapper/DataSourceMapperProvider.java | 18 ++++++++++++++++-- 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index 02164f971b..80491f8f32 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -324,7 +324,14 @@ public class DataSourceService extends BaseService{ */ public Map queryDataSourceList(User loginUser, Integer type) { Map result = new HashMap<>(5); - List datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type); + + List datasourceList; + + if (isAdmin(loginUser)) { + datasourceList = dataSourceMapper.listAllDataSourceByType(); + }else{ + datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type); + } result.put(Constants.DATA_LIST, datasourceList); putMsg(result, Status.SUCCESS); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java index 149d15e662..acedde653d 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java @@ -216,4 +216,17 @@ public interface DataSourceMapper { @SelectProvider(type = DataSourceMapperProvider.class, method = "queryDatasourceExceptUserId") List queryDatasourceExceptUserId(@Param("userId") int userId); + @Results(value = { + @Result(property = "id", column = "id", id = true, javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "name", column = "name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "note", column = "note", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "type", column = "type", typeHandler = EnumOrdinalTypeHandler.class, javaType = DbType.class, jdbcType = JdbcType.INTEGER), + @Result(property = 
"userId", column = "user_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "connectionParams", column = "connection_params", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) + }) + @SelectProvider(type = DataSourceMapperProvider.class, method = "listAllDataSourceByType") + List listAllDataSourceByType(); + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java index 73228057c2..61461ff1c1 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java @@ -175,8 +175,7 @@ public class DataSourceMapperProvider { } /** - * 查询总的数据源数目 - * + * Query the total number of data sources * @param parameter * @return */ @@ -228,4 +227,19 @@ public class DataSourceMapperProvider { WHERE("user_id <> #{userId}"); }}.toString(); } + + + /** + * list all data source by type + * + * @param parameter + * @return + */ + public String listAllDataSourceByType(Map parameter) { + return new SQL() {{ + SELECT("*"); + FROM(TABLE_NAME); + }}.toString(); + } + } From f21dd49ded83b002ba62c86a3159d081d106db7a Mon Sep 17 00:00:00 2001 From: lenboo Date: Tue, 2 Jul 2019 16:01:53 +0800 Subject: [PATCH 3/8] update tenant id --- .../java/cn/escheduler/dao/ProcessDao.java | 18 ++- .../mapper/ProcessInstanceMapperProvider.java | 6 +- .../server/worker/runner/FetchTaskThread.java | 6 - .../worker/runner/TaskScheduleThread.java | 144 ++++++++++-------- 4 files changed, 94 insertions(+), 80 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 2aa59c7013..17ca727340 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao { // find pause tasks and init task's state cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); + List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), + ExecutionStatus.KILL); + suspendedNodeList.addAll(stopNodeList); for(Integer taskId : suspendedNodeList){ // 把暂停状态初始化 initTaskInstance(this.findTaskInstanceById(taskId)); @@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao { * @param taskInstance */ private void initTaskInstance(TaskInstance taskInstance){ - if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){ - taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); - }else{ - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateTaskInstance(taskInstance); + + if(!taskInstance.isSubProcess()){ + if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){ + taskInstance.setFlag(Flag.NO); + updateTaskInstance(taskInstance); + return; + } } + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + updateTaskInstance(taskInstance); } /** diff --git 
a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index 1dc3bbc99b..78165e3f9a 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider { public String queryDetailById(Map parameter) { return new SQL() { { - SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); + SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); - FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q"); + FROM(TABLE_NAME + " inst"); - WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}"); + WHERE("inst.id = #{processId}"); } }.toString(); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index e741966f06..1c328b6cba 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{ // get process define ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), - processDefine.getUserId()); - - if(tenant != null){ - processInstance.setTenantCode(tenant.getTenantCode()); - } taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processDefine); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index c21a3a2f1b..591712ba7d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; +import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.Tenant; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; @@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable { // set task params taskProps.setTaskParams(taskNode.getParams()); // set tenant code , execute task linux user - taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setNodeName(taskInstance.getName()); taskProps.setTaskInstId(taskInstance.getId()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); - // set queue - if (StringUtils.isEmpty(queue)){ - 
taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); - }else { - taskProps.setQueue(queue); - } - taskProps.setTaskStartTime(taskInstance.getStartTime()); - taskProps.setDefinedParams(allParamMap); - - // set task timeout - setTaskTimeout(taskProps, taskNode); - - taskProps.setDependence(taskInstance.getDependency()); - - taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); - - // job init - task.init(); - - // job handle - task.handle(); - - - logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); - - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ - status = ExecutionStatus.SUCCESS; - // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - - AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); - - // replace placeholder - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - params.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); - if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")){ - String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)){ - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + + ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + + Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), + processDefine.getUserId()); + + if(tenant == null){ + logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}", + processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId() + ); + status = ExecutionStatus.FAILURE; + }else{ + taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); + // set queue + if (StringUtils.isEmpty(queue)){ + taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + }else { + taskProps.setQueue(tenant.getQueueName()); + } + taskProps.setTaskStartTime(taskInstance.getStartTime()); + taskProps.setDefinedParams(allParamMap); + + // set task timeout + setTaskTimeout(taskProps, taskNode); + + taskProps.setDependence(taskInstance.getDependency()); + + taskProps.setTaskAppId(String.format("%s_%s_%s", + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + // custom logger + TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + task = 
TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); + + // job init + task.init(); + + // job handle + task.handle(); + logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); + + if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + status = ExecutionStatus.SUCCESS; + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ + + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } } } } - } - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ - status = ExecutionStatus.KILL; - }else { - status = ExecutionStatus.FAILURE; + }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ + status = ExecutionStatus.KILL; + }else { + status = ExecutionStatus.FAILURE; + } } }catch (Exception e){ logger.error("task escheduler failure : " + e.getMessage(),e); From b8a9ef55de874ff8a2897cef6c0069fa6b2fd588 Mon Sep 17 00:00:00 2001 From: lenboo Date: Tue, 2 Jul 2019 16:13:02 +0800 Subject: [PATCH 4/8] update tenant in ui --- .../cn/escheduler/server/worker/runner/TaskScheduleThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index 591712ba7d..89226bf8b5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -182,7 +182,7 @@ public class TaskScheduleThread implements Callable { ); status = ExecutionStatus.FAILURE; }else{ - taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); + taskProps.setTenantCode(tenant.getTenantCode()); String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); // set queue if (StringUtils.isEmpty(queue)){ From 49d0690a7d8fe283eb9456f771c2ef26120e94be Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Tue, 2 Jul 2019 16:29:02 +0800 Subject: [PATCH 5/8] Added the ability for administrators to view all data sources by type --- .../main/java/cn/escheduler/api/service/DataSourceService.java | 2 +- .../main/java/cn/escheduler/dao/mapper/DataSourceMapper.java | 2 +- .../java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index 80491f8f32..cb44c5751a 100644 --- 
a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -328,7 +328,7 @@ public class DataSourceService extends BaseService{ List datasourceList; if (isAdmin(loginUser)) { - datasourceList = dataSourceMapper.listAllDataSourceByType(); + datasourceList = dataSourceMapper.listAllDataSourceByType(type); }else{ datasourceList = dataSourceMapper.queryDataSourceByType(loginUser.getId(), type); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java index acedde653d..66a7b61bbc 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapper.java @@ -227,6 +227,6 @@ public interface DataSourceMapper { @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) }) @SelectProvider(type = DataSourceMapperProvider.class, method = "listAllDataSourceByType") - List listAllDataSourceByType(); + List listAllDataSourceByType(@Param("type") Integer type); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java index 61461ff1c1..7613e555c1 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/DataSourceMapperProvider.java @@ -239,6 +239,7 @@ public class DataSourceMapperProvider { return new SQL() {{ SELECT("*"); FROM(TABLE_NAME); + WHERE("type = #{type}"); }}.toString(); } From 46bf965f8191b4eb6557c4c4e1d1c88653ea33cf Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Tue, 2 Jul 2019 18:26:14 +0800 Subject: [PATCH 6/8] update --- .../cn/escheduler/server/worker/runner/FetchTaskThread.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 1c328b6cba..1c6232bc9a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -211,9 +211,12 @@ public class FetchTaskThread implements Runnable{ // set task execute path taskInstance.setExecutePath(execLocalPath); + Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), + processDefine.getUserId()); + // check and create Linux users FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, - processInstance.getTenantCode(), logger); + tenant.getTenantCode(), logger); logger.info("task : {} ready to submit to task scheduler thread",taskId); // submit task From c9c91a45e35d2a932ff9d46c29215abfc7da1ef3 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Tue, 2 Jul 2019 20:02:34 +0800 Subject: [PATCH 7/8] remove double quotes --- escheduler-common/src/main/resources/common/common.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/escheduler-common/src/main/resources/common/common.properties b/escheduler-common/src/main/resources/common/common.properties index 874d3d0f0b..27d525f8f5 100644 --- a/escheduler-common/src/main/resources/common/common.properties +++ 
b/escheduler-common/src/main/resources/common/common.properties @@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false java.security.krb5.conf.path=/opt/krb5.conf # loginUserFromKeytab user -login.user.keytab.username="hdfs-mycluster@ESZ.COM" +login.user.keytab.username=hdfs-mycluster@ESZ.COM # loginUserFromKeytab path -login.user.keytab.path="/opt/hdfs.headless.keytab" +login.user.keytab.path=/opt/hdfs.headless.keytab # system env path. self configuration, please make sure the directory and file exists and have read write execute permissions escheduler.env.path=/opt/.escheduler_env.sh From b495bf22d02a3c137f37d448aa4333f1d90fda84 Mon Sep 17 00:00:00 2001 From: qiaozhanwei <825193156@qq.com> Date: Wed, 3 Jul 2019 10:38:56 +0800 Subject: [PATCH 8/8] remove logback.xml --- escheduler-api/src/main/resources/logback.xml | 42 ------------------- 1 file changed, 42 deletions(-) delete mode 100644 escheduler-api/src/main/resources/logback.xml diff --git a/escheduler-api/src/main/resources/logback.xml b/escheduler-api/src/main/resources/logback.xml deleted file mode 100644 index 2e27d70ef3..0000000000 --- a/escheduler-api/src/main/resources/logback.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - INFO - - ${log.base}/escheduler-api-server.log - - ${log.base}/escheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log - 168 - 64MB - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - - \ No newline at end of file