Browse Source

[Fix-10909] Returns executionStatus for each process instance status and access token module support. (#10922)

* Returns executionStatus for each process instance status and access token module support
3.1.0-release
WangJPLeo 2 years ago committed by GitHub
parent
commit
9f34a837b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 53
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java
  2. 64
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java
  3. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java
  4. 7
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml
  5. 5
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java
  6. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

53
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/AccessTokenServiceImpl.java

@ -17,33 +17,39 @@
package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ACCESS_TOKEN_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ACCESS_TOKEN_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ACCESS_TOKEN_UPDATE;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.AccessTokenService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* access token service impl
@ -69,14 +75,13 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
public Result queryAccessTokenList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Result result = new Result();
PageInfo<AccessToken> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<AccessToken> page = new Page<>(pageNo, pageSize);
int userId = loginUser.getId();
if (loginUser.getUserType() == UserType.ADMIN_USER) {
userId = 0;
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ACCESS_TOKEN, loginUser.getId(), logger);
if (!ids.isEmpty()) {
Page<AccessToken> page = new Page<>(pageNo, pageSize);
IPage<AccessToken> accessTokenList = accessTokenMapper.selectAccessTokenPage(page, new ArrayList<>(ids), searchVal);
pageInfo.setTotal((int) accessTokenList.getTotal());
pageInfo.setTotalList(accessTokenList.getRecords());
}
IPage<AccessToken> accessTokenList = accessTokenMapper.selectAccessTokenPage(page, searchVal, userId);
pageInfo.setTotal((int) accessTokenList.getTotal());
pageInfo.setTotalList(accessTokenList.getRecords());
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
@ -93,14 +98,11 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
public Map<String, Object> queryAccessTokenByUser(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
result.put(Constants.STATUS, false);
// no permission
if (loginUser.getUserType().equals(UserType.GENERAL_USER) && loginUser.getId() != userId) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
List<AccessToken> accessTokenList = Collections.EMPTY_LIST;
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ACCESS_TOKEN, loginUser.getId(), logger);
if (!ids.isEmpty()) {
accessTokenList = this.accessTokenMapper.selectBatchIds(ids);
}
userId = loginUser.getUserType().equals(UserType.ADMIN_USER) ? 0 : userId;
// query access token for specified user
List<AccessToken> accessTokenList = this.accessTokenMapper.queryAccessTokenByUser(userId);
result.put(Constants.DATA_LIST, accessTokenList);
this.putMsg(result, Status.SUCCESS);
return result;
@ -152,6 +154,7 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
if (insert > 0) {
result.setData(accessToken);
putMsg(result, Status.SUCCESS);
resourcePermissionCheckService.postHandle(AuthorizationType.ACCESS_TOKEN, loginUser.getId(), new ArrayList<>(accessToken.getId()), logger);
} else {
putMsg(result, Status.CREATE_ACCESS_TOKEN_ERROR);
}
@ -170,10 +173,6 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
@Override
public Map<String, Object> generateToken(User loginUser, int userId, String expireTime) {
Map<String, Object> result = new HashMap<>();
if (!(canOperatorPermissions(loginUser,null, AuthorizationType.ACCESS_TOKEN, ACCESS_TOKEN_CREATE) || loginUser.getId() == userId)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
String token = EncryptionUtils.getMd5(userId + expireTime + System.currentTimeMillis());
result.put(Constants.DATA_LIST, token);
putMsg(result, Status.SUCCESS);
@ -190,6 +189,10 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
@Override
public Map<String, Object> delAccessTokenById(User loginUser, int id) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser,new Object[]{id},AuthorizationType.ACCESS_TOKEN,ACCESS_TOKEN_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
AccessToken accessToken = accessTokenMapper.selectById(id);
@ -198,10 +201,6 @@ public class AccessTokenServiceImpl extends BaseServiceImpl implements AccessTok
putMsg(result, Status.ACCESS_TOKEN_NOT_EXIST);
return result;
}
if (!canOperatorPermissions(loginUser,new Object[]{id},AuthorizationType.ACCESS_TOKEN,ACCESS_TOKEN_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
accessTokenMapper.deleteById(id);
putMsg(result, Status.SUCCESS);

64
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/AccessTokenServiceTest.java

@ -17,12 +17,15 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.*;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ACCESS_TOKEN_DELETE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.ACCESS_TOKEN_UPDATE;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.AccessTokenServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.utils.PageInfo;
@ -38,10 +41,11 @@ import org.apache.dolphinscheduler.dao.mapper.AccessTokenMapper;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.assertj.core.util.Lists;
import org.junit.Assert;
import org.junit.Test;
@ -50,6 +54,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,6 +68,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
public class AccessTokenServiceTest {
private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(AccessTokenServiceTest.class);
private static final Logger serviceLogger = LoggerFactory.getLogger(AccessTokenServiceImpl.class);
@InjectMocks
private AccessTokenServiceImpl accessTokenService;
@ -82,31 +88,34 @@ public class AccessTokenServiceTest {
User user = new User();
user.setId(1);
user.setUserType(UserType.ADMIN_USER);
when(accessTokenMapper.selectAccessTokenPage(any(Page.class), eq("zhangsan"), eq(0))).thenReturn(tokenPage);
Set<Integer> tokenIds = new HashSet<>();
tokenIds.add(1);
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ACCESS_TOKEN, user.getId(), serviceLogger)).thenReturn(new HashSet());
Result result = accessTokenService.queryAccessTokenList(user, "zhangsan", 1, 10);
PageInfo<AccessToken> pageInfo = (PageInfo<AccessToken>) result.getData();
assertEquals(0, (int) pageInfo.getTotal());
PowerMockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ACCESS_TOKEN, user.getId(), serviceLogger)).thenReturn(tokenIds);
Mockito.when(accessTokenMapper.selectAccessTokenPage(Mockito.any(Page.class), Mockito.anyList(),Mockito.eq("zhangsan"))).thenReturn(tokenPage);
result = accessTokenService.queryAccessTokenList(user, "zhangsan", 1, 10);
pageInfo = (PageInfo<AccessToken>) result.getData();
logger.info(result.toString());
Assert.assertTrue(pageInfo.getTotal() > 0);
}
@Test
public void testQueryAccessTokenByUser() {
List<AccessToken> accessTokenList = Lists.newArrayList(this.getEntity());
Mockito.when(this.accessTokenMapper.queryAccessTokenByUser(Mockito.anyInt())).thenReturn(accessTokenList);
// USER_NO_OPERATION_PERM
User user = this.getLoginUser();
user.setUserType(UserType.GENERAL_USER);
Map<String, Object> result = this.accessTokenService.queryAccessTokenByUser(user, 3);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS));
// SUCCESS
user.setUserType(UserType.ADMIN_USER);
result = this.accessTokenService.queryAccessTokenByUser(user, 1);
List<AccessToken> accessTokenList = Lists.newArrayList(this.getEntity());
Set<Integer> tokenIds = new HashSet<>();
tokenIds.add(1);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ACCESS_TOKEN, user.getId(), serviceLogger)).thenReturn(tokenIds);
Map<String, Object> result = this.accessTokenService.queryAccessTokenByUser(user, 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
@ -115,12 +124,12 @@ public class AccessTokenServiceTest {
when(accessTokenMapper.insert(any(AccessToken.class))).thenReturn(2);
Result result = accessTokenService.createToken(getLoginUser(), 1, getDate(), "AccessTokenServiceTest");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
// Token is absent
result = this.accessTokenService.createToken(getLoginUser(), 1, getDate(), null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
@Test
@ -128,11 +137,9 @@ public class AccessTokenServiceTest {
User user = new User();
user.setId(1);
user.setUserType(UserType.ADMIN_USER);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ACCESS_TOKEN, 1, ACCESS_TOKEN_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ACCESS_TOKEN, null, 0, baseServiceLogger)).thenReturn(true);
Map<String, Object> result = accessTokenService.generateToken(getLoginUser(), Integer.MAX_VALUE,getDate());
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
String token = (String) result.get(Constants.DATA_LIST);
Assert.assertNotNull(token);
}
@ -145,21 +152,22 @@ public class AccessTokenServiceTest {
userLogin.setId(1);
userLogin.setUserType(UserType.ADMIN_USER);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ACCESS_TOKEN, 1, ACCESS_TOKEN_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ACCESS_TOKEN, new Object[]{0}, 0, baseServiceLogger)).thenReturn(true);
// not exist
Map<String, Object> result = accessTokenService.delAccessTokenById(userLogin, 0);
logger.info(result.toString());
Assert.assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS));
assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS));
// no operate
result = accessTokenService.delAccessTokenById(userLogin, 1);
logger.info(result.toString());
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS));
assertEquals(Status.USER_NO_OPERATION_PERM, result.get(Constants.STATUS));
//success
userLogin.setId(1);
userLogin.setUserType(UserType.ADMIN_USER);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ACCESS_TOKEN, new Object[]{1}, 0, baseServiceLogger)).thenReturn(true);
result = accessTokenService.delAccessTokenById(userLogin, 1);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
@ -173,20 +181,20 @@ public class AccessTokenServiceTest {
when(accessTokenMapper.selectById(1)).thenReturn(getEntity());
Map<String, Object> result = accessTokenService.updateToken(getLoginUser(), 1,Integer.MAX_VALUE,getDate(),"token");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
Assert.assertNotNull(result.get(Constants.DATA_LIST));
// Token is absent
result = accessTokenService.updateToken(getLoginUser(), 1, Integer.MAX_VALUE,getDate(),null);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
Assert.assertNotNull(result.get(Constants.DATA_LIST));
// ACCESS_TOKEN_NOT_EXIST
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ACCESS_TOKEN, new Object[]{2}, 0, baseServiceLogger)).thenReturn(true);
result = accessTokenService.updateToken(getLoginUser(), 2,Integer.MAX_VALUE,getDate(),"token");
logger.info(result.toString());
Assert.assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS));
assertEquals(Status.ACCESS_TOKEN_NOT_EXIST, result.get(Constants.STATUS));
}
private User getLoginUser() {

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.java

@ -37,13 +37,13 @@ public interface AccessTokenMapper extends BaseMapper<AccessToken> {
* access token page
*
* @param page page
* @param tokenIds tokenIds
* @param userName userName
* @param userId userId
* @return access token Ipage
*/
IPage<AccessToken> selectAccessTokenPage(Page page,
@Param("userName") String userName,
@Param("userId") int userId
@Param("ids") List<Integer> tokenIds,
@Param("userName") String userName
);
/**

7
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapper.xml

@ -26,8 +26,11 @@
<if test="userName != null and userName != ''">
and u.user_name like concat ('%', #{userName}, '%')
</if>
<if test="userId != 0">
and t.user_id = #{userId}
<if test="ids != null and ids.size() > 0">
and t.id in
<foreach item="id" index="index" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</if>
order by t.update_time desc
</select>

5
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/AccessTokenMapperTest.java

@ -31,8 +31,10 @@ import org.apache.dolphinscheduler.dao.entity.AccessToken;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Test;
@ -139,7 +141,8 @@ public class AccessTokenMapperTest extends BaseDaoTest {
Map<Integer, AccessToken> accessTokenMap = createAccessTokens(count, userName);
Page page = new Page(offset, size);
IPage<AccessToken> accessTokenPage = accessTokenMapper.selectAccessTokenPage(page, userName, 0);
List<Integer> tokenIds = accessTokenMap.values().stream().map(AccessToken::getId).collect(Collectors.toList());
IPage<AccessToken> accessTokenPage = accessTokenMapper.selectAccessTokenPage(page, tokenIds, userName);
assertEquals(Integer.valueOf(accessTokenPage.getRecords().size()), size);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -1523,6 +1523,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
if (state == ExecutionStatus.READY_BLOCK) {
ExecutionStatus executionStatus = processReadyBlock();
logger.info("The workflowInstance is ready to block, the workflowInstance status is {}", executionStatus);
return executionStatus;
}
// waiting thread
@ -1551,6 +1552,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
executionStatus = ExecutionStatus.SUCCESS;
}
logger.info("The workflowInstance is ready to stop, the workflow status is {}", executionStatus);
return executionStatus;
}
// process failure

Loading…
Cancel
Save