@ -16,6 +16,7 @@
* /
package org.apache.dolphinscheduler.server.utils ;
import org.apache.commons.collections.MapUtils ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.common.utils.HadoopUtils ;
@ -24,10 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.slf4j.Logger ;
import java.text.MessageFormat ;
import java.util.ArrayList ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Set ;
import java.util.* ;
import java.util.stream.Collectors ;
import static org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty ;
@ -43,53 +42,44 @@ public class UDFUtils {
/ * *
* create function list
* @param udfFuncs udf functions
* @param tenantCode tenant code
* @param logger logger
* @param udfFuncTenantCodeMap key is udf function , value is tenant code
* @param logger logger
* @return create function list
* /
public static List < String > createFuncs ( List < UdfFunc > udfFuncs , String tenantCode , Logger logger ) {
public static List < String > createFuncs ( Map < UdfFunc , String > udfFuncTenantCodeMap , Logger logger ) {
if ( Collection Utils. isEmpty ( udfFuncs ) ) {
if ( Map Utils. isEmpty ( udfFuncTenantCodeMap ) ) {
logger . info ( "can't find udf function resource" ) ;
return null ;
}
// get hive udf jar path
String hiveUdfJarPath = HadoopUtils . getHdfsUdfDir ( tenantCode ) ;
logger . info ( "hive udf jar path : {}" , hiveUdfJarPath ) ;
// is the root directory of udf defined
if ( StringUtils . isEmpty ( hiveUdfJarPath ) ) {
logger . error ( "not define hive udf jar path" ) ;
throw new RuntimeException ( "hive udf jar base path not defined " ) ;
}
Set < String > resources = getFuncResouces ( udfFuncs ) ;
List < String > funcList = new ArrayList < > ( ) ;
// build jar sql
buildJarSql ( funcList , resources , hiveUdfJarPath ) ;
buildJarSql ( funcList , udfFuncTenantCodeMap ) ;
// build temp function sql
buildTempFuncSql ( funcList , udfFuncs ) ;
buildTempFuncSql ( funcList , udfFuncTenantCodeMap . keySet ( ) . stream ( ) . collect ( Collectors . toList ( ) ) ) ;
return funcList ;
}
/ * *
* build jar sql
* @param sqls sql list
* @param resources resource set
* @param uploadPath upload path
* @param sqls sql list
* @param udfFuncTenantCodeMap key is udf function , value is tenant code
* /
private static void buildJarSql ( List < String > sqls , Set < String > resources , String uploadPath ) {
private static void buildJarSql ( List < String > sqls , Map < UdfFunc , String > udfFuncTenantCodeMap ) {
String defaultFS = HadoopUtils . getInstance ( ) . getConfiguration ( ) . get ( Constants . FS_DEFAULTFS ) ;
if ( ! uploadPath . startsWith ( "hdfs:" ) ) {
uploadPath = defaultFS + uploadPath ;
}
for ( String resource : resources ) {
sqls . add ( String . format ( "add jar %s/%s" , uploadPath , resource ) ) ;
Set < Map . Entry < UdfFunc , String > > entries = udfFuncTenantCodeMap . entrySet ( ) ;
for ( Map . Entry < UdfFunc , String > entry : entries ) {
String uploadPath = HadoopUtils . getHdfsUdfDir ( entry . getValue ( ) ) ;
if ( ! uploadPath . startsWith ( "hdfs:" ) ) {
uploadPath = defaultFS + uploadPath ;
}
sqls . add ( String . format ( "add jar %s%s" , uploadPath , entry . getKey ( ) . getResourceName ( ) ) ) ;
}
}
/ * *
@ -106,20 +96,5 @@ public class UDFUtils {
}
}
/ * *
* get the resource names of all functions
* @param udfFuncs udf function list
* @return
* /
private static Set < String > getFuncResouces ( List < UdfFunc > udfFuncs ) {
Set < String > resources = new HashSet < > ( ) ;
for ( UdfFunc udfFunc : udfFuncs ) {
resources . add ( udfFunc . getResourceName ( ) ) ;
}
return resources ;
}
}