Browse Source

[Feature-6988][MasterServer] add cache manager for workflow (#7090)

* add cache manager for workflow

* [DS-6988][MasterServer] add cache manager for workflow

* cache evict code optimization

* test

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
ba2b2a67c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 102
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
  2. 128
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java
  4. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  5. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  6. 58
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java
  7. 12
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java
  8. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java
  9. 19
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java
  10. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
  11. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  12. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  13. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  14. 13
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  15. 29
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
  16. 46
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
  17. 132
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
  18. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  19. 28
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  20. 7
      dolphinscheduler-server/src/main/resources/application-master.yaml
  21. 19
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
  22. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java
  23. 20
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java
  24. 29
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java
  25. 22
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java
  26. 22
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java
  27. 26
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java
  28. 58
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java
  29. 50
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java
  30. 64
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java
  31. 59
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java
  32. 39
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  33. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java
  34. 60
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java
  35. 78
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java
  36. 76
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java
  37. 30
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

102
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.api.aspect;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
import java.lang.reflect.Method;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Component;
/**
* aspect for cache evict
*/
@Aspect
@Component
public class CacheEvictAspect {
private static final String UPDATE_BY_ID = "updateById";
@Autowired
private CacheKeyGenerator cacheKeyGenerator;
@Autowired
private CacheNotifyService cacheNotifyService;
@Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)")
public void cacheEvictPointCut() {
// Do nothing because of it's a pointcut
}
@Around("cacheEvictPointCut()")
public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
MethodSignature sign = (MethodSignature) proceedingJoinPoint.getSignature();
Method method = sign.getMethod();
Object target = proceedingJoinPoint.getTarget();
Object[] args = proceedingJoinPoint.getArgs();
Object result = proceedingJoinPoint.proceed();
CacheConfig cacheConfig = method.getDeclaringClass().getAnnotation(CacheConfig.class);
CacheEvict cacheEvict = method.getAnnotation(CacheEvict.class);
CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
if (cacheType != null) {
// todo use springEL is better
if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1) {
Object updateObj = args[0];
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, updateObj).convert2Command());
} else {
Object key = cacheKeyGenerator.generate(target, method, args);
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command());
}
}
return result;
}
private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) {
String cacheName = null;
if (cacheEvict.cacheNames().length > 0) {
cacheName = cacheEvict.cacheNames()[0];
}
if (cacheConfig.cacheNames().length > 0) {
cacheName = cacheConfig.cacheNames()[0];
}
if (cacheName == null) {
return null;
}
for (CacheType cacheType : CacheType.values()) {
if (cacheType.getCacheName().equals(cacheName)) {
return cacheType;
}
}
return null;
}
}

