Browse Source

[Fix issue #1770]check udf and data source in order to fix issue 1770 (#1826)

* check udf and data source in order to fix issue 1770

* check udf and data source in order to fix issue 1770

* update testListAuthorizedUdfFunc
pull/2/head
lgcareer 5 years ago committed by qiaozhanwei
parent
commit
779672ec2a
  1. 50
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java
  2. 55
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java
  3. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
  4. 8
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java
  5. 7
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java
  6. 157
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java
  7. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
  8. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml
  9. 77
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java
  10. 97
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java
  11. 50
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java
  12. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java
  13. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

50
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/AuthorizationType.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
/**
* Authorization type
*/
public enum AuthorizationType {
/**
* 0 RESOURCE_FILE;
* 1 DATASOURCE;
* 2 UDF;
*/
RESOURCE_FILE(0, "resource file"),
DATASOURCE(1, "data source"),
UDF(2, "udf function");
AuthorizationType(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

55
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/ProcessDao.java

@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao;
import com.alibaba.fastjson.JSONObject;
import com.cronutils.model.Cron;
import org.apache.commons.lang.ArrayUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.model.DateInterval;
@ -25,7 +26,6 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.queue.ITaskQueue;
import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters;
import org.apache.dolphinscheduler.common.utils.ArrayUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -44,6 +44,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
@ -462,12 +463,9 @@ public class ProcessDao {
return null;
}
if(null == tenant){
if(tenant == null){
User user = userMapper.selectById(userId);
if (null != user) {
tenant = tenantMapper.queryById(user.getTenantId());
}
tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
@ -974,6 +972,9 @@ public class ProcessDao {
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit to task queue, but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
@ -1770,5 +1771,47 @@ public class ProcessDao {
return projectIdList;
}
/**
* list unauthorized udf function
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
public <T> List<T> listUnauthorized(int userId,T[] needChecks,AuthorizationType authorizationType){
List<T> resultList = new ArrayList<T>();
if (!ArrayUtils.isEmpty(needChecks)) {
Set<T> originResSet = new HashSet<T>(Arrays.asList(needChecks));
switch (authorizationType){
case RESOURCE_FILE:
Set<String> authorizedResources = resourceMapper.listAuthorizedResource(userId, needChecks).stream().map(t -> t.getAlias()).collect(toSet());
originResSet.removeAll(authorizedResources);
break;
case DATASOURCE:
Set<Integer> authorizedDatasources = dataSourceMapper.listAuthorizedDataSource(userId,needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedDatasources);
break;
case UDF:
Set<Integer> authorizedUdfs = udfFuncMapper.listAuthorizedUdfFunc(userId, needChecks).stream().map(t -> t.getId()).collect(toSet());
originResSet.removeAll(authorizedUdfs);
break;
}
resultList.addAll(originResSet);
}
return resultList;
}
/**
* get user by user id
* @param userId user id
* @return User
*/
public User getUserById(int userId){
return userMapper.queryDetailsById(userId);
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java

@ -77,4 +77,13 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
List<DataSource> listAllDataSourceByType(@Param("type") Integer type);
/**
* list authorized UDF function
* @param userId userId
* @param dataSourceIds data source id array
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
}

8
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapper.java

@ -83,4 +83,12 @@ public interface ResourceMapper extends BaseMapper<Resource> {
* @return tenant code
*/
String queryTenantCodeByResourceName(@Param("resName") String resName);
/**
* list authorized resource
* @param userId userId
* @param resNames resource names
* @return resource list
*/
<T> List<Resource> listAuthorizedResource(@Param("userId") int userId,@Param("resNames")T[] resNames);
}

7
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.java

@ -78,5 +78,12 @@ public interface UdfFuncMapper extends BaseMapper<UdfFunc> {
*/
List<UdfFunc> queryAuthedUdfFunc(@Param("userId") int userId);
/**
* list authorized UDF function
* @param userId userId
* @param udfIds UDF function id array
* @return UDF function list
*/
<T> List<UdfFunc> listAuthorizedUdfFunc (@Param("userId") int userId,@Param("udfIds")T[] udfIds);
}

157
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/permission/PermissionCheck.java

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.permission;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.User;
import org.slf4j.Logger;
import java.util.List;
public class PermissionCheck<T> {
/**
* logger
*/
private Logger logger;
/**
* Authorization Type
*/
private AuthorizationType authorizationType;
/**
* Authorization Type
*/
private ProcessDao processDao;
/**
* need check array
*/
private T[] needChecks;
/**
* user id
*/
private int userId;
/**
* permission check
* @param authorizationType authorization type
* @param processDao process dao
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao) {
this.authorizationType = authorizationType;
this.processDao = processDao;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
}
/**
* permission check
* @param authorizationType
* @param processDao
* @param needChecks
* @param userId
* @param logger
*/
public PermissionCheck(AuthorizationType authorizationType, ProcessDao processDao, T[] needChecks, int userId,Logger logger) {
this.authorizationType = authorizationType;
this.processDao = processDao;
this.needChecks = needChecks;
this.userId = userId;
this.logger = logger;
}
public AuthorizationType getAuthorizationType() {
return authorizationType;
}
public void setAuthorizationType(AuthorizationType authorizationType) {
this.authorizationType = authorizationType;
}
public ProcessDao getProcessDao() {
return processDao;
}
public void setProcessDao(ProcessDao processDao) {
this.processDao = processDao;
}
public T[] getNeedChecks() {
return needChecks;
}
public void setNeedChecks(T[] needChecks) {
this.needChecks = needChecks;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
/**
* has permission
* @return true if has permission
*/
public boolean hasPermission(){
try {
checkPermission();
return true;
} catch (Exception e) {
return false;
}
}
/**
* check permission
* @throws Exception exception
*/
public void checkPermission() throws Exception{
if(this.needChecks.length > 0){
// get user type in order to judge whether the user is admin
User user = processDao.getUserById(userId);
if (user.getUserType() != UserType.ADMIN_USER){
List<T> unauthorizedList = processDao.listUnauthorized(userId,needChecks,authorizationType);
// if exist unauthorized resource
if(CollectionUtils.isNotEmpty(unauthorizedList)){
logger.error("user {} didn't has permission of {}: {}", user.getUserName(), authorizationType.getDescp(),unauthorizedList.toString());
throw new RuntimeException(String.format("user %s didn't has permission of %s %s", user.getUserName(), authorizationType.getDescp(), unauthorizedList.get(0)));
}
}
}
}
}

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml

@ -74,6 +74,19 @@
from t_ds_datasource
where type = #{type}
</select>
<select id="listAuthorizedDataSource" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select *
from t_ds_datasource
where
id in (select datasource_id from t_ds_relation_datasource_user where user_id=#{userId}
union select id as datasource_id from t_ds_datasource where user_id=#{userId})
<if test="dataSourceIds != null and dataSourceIds != ''">
and id in
<foreach collection="dataSourceIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapper.xml

@ -74,4 +74,17 @@
WHERE u.id = rel.udf_id
AND rel.user_id = #{userId}
</select>
<select id="listAuthorizedUdfFunc" resultType="org.apache.dolphinscheduler.dao.entity.UdfFunc">
select *
from t_ds_udfs
where
id in (select udf_id from t_ds_relation_udfs_user where user_id=#{userId}
union select id as udf_id from t_ds_udfs where user_id=#{userId})
<if test="udfIds != null and udfIds != ''">
and id in
<foreach collection="udfIds" item="i" open="(" close=")" separator=",">
#{i}
</foreach>
</if>
</select>
</mapper>

77
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.DatasourceUser;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -34,7 +36,9 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import static org.hamcrest.Matchers.*;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
/**
@ -58,6 +62,9 @@ public class DataSourceMapperTest {
@Autowired
DataSourceUserMapper dataSourceUserMapper;
@Autowired
private UserMapper userMapper;
/**
* test insert
*/
@ -244,6 +251,33 @@ public class DataSourceMapperTest {
}
}
@Test
public void testListAuthorizedDataSource(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create data source
DataSource dataSource = createDataSource(generalUser1.getId(), "ds-1");
DataSource unauthorizdDataSource = createDataSource(generalUser2.getId(), "ds-2");
//data source ids
Integer[] dataSourceIds = new Integer[]{dataSource.getId(),unauthorizdDataSource.getId()};
List<DataSource> authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertEquals(generalUser1.getId(),dataSource.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdDataSource.getUserId());
Assert.assertFalse(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
//authorize object unauthorizdDataSource to generalUser1
createUserDataSource(generalUser1, unauthorizdDataSource);
authorizedDataSource = dataSourceMapper.listAuthorizedDataSource(generalUser1.getId(), dataSourceIds);
Assert.assertTrue(authorizedDataSource.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(dataSourceIds)));
}
/**
* create datasource relation
* @param userId
@ -289,7 +323,6 @@ public class DataSourceMapperTest {
return dataSourceMap;
}
/**
* create datasource
* @return datasource
@ -330,5 +363,41 @@ public class DataSourceMapperTest {
return dataSource;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create the relation of user and data source
* @param user user
* @param dataSource data source
* @return DatasourceUser
*/
private DatasourceUser createUserDataSource(User user,DataSource dataSource){
DatasourceUser datasourceUser = new DatasourceUser();
datasourceUser.setDatasourceId(dataSource.getId());
datasourceUser.setUserId(user.getId());
datasourceUser.setPerm(7);
datasourceUser.setCreateTime(DateUtils.getCurrentDate());
datasourceUser.setUpdateTime(DateUtils.getCurrentDate());
dataSourceUserMapper.insert(datasourceUser);
return datasourceUser;
}
}

97
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ResourceMapperTest.java

@ -17,22 +17,36 @@
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.dao.entity.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.ResourcesUser;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class ResourceMapperTest {
@Autowired
@ -61,6 +75,59 @@ public class ResourceMapperTest {
return resource;
}
/**
* create resource by user
* @param user user
* @return Resource
*/
private Resource createResource(User user){
//insertOne
Resource resource = new Resource();
resource.setAlias(String.format("ut resource %s",user.getUserName()));
resource.setType(ResourceType.FILE);
resource.setUserId(user.getId());
resourceMapper.insert(resource);
return resource;
}
/**
* create user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* create resource user
* @return ResourcesUser
*/
private ResourcesUser createResourcesUser(Resource resource,User user){
//insertOne
ResourcesUser resourcesUser = new ResourcesUser();
resourcesUser.setCreateTime(new Date());
resourcesUser.setUpdateTime(new Date());
resourcesUser.setUserId(user.getId());
resourcesUser.setResourcesId(resource.getId());
resourceUserMapper.insert(resourcesUser);
return resourcesUser;
}
@Test
public void testInsert(){
Resource resource = insertOne();
assertNotNull(resource.getId());
assertThat(resource.getId(),greaterThan(0));
}
/**
* test update
*/
@ -230,4 +297,30 @@ public class ResourceMapperTest {
resourceMapper.deleteById(resource.getId());
}
@Test
public void testListAuthorizedResource(){
// create a general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
// create one resource
Resource resource = createResource(generalUser2);
Resource unauthorizedResource = createResource(generalUser2);
// need download resources
String[] resNames = new String[]{resource.getAlias(), unauthorizedResource.getAlias()};
List<Resource> resources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertEquals(generalUser2.getId(),resource.getUserId());
Assert.assertFalse(resources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
// authorize object unauthorizedResource to generalUser
createResourcesUser(unauthorizedResource,generalUser2);
List<Resource> authorizedResources = resourceMapper.listAuthorizedResource(generalUser2.getId(), resNames);
Assert.assertTrue(authorizedResources.stream().map(t -> t.getAlias()).collect(toList()).containsAll(Arrays.asList(resNames)));
}
}

50
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UdfFuncMapperTest.java

@ -29,13 +29,20 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static java.util.stream.Collectors.toList;
@RunWith(SpringRunner.class)
@SpringBootTest
@Transactional
@Rollback(true)
public class UdfFuncMapperTest {
@Autowired
@ -133,6 +140,23 @@ public class UdfFuncMapperTest {
return udfUser;
}
/**
* create general user
* @return User
*/
private User createGeneralUser(String userName){
User user = new User();
user.setUserName(userName);
user.setUserPassword("1");
user.setEmail("xx@123.com");
user.setUserType(UserType.GENERAL_USER);
user.setCreateTime(new Date());
user.setTenantId(1);
user.setUpdateTime(new Date());
userMapper.insert(user);
return user;
}
/**
* test update
*/
@ -268,4 +292,30 @@ public class UdfFuncMapperTest {
udfUserMapper.deleteById(udfUser.getId());
Assert.assertNotEquals(udfFuncList.size(), 0);
}
@Test
public void testListAuthorizedUdfFunc(){
//create general user
User generalUser1 = createGeneralUser("user1");
User generalUser2 = createGeneralUser("user2");
//create udf function
UdfFunc udfFunc = insertOne(generalUser1);
UdfFunc unauthorizdUdfFunc = insertOne(generalUser2);
//udf function ids
Integer[] udfFuncIds = new Integer[]{udfFunc.getId(),unauthorizdUdfFunc.getId()};
List<UdfFunc> authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertEquals(generalUser1.getId(),udfFunc.getUserId());
Assert.assertNotEquals(generalUser1.getId(),unauthorizdUdfFunc.getUserId());
Assert.assertFalse(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
//authorize object unauthorizdUdfFunc to generalUser1
insertOneUDFUser(generalUser1,unauthorizdUdfFunc);
authorizedUdfFunc = udfFuncMapper.listAuthorizedUdfFunc(generalUser1.getId(), udfFuncIds);
Assert.assertTrue(authorizedUdfFunc.stream().map(t -> t.getId()).collect(toList()).containsAll(Arrays.asList(udfFuncIds)));
}
}

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -21,6 +21,7 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -42,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
@ -94,12 +95,15 @@ public class TaskScheduleThread implements Runnable {
// task node
TaskNode taskNode = JSONObject.parseObject(taskInstance.getTaskJson(), TaskNode.class);
// get resource files
List<String> resourceFiles = createProjectResFiles(taskNode);
// copy hdfs/minio file to local
copyHdfsToLocal(processDao,
downloadResource(
taskInstance.getExecutePath(),
createProjectResFiles(taskNode),
resourceFiles,
logger);
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
@ -204,8 +208,8 @@ public class TaskScheduleThread implements Runnable {
}
/**
* get task log path
* @return
* get task log path
* @return log path
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
@ -294,14 +298,14 @@ public class TaskScheduleThread implements Runnable {
}
/**
* copy hdfs file to local
* download resource file
*
* @param processDao
* @param execLocalPath
* @param projectRes
* @param logger
*/
private void copyHdfsToLocal(ProcessDao processDao, String execLocalPath, List<String> projectRes, Logger logger) throws IOException {
private void downloadResource(String execLocalPath, List<String> projectRes, Logger logger) throws Exception {
checkDownloadPermission(projectRes);
for (String res : projectRes) {
File resFile = new File(execLocalPath, res);
if (!resFile.exists()) {
@ -321,4 +325,16 @@ public class TaskScheduleThread implements Runnable {
}
}
}
/**
* check download resource permission
* @param projectRes resource name list
* @throws Exception exception
*/
private void checkDownloadPermission(List<String> projectRes) throws Exception {
int userId = taskInstance.getProcessInstance().getExecutorId();
String[] resNames = projectRes.toArray(new String[projectRes.size()]);
PermissionCheck<String> permissionCheck = new PermissionCheck<>(AuthorizationType.RESOURCE_FILE,processDao,resNames,userId,logger);
permissionCheck.checkPermission();
}
}

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -19,8 +19,11 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ShowType;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.UdfType;
@ -38,6 +41,7 @@ import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.permission.PermissionCheck;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -119,13 +123,6 @@ public class SqlTask extends AbstractTask {
}
dataSource= processDao.findDataSourceById(sqlParameters.getDatasource());
if (null == dataSource){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
@ -133,6 +130,12 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
Connection con = null;
List<String> createFuncs = null;
try {
@ -164,6 +167,8 @@ public class SqlTask extends AbstractTask {
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
// check udf permission
checkUdfPermission(ArrayUtils.toObject(idsArray));
List<UdfFunc> udfFuncList = processDao.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskProps.getTenantCode(), logger);
}
@ -449,4 +454,33 @@ public class SqlTask extends AbstractTask {
}
logger.info(logPrint.toString());
}
/**
* check udf function permission
* @param udfFunIds udf functions
* @return if has download permission return true else false
*/
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF,processDao,udfFunIds,userId,logger);
permissionCheckUdf.checkPermission();
}
/**
* check data source permission
* @param dataSourceId data source id
* @return if has download permission return true else false
*/
private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE,processDao,new Integer[]{dataSourceId},userId,logger);
permissionCheckDataSource.checkPermission();
}
}

Loading…
Cancel
Save