Browse Source

Merge pull request #507 from qiaozhanwei/dev-1.1.0

Kerberos integrated
pull/2/head
乔占卫 5 years ago committed by GitHub
parent
commit
c53f5bcc57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java
  2. 5
      escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java
  3. 12
      escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java
  4. 11
      escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java
  5. 4
      escheduler-common/src/main/resources/common/common.properties
  6. 5
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  7. 17
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

3
escheduler-api/src/main/java/cn/escheduler/api/controller/DataSourceController.java

@ -23,6 +23,7 @@ import cn.escheduler.api.utils.Constants;
import cn.escheduler.api.utils.Result;
import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.model.User;
@ -455,7 +456,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {},get kerberos startup state : {}", loginUser.getUserName());
try{
// if upload resource is HDFS and kerberos startup is true , else false
return success(Status.SUCCESS.getMsg(), CheckUtils.getKerberosStartupState());
return success(Status.SUCCESS.getMsg(), CommonUtils.getKerberosStartupState());
}catch (Exception e){
logger.error(KERBEROS_STARTUP_STATE.getMsg(),e);
return error(Status.KERBEROS_STARTUP_STATE.getCode(), Status.KERBEROS_STARTUP_STATE.getMsg());

5
escheduler-api/src/main/java/cn/escheduler/api/service/DataSourceService.java

@ -25,6 +25,7 @@ import cn.escheduler.common.enums.DbType;
import cn.escheduler.common.enums.ResUploadType;
import cn.escheduler.common.enums.UserType;
import cn.escheduler.common.job.db.*;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.PropertyUtils;
import cn.escheduler.dao.mapper.DataSourceMapper;
import cn.escheduler.dao.mapper.DatasourceUserMapper;
@ -381,7 +382,7 @@ public class DataSourceService extends BaseService{
break;
case HIVE:
case SPARK:
if (CheckUtils.getKerberosStartupState()) {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
@ -477,7 +478,7 @@ public class DataSourceService extends BaseService{
String address = buildAddress(type, host, port);
String jdbcUrl = address + "/" + database;
if (CheckUtils.getKerberosStartupState() &&
if (CommonUtils.getKerberosStartupState() &&
(type == DbType.HIVE || type == DbType.SPARK)){
jdbcUrl += ";principal=" + principal;
}

12
escheduler-api/src/main/java/cn/escheduler/api/utils/CheckUtils.java

@ -160,16 +160,4 @@ public class CheckUtils {
return pattern.matcher(str).matches();
}
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
}

11
escheduler-common/src/main/java/cn/escheduler/common/utils/CommonUtils.java

@ -17,6 +17,7 @@
package cn.escheduler.common.utils;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ResUploadType;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,4 +64,14 @@ public class CommonUtils {
/**
* if upload resource is HDFS and kerberos startup is true , else false
* @return
*/
public static boolean getKerberosStartupState(){
String resUploadStartupType = PropertyUtils.getString(cn.escheduler.common.Constants.RES_UPLOAD_STARTUP_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = getBoolean(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
}

4
escheduler-common/src/main/resources/common/common.properties

@ -26,10 +26,10 @@ hadoop.security.authentication.startup.state=false
java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user
login.user.keytab.username="hdfs-mycluster@ESZ.COM"
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
login.user.keytab.path="/opt/hdfs.headless.keytab"
login.user.keytab.path=/opt/hdfs.headless.keytab
# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
escheduler.env.path=/opt/.escheduler_env.sh

5
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -211,9 +211,12 @@ public class FetchTaskThread implements Runnable{
// set task execute path
taskInstance.setExecutePath(execLocalPath);
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
tenant.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task

17
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -29,6 +29,7 @@ import cn.escheduler.common.task.sql.SqlBinds;
import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.task.sql.SqlType;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.CommonUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
@ -43,6 +44,8 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import java.sql.*;
@ -51,6 +54,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static cn.escheduler.common.utils.PropertyUtils.getString;
/**
* sql task
*/
@ -228,7 +233,15 @@ public class SqlTask extends AbstractTask {
List<String> createFuncs){
Connection connection = null;
try {
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(cn.escheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(cn.escheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(cn.escheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (DbType.HIVE.name().equals(sqlParameters.getType())) {
Properties paramProp = new Properties();
paramProp.setProperty("user", baseDataSource.getUser());
@ -278,7 +291,7 @@ public class SqlTask extends AbstractTask {
array.add(mapOfColValues);
}
logger.info("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
logger.debug("execute sql : {}", JSONObject.toJSONString(array, SerializerFeature.WriteMapNullValue));
// send as an attachment
if (StringUtils.isEmpty(sqlParameters.getShowType())) {

Loading…
Cancel
Save