diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java index 51e0c93bd6..7301f419cc 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java @@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.ResUploadType; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.dao.model.User; @@ -455,7 +456,7 @@ public class DataSourceController extends BaseController { logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName()); try{ // if upload resource is HDFS and kerberos startup is true , else false - return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState()); + return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState()); }catch (Exception e){ logger.error(KERBEROS_STARTUP_STATE.getMsg(),e); return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg()); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java index cb44c5751a..e7f90c5ce5 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java @@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType; import cn.escheduler.common.enums.ResUploadType; import cn.escheduler.common.enums.UserType; import cn.escheduler.common.job.db.*; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.PropertyUtils; import cn.escheduler.dao.mapper.DataSourceMapper; import cn.escheduler.dao.mapper.DatasourceUserMapper; @@ -381,7 +382,7 @@ public class DataSourceService extends BaseService{ break; case HIVE: case SPARK: - if (CheckUtils.getKerberosStartupState()) { + if (CommonUtils.getKerberosStartupState()) { System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); Configuration configuration = new Configuration(); @@ -477,7 +478,7 @@ public class DataSourceService extends BaseService{ String address = buildAddress(type, host, port); String jdbcUrl = address + "/" + database; - if (CheckUtils.getKerberosStartupState() && + if (CommonUtils.getKerberosStartupState() && (type == DbType.HIVE || type == DbType.SPARK)){ jdbcUrl += ";principal=" + principal; } diff --git a/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java b/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java index f6330b79de..00c50f8263 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java @@ -160,16 +160,4 @@ public class CheckUtils { return pattern.matcher(str).matches(); } - - /** - * if upload resource is HDFS and kerberos startup is true , else false - * @return - */ - public static boolean getKerberosStartupState(){ - String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE); - ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); - Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); - return resUploadType == ResUploadType.HDFS && kerberosStartupState; - } - } diff --git a/escheduler-api/src/main/resources/logback.xml b/escheduler-api/src/main/resources/logback.xml deleted file mode 100644 index 2e27d70ef3..0000000000 --- a/escheduler-api/src/main/resources/logback.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - - - - - - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - INFO - - ${log.base}/escheduler-api-server.log - - ${log.base}/escheduler-api-server.%d{yyyy-MM-dd_HH}.%i.log - 168 - 64MB - - - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - - - - - - - \ No newline at end of file diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java index d0164791d2..43087fbd9c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java @@ -17,6 +17,7 @@ package cn.escheduler.common.utils; import cn.escheduler.common.Constants; +import cn.escheduler.common.enums.ResUploadType; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,4 +64,14 @@ public class CommonUtils { + /** + * if upload resource is HDFS and kerberos startup is true , else false + * @return + */ + public static boolean getKerberosStartupState(){ + String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE); + ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); + Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE); + return resUploadType == ResUploadType.HDFS && kerberosStartupState; + } } diff --git a/escheduler-common/src/main/resources/common/common.properties b/escheduler-common/src/main/resources/common/common.properties index 874d3d0f0b..27d525f8f5 100644 --- a/escheduler-common/src/main/resources/common/common.properties +++ b/escheduler-common/src/main/resources/common/common.properties @@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false java.security.krb5.conf.path=/opt/krb5.conf # loginUserFromKeytab user -login.user.keytab.username="hdfs-mycluster@ESZ.COM" +login.user.keytab.username=hdfs-mycluster@ESZ.COM # loginUserFromKeytab path -login.user.keytab.path="/opt/hdfs.headless.keytab" +login.user.keytab.path=/opt/hdfs.headless.keytab # system env path. self configuration, please make sure the directory and file exists and have read write execute permissions escheduler.env.path=/opt/.escheduler_env.sh diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 2aa59c7013..17ca727340 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao { // find pause tasks and init task's state cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); List suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); + List stopNodeList = findTaskIdByInstanceState(processInstance.getId(), + ExecutionStatus.KILL); + suspendedNodeList.addAll(stopNodeList); for(Integer taskId : suspendedNodeList){ // 把暂停状态初始化 initTaskInstance(this.findTaskInstanceById(taskId)); @@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao { * @param taskInstance */ private void initTaskInstance(TaskInstance taskInstance){ - if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){ - taskInstance.setFlag(Flag.NO); - updateTaskInstance(taskInstance); - }else{ - taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); - updateTaskInstance(taskInstance); + + if(!taskInstance.isSubProcess()){ + if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){ + taskInstance.setFlag(Flag.NO); + updateTaskInstance(taskInstance); + return; + } } + taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); + updateTaskInstance(taskInstance); } /** diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index 1dc3bbc99b..78165e3f9a 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider { public String queryDetailById(Map parameter) { return new SQL() { { - SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); + SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); - FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q"); + FROM(TABLE_NAME + " inst"); - WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}"); + WHERE("inst.id = #{processId}"); } }.toString(); } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index e741966f06..1c6232bc9a 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{ // get process define ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), - processDefine.getUserId()); - - if(tenant != null){ - processInstance.setTenantCode(tenant.getTenantCode()); - } taskInstance.setProcessInstance(processInstance); taskInstance.setProcessDefine(processDefine); @@ -217,9 +211,12 @@ public class FetchTaskThread implements Runnable{ // set task execute path taskInstance.setExecutePath(execLocalPath); + Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), + processDefine.getUserId()); + // check and create Linux users FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, - processInstance.getTenantCode(), logger); + tenant.getTenantCode(), logger); logger.info("task : {} ready to submit to task scheduler thread",taskId); // submit task diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index c21a3a2f1b..89226bf8b5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.utils.*; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.TaskRecordDao; +import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.Tenant; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.worker.log.TaskLogger; @@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable { // set task params taskProps.setTaskParams(taskNode.getParams()); // set tenant code , execute task linux user - taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setNodeName(taskInstance.getName()); taskProps.setTaskInstId(taskInstance.getId()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); - // set queue - if (StringUtils.isEmpty(queue)){ - taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); - }else { - taskProps.setQueue(queue); - } - taskProps.setTaskStartTime(taskInstance.getStartTime()); - taskProps.setDefinedParams(allParamMap); - - // set task timeout - setTaskTimeout(taskProps, taskNode); - - taskProps.setDependence(taskInstance.getDependency()); - - taskProps.setTaskAppId(String.format("%s_%s_%s", - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, - taskInstance.getProcessDefine().getId(), - taskInstance.getProcessInstance().getId(), - taskInstance.getId())); - - task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); - - // job init - task.init(); - - // job handle - task.handle(); - - - logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); - - if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ - status = ExecutionStatus.SUCCESS; - // task recor flat : if true , start up qianfan - if (TaskRecordDao.getTaskRecordFlag() - && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ - - AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); - - // replace placeholder - Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), - taskProps.getDefinedParams(), - params.getLocalParametersMap(), - processInstance.getCmdTypeIfComplement(), - processInstance.getScheduleTime()); - if (paramsMap != null && !paramsMap.isEmpty() - && paramsMap.containsKey("v_proc_date")){ - String vProcDate = paramsMap.get("v_proc_date").getValue(); - if (!StringUtils.isEmpty(vProcDate)){ - TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); - logger.info("task record status : {}",taskRecordState); - if (taskRecordState == TaskRecordStatus.FAILURE){ - status = ExecutionStatus.FAILURE; + + ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + + Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), + processDefine.getUserId()); + + if(tenant == null){ + processInstance.setTenantCode(tenant.getTenantCode()); + logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}", + processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId() + ); + status = ExecutionStatus.FAILURE; + }else{ + taskProps.setTenantCode(tenant.getTenantCode()); + String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); + // set queue + if (StringUtils.isEmpty(queue)){ + taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); + }else { + taskProps.setQueue(tenant.getQueueName()); + } + taskProps.setTaskStartTime(taskInstance.getStartTime()); + taskProps.setDefinedParams(allParamMap); + + // set task timeout + setTaskTimeout(taskProps, taskNode); + + taskProps.setDependence(taskInstance.getDependency()); + + taskProps.setTaskAppId(String.format("%s_%s_%s", + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + // custom logger + TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + taskInstance.getProcessDefine().getId(), + taskInstance.getProcessInstance().getId(), + taskInstance.getId())); + + task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); + + // job init + task.init(); + + // job handle + task.handle(); + logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); + + if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ + status = ExecutionStatus.SUCCESS; + // task recor flat : if true , start up qianfan + if (TaskRecordDao.getTaskRecordFlag() + && TaskType.typeIsNormalTask(taskInstance.getTaskType())){ + + AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); + + // replace placeholder + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + params.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), + processInstance.getScheduleTime()); + if (paramsMap != null && !paramsMap.isEmpty() + && paramsMap.containsKey("v_proc_date")){ + String vProcDate = paramsMap.get("v_proc_date").getValue(); + if (!StringUtils.isEmpty(vProcDate)){ + TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate); + logger.info("task record status : {}",taskRecordState); + if (taskRecordState == TaskRecordStatus.FAILURE){ + status = ExecutionStatus.FAILURE; + } } } } - } - }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ - status = ExecutionStatus.KILL; - }else { - status = ExecutionStatus.FAILURE; + }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ + status = ExecutionStatus.KILL; + }else { + status = ExecutionStatus.FAILURE; + } } }catch (Exception e){ logger.error("task escheduler failure : " + e.getMessage(),e); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index dd10d05ddf..26d682f132 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds; import cn.escheduler.common.task.sql.SqlParameters; import cn.escheduler.common.task.sql.SqlType; import cn.escheduler.common.utils.CollectionUtils; +import cn.escheduler.common.utils.CommonUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.AlertDao; import cn.escheduler.dao.DaoFactory; @@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.EnumUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import java.sql.*; @@ -51,6 +54,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import static cn.escheduler.common.utils.PropertyUtils.getString; + /** * sql task */ @@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask { List createFuncs){ Connection connection = null; try { - + if (CommonUtils.getKerberosStartupState()) { + System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF, + getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH)); + Configuration configuration = new Configuration(); + configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(configuration); + UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME), + getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH)); + } if (DbType.HIVE.name().equals(sqlParameters.getType())) { Properties paramProp = new Properties(); paramProp.setProperty("user", baseDataSource.getUser()); @@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask { array.add(mapOfColValues); } - logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); + logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue)); // send as an attachment if (StringUtils.isEmpty(sqlParameters.getShowType())) {