128
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -168,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* create process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return create result code
*/
@ -338,7 +338,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return definition list
*/
@ -360,7 +360,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition simple list
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @return definition simple list
*/
@ -390,12 +390,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition list paging
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return process definition page
*/
@Override
@ -433,9 +433,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query detail of process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return process definition detail
*/
@Override
@ -485,16 +485,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* update process definition
*
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param loginUser login user
* @param projectCode project code
* @param name process definition name
* @param code process definition code
* @param description description
* @param globalParams global params
* @param locations locations for nodes
* @param timeout timeout
* @param tenantCode tenantCode
* @param taskRelationJson relation json for nodes
* @param taskDefinitionJson taskDefinitionJson
* @return update result code
*/
@ -605,9 +605,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* verify process definition name unique
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param name name
* @param name name
* @return true if process definition name not exists, otherwise false
*/
@Override
@ -630,9 +630,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete process definition by code
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return delete result code
*/
@Override
@ -700,9 +700,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* release process definition: online / offline
*
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param releaseState release state
* @return release result code
*/
@ -841,9 +841,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* import process definition
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param file process metadata json file
* @param file process metadata json file
* @return import process
*/
@Override
@ -1051,9 +1051,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param code process definition code
* @param code process definition code
* @return task node list
*/
@Override
@ -1080,9 +1080,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* get task node details map based on process definition
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @param codes define codes
* @param codes define codes
* @return task node list
*/
@Override
@ -1124,7 +1124,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query process definition all by project code
*
* @param loginUser loginUser
* @param loginUser loginUser
* @param projectCode project code
* @return process definitions in the project
*/
@ -1226,11 +1226,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
if (taskInstance.isSubProcess()) {
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
subProcessCode = Integer.parseInt(JSONUtils.parseObject(
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());
}
treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(),
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(),
taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode));
}
}
for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) {
@ -1291,9 +1291,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch copy process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1314,9 +1314,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* batch move process definition
*
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param loginUser loginUser
* @param projectCode projectCode
* @param codes processDefinitionCodes
* @param targetProjectCode targetProjectCode
*/
@Override
@ -1415,10 +1415,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* switch the defined process definition version
*
* @param loginUser login user
* @param loginUser login user
* @param projectCode project code
* @param code process definition code
* @param version the version user want to switch
* @param code process definition code
* @param version the version user want to switch
* @return switch process definition version result code
*/
@Override
@ -1454,11 +1454,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* check batch operate result
*
* @param srcProjectCode srcProjectCode
* @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
* @param result result
* @param result result
* @param failedProcessList failedProcessList
* @param isCopy isCopy
* @param isCopy isCopy
*/
private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode,
Map<String, Object> result, List<String> failedProcessList, boolean isCopy) {
@ -1476,11 +1476,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* query the pagination versions info by one certain process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @param pageNo page number
* @param pageSize page size
* @param code process definition code
* @return the pagination process definition versions info of the certain process definition
*/
@Override
@ -1510,10 +1510,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
/**
* delete one certain process definition by version number and process definition code
*
* @param loginUser login user info to check auth
* @param loginUser login user info to check auth
* @param projectCode project code
* @param code process definition code
* @param version version number
* @param code process definition code
* @param version version number
* @return delete result code
*/
@Override

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java

@ -22,13 +22,10 @@ import org.apache.dolphinscheduler.api.service.QueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.lang.StringUtils;
@ -59,9 +56,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
@Autowired
private UserMapper userMapper;
@Autowired
private CacheNotifyService cacheNotifyService;
/**
* query queue list
*
@ -229,8 +223,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService {
queueMapper.updateById(queueObj);
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.QUEUE, queueObj).convert2Command());
putMsg(result, Status.SUCCESS);
return result;

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@ -70,9 +67,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
@Autowired
private UserMapper userMapper;
@Autowired
private CacheNotifyService cacheNotifyService;
/**
* create tenant
*
@ -220,9 +214,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
tenant.setUpdateTime(now);
tenantMapper.updateById(tenant);
// notify master to expire cache
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
result.put(Constants.STATUS, Status.SUCCESS);
result.put(Constants.MSG, Status.SUCCESS.getMsg());
return result;
@ -282,9 +273,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
tenantMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByTenantId(id, -1);
// notify master to expire cache
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command());
putMsg(result, Status.SUCCESS);
return result;
}

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
@ -53,8 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
@ -121,10 +118,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
@Autowired
private ProjectMapper projectMapper;
@Autowired
private CacheNotifyService cacheNotifyService;
/**
* create user, only system admin have permission
*
@ -479,7 +472,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
// updateProcessInstance user
userMapper.updateById(user);
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
putMsg(result, Status.SUCCESS);
return result;
@ -531,10 +523,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
userMapper.deleteById(id);
if (user != null) {
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
}
putMsg(result, Status.SUCCESS);
return result;
@ -1079,8 +1067,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
user.setUpdateTime(now);
userMapper.updateById(user);
cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command());
User responseUser = userMapper.queryByUserNameAccurately(userName);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, responseUser);

58
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
@ -61,9 +60,6 @@ public class QueueServiceTest {
@InjectMocks
private QueueServiceImpl queueService;
@Mock
private CacheNotifyService cacheNotifyService;
@Mock
private QueueMapper queueMapper;
@ -77,7 +73,7 @@ public class QueueServiceTest {
}
@After
public void after(){
public void after() {
}
@Test
@ -86,7 +82,7 @@ public class QueueServiceTest {
Mockito.when(queueMapper.selectList(null)).thenReturn(getQueueList());
Map<String, Object> result = queueService.queryList(getLoginUser());
logger.info(result.toString());
List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
List<Queue> queueList = (List<Queue>) result.get(Constants.DATA_LIST);
Assert.assertTrue(CollectionUtils.isNotEmpty(queueList));
}
@ -94,13 +90,13 @@ public class QueueServiceTest {
@Test
public void testQueryListPage() {
IPage<Queue> page = new Page<>(1,10);
IPage<Queue> page = new Page<>(1, 10);
page.setTotal(1L);
page.setRecords(getQueueList());
Mockito.when(queueMapper.queryQueuePaging(Mockito.any(Page.class), Mockito.eq(queueName))).thenReturn(page);
Result result = queueService.queryList(getLoginUser(),queueName,1,10);
Result result = queueService.queryList(getLoginUser(), queueName, 1, 10);
logger.info(result.toString());
PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
PageInfo<Queue> pageInfo = (PageInfo<Queue>) result.getData();
Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList()));
}
@ -108,17 +104,17 @@ public class QueueServiceTest {
public void testCreateQueue() {
// queue is null
Map<String, Object> result = queueService.createQueue(getLoginUser(),null,queueName);
Map<String, Object> result = queueService.createQueue(getLoginUser(), null, queueName);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// queueName is null
result = queueService.createQueue(getLoginUser(),queueName,null);
result = queueService.createQueue(getLoginUser(), queueName, null);
logger.info(result.toString());
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS));
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS));
// correct
result = queueService.createQueue(getLoginUser(),queueName,queueName);
result = queueService.createQueue(getLoginUser(), queueName, queueName);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS));
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@ -130,25 +126,25 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, "test")).thenReturn(true);
// not exist
Map<String, Object> result = queueService.updateQueue(getLoginUser(),0,"queue",queueName);
Map<String, Object> result = queueService.updateQueue(getLoginUser(), 0, "queue", queueName);
logger.info(result.toString());
Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//no need update
result = queueService.updateQueue(getLoginUser(),1,queueName,queueName);
result = queueService.updateQueue(getLoginUser(), 1, queueName, queueName);
logger.info(result.toString());
Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(),((Status)result.get(Constants.STATUS)).getCode());
Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//queue exist
result = queueService.updateQueue(getLoginUser(),1,"test",queueName);
result = queueService.updateQueue(getLoginUser(), 1, "test", queueName);
logger.info(result.toString());
Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
// queueName exist
result = queueService.updateQueue(getLoginUser(),1,"test1","test");
result = queueService.updateQueue(getLoginUser(), 1, "test1", "test");
logger.info(result.toString());
Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode());
Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
//success
result = queueService.updateQueue(getLoginUser(),1,"test1","test1");
result = queueService.updateQueue(getLoginUser(), 1, "test1", "test1");
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS.getCode(),((Status)result.get(Constants.STATUS)).getCode());
Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode());
}
@ -159,27 +155,27 @@ public class QueueServiceTest {
Mockito.when(queueMapper.existQueue(null, queueName)).thenReturn(true);
//queue null
Result result = queueService.verifyQueue(null,queueName);
Result result = queueService.verifyQueue(null, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//queueName null
result = queueService.verifyQueue(queueName,null);
result = queueService.verifyQueue(queueName, null);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode());
//exist queueName
result = queueService.verifyQueue(queueName,queueName);
result = queueService.verifyQueue(queueName, queueName);
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_NAME_EXIST.getCode());
//exist queue
result = queueService.verifyQueue(queueName,"test");
result = queueService.verifyQueue(queueName, "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_VALUE_EXIST.getCode());
// success
result = queueService.verifyQueue("test","test");
result = queueService.verifyQueue("test", "test");
logger.info(result.toString());
Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode());
@ -187,7 +183,6 @@ public class QueueServiceTest {
/**
* create admin user
* @return
*/
private User getLoginUser() {
@ -205,7 +200,6 @@ public class QueueServiceTest {
/**
* get queue
* @return
*/
private Queue getQueue() {
Queue queue = new Queue();

12
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java

@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.commons.collections.CollectionUtils;
@ -63,9 +62,6 @@ public class TenantServiceTest {
@InjectMocks
private TenantServiceImpl tenantService;
@Mock
private CacheNotifyService cacheNotifyService;
@Mock
private TenantMapper tenantMapper;
@ -88,7 +84,7 @@ public class TenantServiceTest {
try {
//check tenantCode
Map<String, Object> result =
tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest");
logger.info(result.toString());
Assert.assertEquals(Status.CHECK_OS_TENANT_CODE_ERROR, result.get(Constants.STATUS));
@ -116,7 +112,7 @@ public class TenantServiceTest {
page.setRecords(getList());
page.setTotal(1L);
Mockito.when(tenantMapper.queryTenantPaging(Mockito.any(Page.class), Mockito.eq("TenantServiceTest")))
.thenReturn(page);
.thenReturn(page);
Result result = tenantService.queryTenantList(getLoginUser(), "TenantServiceTest", 1, 10);
logger.info(result.toString());
PageInfo<Tenant> pageInfo = (PageInfo<Tenant>) result.getData();
@ -131,7 +127,7 @@ public class TenantServiceTest {
try {
// id not exist
Map<String, Object> result =
tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc");
logger.info(result.toString());
// success
Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS));
@ -150,7 +146,7 @@ public class TenantServiceTest {
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(processInstanceMapper.queryByTenantIdAndStatus(1, Constants.NOT_TERMINATED_STATES))
.thenReturn(getInstanceList());
.thenReturn(getInstanceList());
Mockito.when(processDefinitionMapper.queryDefinitionListByTenant(2)).thenReturn(getDefinitionsList());
Mockito.when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList());

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java

@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.collections.CollectionUtils;
@ -79,9 +78,6 @@ public class UsersServiceTest {
@InjectMocks
private UsersServiceImpl usersService;
@Mock
private CacheNotifyService cacheNotifyService;
@Mock
private UserMapper userMapper;

19
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java

@ -18,7 +18,20 @@
package org.apache.dolphinscheduler.common.enums;
public enum CacheType {
TENANT,
USER,
QUEUE;
TENANT("tenant"),
USER("user"),
QUEUE("queue"),
PROCESS_DEFINITION("processDefinition"),
PROCESS_TASK_RELATION("processTaskRelation"),
TASK_DEFINITION("taskDefinition");
CacheType(String cacheName) {
this.cacheName = cacheName;
}
private final String cacheName;
public String getCacheName() {
return cacheName;
}
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java

@ -42,6 +42,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
@Configuration
public class SpringConnectionFactory {
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
@ -60,6 +61,7 @@ public class SpringConnectionFactory {
configuration.setCallSettersOnNulls(true);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.addInterceptor(paginationInterceptor());
configuration.setGlobalConfig(new GlobalConfig().setBanner(false));
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -23,6 +23,9 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -30,6 +33,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition log mapper interface
*/
@CacheConfig(cacheNames = "processDefinition")
public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
/**
@ -66,6 +70,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* @param version version number
* @return the process definition version info
*/
@Cacheable(sync = true, key = "#processDefinitionCode + '_' + #version")
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
@ -28,13 +27,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* process definition mapper interface
*/
@CacheConfig(cacheNames = "processDefinition", keyGenerator = "cacheKeyGenerator")
public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/**
@ -43,8 +46,15 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param code code
* @return process definition
*/
@Cacheable(sync = true)
ProcessDefinition queryByCode(@Param("code") long code);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") ProcessDefinition processDefinition);
/**
* query process definition by code list
*
@ -59,6 +69,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param code code
* @return delete result
*/
@CacheEvict
int deleteByCode(@Param("code") long code);
/**

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -25,11 +25,16 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* process task relation mapper interface
*/
@CacheConfig(cacheNames = "processTaskRelation", keyGenerator = "cacheKeyGenerator")
public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelation> {
/**
@ -39,9 +44,16 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return ProcessTaskRelation list
*/
@Cacheable(sync = true)
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
/**
* process task relation by taskCode
*
@ -65,6 +77,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return int
*/
@CacheEvict
int deleteByCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);

13
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -25,6 +25,10 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@ -32,6 +36,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task definition log mapper interface
*/
@CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator")
public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
/**
@ -48,9 +53,16 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version version
* @return task definition log
*/
@Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion")
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
@Param("version") int version);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
/**
* @param taskDefinitions taskDefinition list
* @return list
@ -72,6 +84,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version task definition version
* @return delete result
*/
@CacheEvict
int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**

29
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java

@ -14,29 +14,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* tenant mapper interface
*/
@CacheConfig(cacheNames = "tenant", keyGenerator = "cacheKeyGenerator")
public interface TenantMapper extends BaseMapper<Tenant> {
/**
* query tenant by id
*
* @param tenantId tenantId
* @return tenant
*/
@Cacheable(sync = true)
Tenant queryById(@Param("tenantId") int tenantId);
/**
* delete by id
*/
@CacheEvict
int deleteById(int id);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") Tenant tenant);
/**
* query tenant by code
*
* @param tenantCode tenantCode
* @return tenant
*/
@ -44,6 +65,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* tenant page
*
* @param page page
* @param searchVal searchVal
* @return tenant IPage
@ -53,6 +75,7 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* check tenant exist
*
* @param tenantCode tenantCode
* @return true if exist else return null
*/

46
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

@ -14,29 +14,57 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* user mapper interface
*/
@CacheConfig(cacheNames = "user", keyGenerator = "cacheKeyGenerator")
public interface UserMapper extends BaseMapper<User> {
/**
* select by user id
*/
@Cacheable(sync = true)
User selectById(int id);
/**
* delete by id
*/
@CacheEvict
int deleteById(int id);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") User user);
/**
* query all general user
*
* @return user list
*/
List<User> queryAllGeneralUser();
/**
* query user by name
*
* @param userName userName
* @return user
*/
@ -44,6 +72,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userName and password
*
* @param userName userName
* @param password password
* @return user
@ -53,6 +82,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* user page
*
* @param page page
* @param userName userName
* @return user IPage
@ -62,6 +92,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user detail by id
*
* @param userId userId
* @return user
*/
@ -69,6 +100,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by alertgroupId
*
* @param alertgroupId alertgroupId
* @return user list
*/
@ -76,6 +108,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user list by tenantId
*
* @param tenantId tenantId
* @return user list
*/
@ -83,6 +116,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by userId
*
* @param userId userId
* @return user
*/
@ -90,6 +124,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by token
*
* @param token token
* @return user
*/
@ -97,6 +132,7 @@ public interface UserMapper extends BaseMapper<User> {
/**
* query user by queue name
*
* @param queueName queue name
* @return user list
*/
@ -104,13 +140,15 @@ public interface UserMapper extends BaseMapper<User> {
/**
* check the user exist
* @param queueName queue name
*
* @param queue queue name
* @return true if exist else return null
*/
Boolean existUser(@Param("queue") String queue);
/**
* update user with old queue
*
* @param oldQueue old queue name
* @param newQueue new queue name
* @return update rows

132
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java

@ -17,16 +17,24 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import com.google.common.base.Preconditions;
@ -39,11 +47,7 @@ public class CacheProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
private CacheProcessorFactory cacheProcessorFactory;
public CacheProcessor() {
this.cacheProcessorFactory = SpringApplicationContext.getBean(CacheProcessorFactory.class);
}
private CacheManager cacheManager;
@Override
public void process(Channel channel, Command command) {
@ -53,6 +57,120 @@ public class CacheProcessor implements NettyRequestProcessor {
logger.info("received command : {}", cacheExpireCommand);
cacheProcessorFactory.getCacheProcessor(cacheExpireCommand.getCacheType()).cacheExpire(cacheExpireCommand.getUpdateObjClass(), cacheExpireCommand.getUpdateObjJson());
this.cacheExpire(cacheExpireCommand);
}
private void cacheExpire(CacheExpireCommand cacheExpireCommand) {
if (cacheManager == null) {
cacheManager = SpringApplicationContext.getBean(CacheManager.class);
}
Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass());
if (object == null) {
return;
}
CacheType cacheType = cacheExpireCommand.getCacheType();
switch (cacheType) {
case TENANT:
if (object instanceof Tenant) {
Tenant tenant = (Tenant) object;
tenantCacheExpire(tenant);
}
break;
case USER:
if (object instanceof User) {
User user = (User) object;
userCacheExpire(user);
}
break;
case QUEUE:
if (object instanceof Queue) {
Queue queue = (Queue) object;
queueCacheExpire(queue);
}
break;
case PROCESS_DEFINITION:
if (object instanceof ProcessDefinition) {
ProcessDefinition processDefinition = (ProcessDefinition) object;
processDefinitionCacheExpire(processDefinition);
}
break;
case TASK_DEFINITION:
if (object instanceof TaskDefinition) {
TaskDefinition taskDefinition = (TaskDefinition) object;
taskDefinitionCacheExpire(taskDefinition);
}
break;
case PROCESS_TASK_RELATION:
if (object instanceof ProcessTaskRelation) {
ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object;
processTaskRelationCacheExpire(processTaskRelation);
}
break;
default:
logger.error("no support cache type:{}", cacheType);
}
// if delete operation, just send key
if (object instanceof String) {
Cache cache = cacheManager.getCache(cacheType.getCacheName());
if (cache != null) {
cache.evict(object);
logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object);
}
}
}
private void tenantCacheExpire(Tenant tenant) {
Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
if (cache != null) {
cache.evict(tenant.getId());
logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId());
}
}
private void userCacheExpire(User user) {
Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
if (cache != null) {
cache.evict(user.getId());
logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId());
}
}
private void queueCacheExpire(Queue queue) {
Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
if (cache != null) {
cache.clear();
logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName());
}
}
private void processDefinitionCacheExpire(ProcessDefinition processDefinition) {
Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
if (cache != null) {
cache.evict(processDefinition.getCode());
cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion());
logger.info("cache evict, type:{}, key:{}",
CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion());
}
}
private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) {
Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
if (cache != null) {
cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
logger.info("cache evict, type:{}, key:{}",
CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
}
}
private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
if (cache != null) {
cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion());
logger.info("cache evict, type:{}, key:{}",
CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion());
}
}
}

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -118,12 +118,6 @@ public class MasterSchedulerService extends Thread {
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
/**
* key:code-version
* value: processDefinition
*/
HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@ -195,10 +189,6 @@ public class MasterSchedulerService extends Thread {
return;
}
if (!masterConfig.isCacheProcessDefinition() && processDefinitionCacheMaps.size() > 0) {
processDefinitionCacheMaps.clear();
}
List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
return;
@ -245,8 +235,7 @@ public class MasterSchedulerService extends Thread {
try {
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command,
processDefinitionCacheMaps);
command);
if (processInstance != null) {
processInstances[index] = processInstance;
logger.info("handle command command {} end, create process instance {}",

28
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -737,19 +737,21 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.clear();
errorTaskMap.clear();
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
taskInstanceMap.put(task.getId(), task);
if (task.isTaskComplete()) {
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) {
continue;
}
if (task.getState().typeIsFailure() && !task.taskCanRetry()) {
errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
}
}
}

7
dolphinscheduler-server/src/main/resources/application-master.yaml

@ -19,12 +19,15 @@ spring:
name: master-server
cache:
# default enable cache, you can disable by `type: none`
type: caffeine
type: none
cache-names:
- tenant
- user
- processDefinition
- processTaskRelation
- taskDefinition
caffeine:
spec: maximumSize=100,expireAfterWrite=60s,recordStats
spec: maximumSize=100,expireAfterWrite=300s,recordStats
master:
listen-port: 5678

19
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java

@ -22,19 +22,17 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory;
import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import io.netty.channel.Channel;
@ -47,21 +45,20 @@ public class CacheProcessorTest {
private CacheProcessor cacheProcessor;
@InjectMocks
private TenantCacheProcessorImpl tenantCacheProcessor;
@Mock
private Channel channel;
@Mock
private CacheProcessorFactory cacheProcessorFactory;
private CacheManager cacheManager;
@Mock
private Cache cache;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
PowerMockito.when(SpringApplicationContext.getBean(CacheProcessorFactory.class)).thenReturn(cacheProcessorFactory);
Mockito.when(cacheProcessorFactory.getCacheProcessor(CacheType.TENANT)).thenReturn(tenantCacheProcessor);
PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager);
Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache);
cacheProcessor = new CacheProcessor();
}

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java vendored

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.service;
package org.apache.dolphinscheduler.service.cache;
import org.apache.dolphinscheduler.remote.command.Command;

20
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java vendored

@ -15,12 +15,22 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
package org.apache.dolphinscheduler.service.cache.impl;
import org.apache.dolphinscheduler.dao.entity.User;
import java.lang.reflect.Method;
public interface UserCacheProcessor extends BaseCacheProcessor {
void update(int userId);
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
User selectById(int userId);
/**
* custom cache key generator
*/
@Component
public class CacheKeyGenerator implements KeyGenerator {
@Override
public Object generate(Object target, Method method, Object... params) {
return StringUtils.arrayToDelimitedString(params, "_");
}
}

29
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java → dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java vendored

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.service.impl;
package org.apache.dolphinscheduler.service.cache.impl;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections4.CollectionUtils;
@ -114,19 +114,22 @@ public class CacheNotifyServiceImpl implements CacheNotifyService {
@Override
public void notifyMaster(Command command) {
logger.info("send result, command:{}", command.toString());
try {
List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
if (CollectionUtils.isEmpty(serverList)) {
return;
}
List<Server> serverList = registryClient.getServerList(NodeType.MASTER);
if (CollectionUtils.isEmpty(serverList)) {
return;
}
for (Server server : serverList) {
Host host = new Host(server.getHost(), server.getPort());
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
if (nettyRemoteChannel == null) {
continue;
for (Server server : serverList) {
Host host = new Host(server.getHost(), server.getPort());
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host);
if (nettyRemoteChannel == null) {
continue;
}
nettyRemoteChannel.writeAndFlush(command);
}
nettyRemoteChannel.writeAndFlush(command);
} catch (Exception e) {
logger.error("notify master error", e);
}
}
}

22
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java vendored

@ -1,22 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
public interface BaseCacheProcessor {
void cacheExpire(Class updateObjClass, String updateObjJson);
}

22
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java vendored

@ -1,22 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
public interface QueueCacheProcessor extends BaseCacheProcessor {
public void expireAllUserCache();
}

26
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java vendored

@ -1,26 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
import org.apache.dolphinscheduler.dao.entity.Tenant;
public interface TenantCacheProcessor extends BaseCacheProcessor {
void update(int tenantId);
Tenant queryById(int tenantId);
}

58
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java vendored

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor.impl;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.service.cache.processor.BaseCacheProcessor;
import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CacheProcessorFactory {
@Autowired
private TenantCacheProcessor tenantCacheProcessor;
@Autowired
private UserCacheProcessor userCacheProcessor;
@Autowired
private QueueCacheProcessor queueCacheProcessor;
Map<CacheType, BaseCacheProcessor> cacheProcessorMap = new ConcurrentHashMap<>();
@PostConstruct
private void init() {
cacheProcessorMap.put(CacheType.TENANT, tenantCacheProcessor);
cacheProcessorMap.put(CacheType.USER, userCacheProcessor);
cacheProcessorMap.put(CacheType.QUEUE, queueCacheProcessor);
}
public BaseCacheProcessor getCacheProcessor(CacheType cacheType) {
return cacheProcessorMap.get(cacheType);
}
}

50
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java vendored

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor.impl;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.stereotype.Component;
@Component
public class QueueCacheProcessorImpl implements QueueCacheProcessor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
@CacheEvict(cacheNames = "user", allEntries = true)
public void expireAllUserCache() {
// just evict cache
logger.debug("expire all user cache");
}
@Override
public void cacheExpire(Class updateObjClass, String updateObjJson) {
Queue updateQueue = (Queue) JSONUtils.parseObject(updateObjJson, updateObjClass);
if (updateQueue == null) {
return;
}
SpringApplicationContext.getBean(QueueCacheProcessor.class).expireAllUserCache();
}
}

64
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java vendored

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor.impl;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;
@Component
@CacheConfig(cacheNames = "tenant")
public class TenantCacheProcessorImpl implements TenantCacheProcessor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private TenantMapper tenantMapper;
@Override
@CacheEvict
public void update(int tenantId) {
// just evict cache
}
@Override
@Cacheable(sync = true)
public Tenant queryById(int tenantId) {
logger.debug("tenant cache proxy, tenantId:{}", tenantId);
return tenantMapper.queryById(tenantId);
}
@Override
public void cacheExpire(Class updateObjClass, String updateObjJson) {
Tenant updateTenant = (Tenant) JSONUtils.parseObject(updateObjJson, updateObjClass);
if (updateTenant == null) {
return;
}
SpringApplicationContext.getBean(TenantCacheProcessor.class).update(updateTenant.getId());
}
}

59
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java vendored

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor.impl;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;
@Component
@CacheConfig(cacheNames = "user")
public class UserCacheProcessorImpl implements UserCacheProcessor {
@Autowired
private UserMapper userMapper;
@Override
@CacheEvict
public void update(int userId) {
// just evict cache
}
@Override
@Cacheable(sync = true)
public User selectById(int userId) {
return userMapper.selectById(userId);
}
@Override
public void cacheExpire(Class updateObjClass, String updateObjJson) {
User user = (User) JSONUtils.parseObject(updateObjJson, updateObjClass);
if (user == null) {
return;
}
SpringApplicationContext.getBean(UserCacheProcessor.class).update(user.getId());
}
}

39
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -100,15 +100,15 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor;
import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
@ -155,7 +155,7 @@ public class ProcessService {
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserCacheProcessor userCacheProcessor;
private UserMapper userMapper;
@Autowired
private ProcessDefinitionMapper processDefineMapper;
@ -194,7 +194,7 @@ public class ProcessService {
private ErrorCommandMapper errorCommandMapper;
@Autowired
private TenantCacheProcessor tenantCacheProcessor;
private TenantMapper tenantMapper;
@Autowired
private ProjectMapper projectMapper;
@ -233,8 +233,8 @@ public class ProcessService {
* @return process instance
*/
@Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps);
public ProcessInstance handleCommand(Logger logger, String host, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
logger.error("scan command, command parameter is error: {}", command);
@ -739,7 +739,7 @@ public class ProcessService {
public Tenant getTenantForProcess(int tenantId, int userId) {
Tenant tenant = null;
if (tenantId >= 0) {
tenant = tenantCacheProcessor.queryById(tenantId);
tenant = tenantMapper.queryById(tenantId);
}
if (userId == 0) {
@ -747,8 +747,8 @@ public class ProcessService {
}
if (tenant == null) {
User user = userCacheProcessor.selectById(userId);
tenant = tenantCacheProcessor.queryById(user.getTenantId());
User user = userMapper.selectById(userId);
tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
@ -794,19 +794,12 @@ public class ProcessService {
* @param host host
* @return process instance
*/
private ProcessInstance constructProcessInstance(Command command, String host, HashMap<String, ProcessDefinition> processDefinitionCacheMaps) {
private ProcessInstance constructProcessInstance(Command command, String host) {
ProcessInstance processInstance;
ProcessDefinition processDefinition;
CommandType commandType = command.getCommandType();
String key = String.format("%d-%d", command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinitionCacheMaps.containsKey(key)) {
processDefinition = processDefinitionCacheMaps.get(key);
} else {
processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinition != null) {
processDefinitionCacheMaps.put(key, processDefinition);
}
}
processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion());
if (processDefinition == null) {
logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode());
return null;
@ -1964,11 +1957,11 @@ public class ProcessService {
return StringUtils.EMPTY;
}
int userId = resourceList.get(0).getUserId();
User user = userCacheProcessor.selectById(userId);
User user = userMapper.selectById(userId);
if (Objects.isNull(user)) {
return StringUtils.EMPTY;
}
Tenant tenant = tenantCacheProcessor.queryById(user.getTenantId());
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (Objects.isNull(tenant)) {
return StringUtils.EMPTY;
}
@ -2038,7 +2031,7 @@ public class ProcessService {
if (processInstance == null) {
return queue;
}
User executor = userCacheProcessor.selectById(processInstance.getExecutorId());
User executor = userMapper.selectById(processInstance.getExecutorId());
if (executor != null) {
queue = executor.getQueue();
}
@ -2149,7 +2142,7 @@ public class ProcessService {
* @return User
*/
public User getUserById(int userId) {
return userCacheProcessor.selectById(userId);
return userMapper.selectById(userId);
}
/**

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java vendored

@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.service.cache.service.impl.CacheNotifyServiceImpl;
import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;

60
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java vendored

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.impl.QueueCacheProcessorImpl;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* tenant cache proxy test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class QueueCacheProcessorTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@InjectMocks
private QueueCacheProcessorImpl queueCacheProcessor;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(QueueCacheProcessor.class)).thenReturn(queueCacheProcessor);
}
@Test
public void testCacheExpire() {
Queue queue = new Queue();
queue.setId(100);
queueCacheProcessor.cacheExpire(Queue.class, JSONUtils.toJsonString(queue));
}
}

78
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java vendored

@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* tenant cache proxy test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class TenantCacheProcessorTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@InjectMocks
private TenantCacheProcessorImpl tenantCacheProcessor;
@Mock
private TenantMapper tenantMapper;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor);
}
@Test
public void testQueryById() {
Tenant tenant1 = new Tenant();
tenant1.setId(100);
tenant1.setDescription("test1");
Mockito.when(tenantMapper.queryById(100)).thenReturn(tenant1);
Assert.assertEquals(tenant1, tenantCacheProcessor.queryById(100));
}
@Test
public void testCacheExpire() {
Tenant tenant1 = new Tenant();
tenant1.setId(100);
tenant1.setDescription("test1");
tenantCacheProcessor.cacheExpire(Tenant.class, JSONUtils.toJsonString(tenant1));
}
}

76
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java vendored

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.cache.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* tenant cache proxy test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class UserCacheProxyTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
@InjectMocks
private UserCacheProcessorImpl userCacheProcessor;
@Mock
private UserMapper userMapper;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(UserCacheProcessor.class)).thenReturn(userCacheProcessor);
}
@Test
public void testQueryById() {
User user1 = new User();
user1.setId(100);
Mockito.when(userMapper.selectById(100)).thenReturn(user1);
Assert.assertEquals(user1, userCacheProcessor.selectById(100));
}
@Test
public void testCacheExpire() {
User user = new User();
user.setId(100);
userCacheProcessor.cacheExpire(User.class, JSONUtils.toJsonString(user));
}
}

30
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -64,7 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
@ -116,7 +116,7 @@ public class ProcessServiceTest {
@Mock
private ProcessInstanceMapper processInstanceMapper;
@Mock
private UserCacheProcessorImpl userCacheProcessor;
private UserMapper userMapper;
@Mock
private TaskInstanceMapper taskInstanceMapper;
@Mock
@ -134,8 +134,6 @@ public class ProcessServiceTest {
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
private HashMap<String, ProcessDefinition> processDefinitionCacheMaps = new HashMap<>();
@Test
public void testCreateSubCommand() {
ProcessInstance parentInstance = new ProcessInstance();
@ -263,7 +261,7 @@ public class ProcessServiceTest {
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps));
Assert.assertNull(processService.handleCommand(logger, host, command));
int definitionVersion = 1;
long definitionCode = 123;
@ -298,7 +296,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Command command2 = new Command();
command2.setId(2);
@ -308,7 +306,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps));
Assert.assertNotNull(processService.handleCommand(logger, host, command2));
Command command3 = new Command();
command3.setId(3);
@ -318,7 +316,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps));
Assert.assertNotNull(processService.handleCommand(logger, host, command3));
Command command4 = new Command();
command4.setId(4);
@ -328,7 +326,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps));
Assert.assertNotNull(processService.handleCommand(logger, host, command4));
Command command5 = new Command();
command5.setId(5);
@ -342,7 +340,7 @@ public class ProcessServiceTest {
command5.setCommandType(CommandType.START_PROCESS);
command5.setDryRun(Constants.DRY_RUN_FLAG_NO);
Mockito.when(commandMapper.deleteById(5)).thenReturn(1);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps);
ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5);
Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\""));
ProcessDefinition processDefinition1 = new ProcessDefinition();
@ -367,7 +365,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps));
Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Command command6 = new Command();
command6.setId(6);
@ -378,7 +376,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
@ -397,7 +395,7 @@ public class ProcessServiceTest {
command7.setProcessDefinitionVersion(1);
Mockito.when(commandMapper.deleteById(7)).thenReturn(1);
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps);
ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7);
Assert.assertTrue(processInstance8 == null);
ProcessDefinition processDefinition2 = new ProcessDefinition();
@ -421,7 +419,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
Assert.assertTrue(processInstance10 == null);
}
@ -462,14 +460,14 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
// will throw exception when command id is 0 and delete fail
processService.handleCommand(logger, host, command1, processDefinitionCacheMaps);
processService.handleCommand(logger, host, command1);
}
@Test
public void testGetUserById() {
User user = new User();
user.setId(123);
Mockito.when(userCacheProcessor.selectById(123)).thenReturn(user);
Mockito.when(userMapper.selectById(123)).thenReturn(user);
Assert.assertEquals(user, processService.getUserById(123));
}

Loading…
Cancel
Save