diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java new file mode 100644 index 0000000000..a989b6cecd --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.aspect; + +import org.apache.dolphinscheduler.common.enums.CacheType; +import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; +import org.apache.dolphinscheduler.service.cache.CacheNotifyService; +import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator; + +import java.lang.reflect.Method; + +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.stereotype.Component; + +/** + * aspect for cache evict + */ +@Aspect +@Component +public class CacheEvictAspect { + + private static final String UPDATE_BY_ID = "updateById"; + + @Autowired + private CacheKeyGenerator cacheKeyGenerator; + + @Autowired + private CacheNotifyService cacheNotifyService; + + @Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)") + public void cacheEvictPointCut() { + // Do nothing because of it's a pointcut + } + + @Around("cacheEvictPointCut()") + public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { + MethodSignature sign = (MethodSignature) proceedingJoinPoint.getSignature(); + Method method = sign.getMethod(); + Object target = proceedingJoinPoint.getTarget(); + Object[] args = proceedingJoinPoint.getArgs(); + + Object result = proceedingJoinPoint.proceed(); + + CacheConfig cacheConfig = method.getDeclaringClass().getAnnotation(CacheConfig.class); + CacheEvict cacheEvict = method.getAnnotation(CacheEvict.class); + + CacheType cacheType = getCacheType(cacheConfig, cacheEvict); + if (cacheType != null) { + // todo use springEL is better + if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1) { + Object updateObj = args[0]; + cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, updateObj).convert2Command()); + } else { + Object key = cacheKeyGenerator.generate(target, method, args); + cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command()); + } + } + + return result; + } + + private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) { + String cacheName = null; + if (cacheEvict.cacheNames().length > 0) { + cacheName = cacheEvict.cacheNames()[0]; + } + if (cacheConfig.cacheNames().length > 0) { + cacheName = cacheConfig.cacheNames()[0]; + } + if (cacheName == null) { + return null; + } + for (CacheType cacheType : CacheType.values()) { + if (cacheType.getCacheName().equals(cacheName)) { + return cacheType; + } + } + return null; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index e6a20b030b..c4478748c6 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -168,15 +168,15 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * create process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return create result code */ @@ -338,7 +338,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return definition list */ @@ -360,7 +360,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition simple list * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code * @return definition simple list */ @@ -390,12 +390,12 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition list paging * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param searchVal search value - * @param userId user id - * @param pageNo page number - * @param pageSize page size + * @param searchVal search value + * @param userId user id + * @param pageNo page number + * @param pageSize page size * @return process definition page */ @Override @@ -433,9 +433,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query detail of process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return process definition detail */ @Override @@ -485,16 +485,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * update process definition * - * @param loginUser login user - * @param projectCode project code - * @param name process definition name - * @param code process definition code - * @param description description - * @param globalParams global params - * @param locations locations for nodes - * @param timeout timeout - * @param tenantCode tenantCode - * @param taskRelationJson relation json for nodes + * @param loginUser login user + * @param projectCode project code + * @param name process definition name + * @param code process definition code + * @param description description + * @param globalParams global params + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @param taskDefinitionJson taskDefinitionJson * @return update result code */ @@ -605,9 +605,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * verify process definition name unique * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param name name + * @param name name * @return true if process definition name not exists, otherwise false */ @Override @@ -630,9 +630,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete process definition by code * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return delete result code */ @Override @@ -700,9 +700,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * release process definition: online / offline * - * @param loginUser login user - * @param projectCode project code - * @param code process definition code + * @param loginUser login user + * @param projectCode project code + * @param code process definition code * @param releaseState release state * @return release result code */ @@ -841,9 +841,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * import process definition * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param file process metadata json file + * @param file process metadata json file * @return import process */ @Override @@ -1051,9 +1051,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param code process definition code + * @param code process definition code * @return task node list */ @Override @@ -1080,9 +1080,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * get task node details map based on process definition * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code - * @param codes define codes + * @param codes define codes * @return task node list */ @Override @@ -1124,7 +1124,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query process definition all by project code * - * @param loginUser loginUser + * @param loginUser loginUser * @param projectCode project code * @return process definitions in the project */ @@ -1226,11 +1226,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro if (taskInstance.isSubProcess()) { TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode()); subProcessCode = Integer.parseInt(JSONUtils.parseObject( - taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); + taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText()); } treeViewDto.getInstances().add(new Instance(taskInstance.getId(), taskInstance.getName(), taskInstance.getTaskCode(), - taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), - taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); + taskInstance.getTaskType(), taskInstance.getState().toString(), taskInstance.getStartTime(), taskInstance.getEndTime(), + taskInstance.getHost(), DateUtils.format2Readable(endTime.getTime() - startTime.getTime()), subProcessCode)); } } for (TreeViewDto pTreeViewDto : parentTreeViewDtoList) { @@ -1291,9 +1291,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch copy process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1314,9 +1314,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * batch move process definition * - * @param loginUser loginUser - * @param projectCode projectCode - * @param codes processDefinitionCodes + * @param loginUser loginUser + * @param projectCode projectCode + * @param codes processDefinitionCodes * @param targetProjectCode targetProjectCode */ @Override @@ -1415,10 +1415,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * switch the defined process definition version * - * @param loginUser login user + * @param loginUser login user * @param projectCode project code - * @param code process definition code - * @param version the version user want to switch + * @param code process definition code + * @param version the version user want to switch * @return switch process definition version result code */ @Override @@ -1454,11 +1454,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * check batch operate result * - * @param srcProjectCode srcProjectCode + * @param srcProjectCode srcProjectCode * @param targetProjectCode targetProjectCode - * @param result result + * @param result result * @param failedProcessList failedProcessList - * @param isCopy isCopy + * @param isCopy isCopy */ private void checkBatchOperateResult(long srcProjectCode, long targetProjectCode, Map result, List failedProcessList, boolean isCopy) { @@ -1476,11 +1476,11 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * query the pagination versions info by one certain process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param pageNo page number - * @param pageSize page size - * @param code process definition code + * @param pageNo page number + * @param pageSize page size + * @param code process definition code * @return the pagination process definition versions info of the certain process definition */ @Override @@ -1510,10 +1510,10 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro /** * delete one certain process definition by version number and process definition code * - * @param loginUser login user info to check auth + * @param loginUser login user info to check auth * @param projectCode project code - * @param code process definition code - * @param version version number + * @param code process definition code + * @param version version number * @return delete result code */ @Override diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java index 14c57ccf2e..2da89df000 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/QueueServiceImpl.java @@ -22,13 +22,10 @@ import org.apache.dolphinscheduler.api.service.QueueService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.QueueMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.lang.StringUtils; @@ -59,9 +56,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { @Autowired private UserMapper userMapper; - @Autowired - private CacheNotifyService cacheNotifyService; - /** * query queue list * @@ -229,8 +223,6 @@ public class QueueServiceImpl extends BaseServiceImpl implements QueueService { queueMapper.updateById(queueObj); - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.QUEUE, queueObj).convert2Command()); - putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index 58f6bbc131..0601725797 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -34,8 +33,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -70,9 +67,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService @Autowired private UserMapper userMapper; - @Autowired - private CacheNotifyService cacheNotifyService; - /** * create tenant * @@ -220,9 +214,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService tenant.setUpdateTime(now); tenantMapper.updateById(tenant); - // notify master to expire cache - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command()); - result.put(Constants.STATUS, Status.SUCCESS); result.put(Constants.MSG, Status.SUCCESS.getMsg()); return result; @@ -282,9 +273,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService tenantMapper.deleteById(id); processInstanceMapper.updateProcessInstanceByTenantId(id, -1); - // notify master to expire cache - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.TENANT, tenant).convert2Command()); - putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index ff5dc7de93..5199d4f39d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.api.utils.CheckUtils; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; @@ -53,8 +52,6 @@ import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.ResourceProcessDefinitionUtils; -import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; @@ -121,10 +118,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { @Autowired private ProjectMapper projectMapper; - @Autowired - private CacheNotifyService cacheNotifyService; - - /** * create user, only system admin have permission * @@ -479,7 +472,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { // updateProcessInstance user userMapper.updateById(user); - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); putMsg(result, Status.SUCCESS); return result; @@ -531,10 +523,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { userMapper.deleteById(id); - if (user != null) { - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); - } - putMsg(result, Status.SUCCESS); return result; @@ -1079,8 +1067,6 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { user.setUpdateTime(now); userMapper.updateById(user); - cacheNotifyService.notifyMaster(new CacheExpireCommand(CacheType.USER, user).convert2Command()); - User responseUser = userMapper.queryByUserNameAccurately(userName); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, responseUser); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java index 7f1a2ed54d..f3167a560a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/QueueServiceTest.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.QueueMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.collections.CollectionUtils; @@ -61,9 +60,6 @@ public class QueueServiceTest { @InjectMocks private QueueServiceImpl queueService; - @Mock - private CacheNotifyService cacheNotifyService; - @Mock private QueueMapper queueMapper; @@ -77,7 +73,7 @@ public class QueueServiceTest { } @After - public void after(){ + public void after() { } @Test @@ -86,7 +82,7 @@ public class QueueServiceTest { Mockito.when(queueMapper.selectList(null)).thenReturn(getQueueList()); Map result = queueService.queryList(getLoginUser()); logger.info(result.toString()); - List queueList = (List) result.get(Constants.DATA_LIST); + List queueList = (List) result.get(Constants.DATA_LIST); Assert.assertTrue(CollectionUtils.isNotEmpty(queueList)); } @@ -94,13 +90,13 @@ public class QueueServiceTest { @Test public void testQueryListPage() { - IPage page = new Page<>(1,10); + IPage page = new Page<>(1, 10); page.setTotal(1L); page.setRecords(getQueueList()); Mockito.when(queueMapper.queryQueuePaging(Mockito.any(Page.class), Mockito.eq(queueName))).thenReturn(page); - Result result = queueService.queryList(getLoginUser(),queueName,1,10); + Result result = queueService.queryList(getLoginUser(), queueName, 1, 10); logger.info(result.toString()); - PageInfo pageInfo = (PageInfo) result.getData(); + PageInfo pageInfo = (PageInfo) result.getData(); Assert.assertTrue(CollectionUtils.isNotEmpty(pageInfo.getTotalList())); } @@ -108,17 +104,17 @@ public class QueueServiceTest { public void testCreateQueue() { // queue is null - Map result = queueService.createQueue(getLoginUser(),null,queueName); + Map result = queueService.createQueue(getLoginUser(), null, queueName); logger.info(result.toString()); - Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS)); + Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS)); // queueName is null - result = queueService.createQueue(getLoginUser(),queueName,null); + result = queueService.createQueue(getLoginUser(), queueName, null); logger.info(result.toString()); - Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR,result.get(Constants.STATUS)); + Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, result.get(Constants.STATUS)); // correct - result = queueService.createQueue(getLoginUser(),queueName,queueName); + result = queueService.createQueue(getLoginUser(), queueName, queueName); logger.info(result.toString()); - Assert.assertEquals(Status.SUCCESS,result.get(Constants.STATUS)); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } @@ -130,25 +126,25 @@ public class QueueServiceTest { Mockito.when(queueMapper.existQueue(null, "test")).thenReturn(true); // not exist - Map result = queueService.updateQueue(getLoginUser(),0,"queue",queueName); + Map result = queueService.updateQueue(getLoginUser(), 0, "queue", queueName); logger.info(result.toString()); - Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode()); + Assert.assertEquals(Status.QUEUE_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); //no need update - result = queueService.updateQueue(getLoginUser(),1,queueName,queueName); + result = queueService.updateQueue(getLoginUser(), 1, queueName, queueName); logger.info(result.toString()); - Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(),((Status)result.get(Constants.STATUS)).getCode()); + Assert.assertEquals(Status.NEED_NOT_UPDATE_QUEUE.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); //queue exist - result = queueService.updateQueue(getLoginUser(),1,"test",queueName); + result = queueService.updateQueue(getLoginUser(), 1, "test", queueName); logger.info(result.toString()); - Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode()); + Assert.assertEquals(Status.QUEUE_VALUE_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); // queueName exist - result = queueService.updateQueue(getLoginUser(),1,"test1","test"); + result = queueService.updateQueue(getLoginUser(), 1, "test1", "test"); logger.info(result.toString()); - Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(),((Status)result.get(Constants.STATUS)).getCode()); + Assert.assertEquals(Status.QUEUE_NAME_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); //success - result = queueService.updateQueue(getLoginUser(),1,"test1","test1"); + result = queueService.updateQueue(getLoginUser(), 1, "test1", "test1"); logger.info(result.toString()); - Assert.assertEquals(Status.SUCCESS.getCode(),((Status)result.get(Constants.STATUS)).getCode()); + Assert.assertEquals(Status.SUCCESS.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); } @@ -159,27 +155,27 @@ public class QueueServiceTest { Mockito.when(queueMapper.existQueue(null, queueName)).thenReturn(true); //queue null - Result result = queueService.verifyQueue(null,queueName); + Result result = queueService.verifyQueue(null, queueName); logger.info(result.toString()); Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode()); //queueName null - result = queueService.verifyQueue(queueName,null); + result = queueService.verifyQueue(queueName, null); logger.info(result.toString()); Assert.assertEquals(result.getCode().intValue(), Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode()); //exist queueName - result = queueService.verifyQueue(queueName,queueName); + result = queueService.verifyQueue(queueName, queueName); logger.info(result.toString()); Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_NAME_EXIST.getCode()); //exist queue - result = queueService.verifyQueue(queueName,"test"); + result = queueService.verifyQueue(queueName, "test"); logger.info(result.toString()); Assert.assertEquals(result.getCode().intValue(), Status.QUEUE_VALUE_EXIST.getCode()); // success - result = queueService.verifyQueue("test","test"); + result = queueService.verifyQueue("test", "test"); logger.info(result.toString()); Assert.assertEquals(result.getCode().intValue(), Status.SUCCESS.getCode()); @@ -187,7 +183,6 @@ public class QueueServiceTest { /** * create admin user - * @return */ private User getLoginUser() { @@ -205,7 +200,6 @@ public class QueueServiceTest { /** * get queue - * @return */ private Queue getQueue() { Queue queue = new Queue(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java index d9e1e974d7..e1c00d2e2e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TenantServiceTest.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.commons.collections.CollectionUtils; @@ -63,9 +62,6 @@ public class TenantServiceTest { @InjectMocks private TenantServiceImpl tenantService; - @Mock - private CacheNotifyService cacheNotifyService; - @Mock private TenantMapper tenantMapper; @@ -88,7 +84,7 @@ public class TenantServiceTest { try { //check tenantCode Map result = - tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest"); + tenantService.createTenant(getLoginUser(), "%!1111", 1, "TenantServiceTest"); logger.info(result.toString()); Assert.assertEquals(Status.CHECK_OS_TENANT_CODE_ERROR, result.get(Constants.STATUS)); @@ -116,7 +112,7 @@ public class TenantServiceTest { page.setRecords(getList()); page.setTotal(1L); Mockito.when(tenantMapper.queryTenantPaging(Mockito.any(Page.class), Mockito.eq("TenantServiceTest"))) - .thenReturn(page); + .thenReturn(page); Result result = tenantService.queryTenantList(getLoginUser(), "TenantServiceTest", 1, 10); logger.info(result.toString()); PageInfo pageInfo = (PageInfo) result.getData(); @@ -131,7 +127,7 @@ public class TenantServiceTest { try { // id not exist Map result = - tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc"); + tenantService.updateTenant(getLoginUser(), 912222, tenantCode, 1, "desc"); logger.info(result.toString()); // success Assert.assertEquals(Status.TENANT_NOT_EXIST, result.get(Constants.STATUS)); @@ -150,7 +146,7 @@ public class TenantServiceTest { Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(processInstanceMapper.queryByTenantIdAndStatus(1, Constants.NOT_TERMINATED_STATES)) - .thenReturn(getInstanceList()); + .thenReturn(getInstanceList()); Mockito.when(processDefinitionMapper.queryDefinitionListByTenant(2)).thenReturn(getDefinitionsList()); Mockito.when(userMapper.queryUserListByTenant(3)).thenReturn(getUserList()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index 01db35b15e..e586db8cbf 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -43,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.ResourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.commons.collections.CollectionUtils; @@ -79,9 +78,6 @@ public class UsersServiceTest { @InjectMocks private UsersServiceImpl usersService; - @Mock - private CacheNotifyService cacheNotifyService; - @Mock private UserMapper userMapper; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java index db31eeac77..f1921db892 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/CacheType.java @@ -18,7 +18,20 @@ package org.apache.dolphinscheduler.common.enums; public enum CacheType { - TENANT, - USER, - QUEUE; + TENANT("tenant"), + USER("user"), + QUEUE("queue"), + PROCESS_DEFINITION("processDefinition"), + PROCESS_TASK_RELATION("processTaskRelation"), + TASK_DEFINITION("taskDefinition"); + + CacheType(String cacheName) { + this.cacheName = cacheName; + } + + private final String cacheName; + + public String getCacheName() { + return cacheName; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java index 5381495d4a..2b4f99ceaa 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java @@ -42,6 +42,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean; @Configuration public class SpringConnectionFactory { + @Bean public PaginationInterceptor paginationInterceptor() { return new PaginationInterceptor(); @@ -60,6 +61,7 @@ public class SpringConnectionFactory { configuration.setCallSettersOnNulls(true); configuration.setJdbcTypeForNull(JdbcType.NULL); configuration.addInterceptor(paginationInterceptor()); + configuration.setGlobalConfig(new GlobalConfig().setBanner(false)); MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean(); sqlSessionFactoryBean.setConfiguration(configuration); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java index 51ccb73813..0e03b5012f 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java @@ -23,6 +23,9 @@ import org.apache.ibatis.annotations.Param; import java.util.List; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -30,6 +33,7 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; /** * process definition log mapper interface */ +@CacheConfig(cacheNames = "processDefinition") public interface ProcessDefinitionLogMapper extends BaseMapper { /** @@ -66,6 +70,7 @@ public interface ProcessDefinitionLogMapper extends BaseMapper { /** @@ -43,8 +46,15 @@ public interface ProcessDefinitionMapper extends BaseMapper { * @param code code * @return process definition */ + @Cacheable(sync = true) ProcessDefinition queryByCode(@Param("code") long code); + /** + * update + */ + @CacheEvict + int updateById(@Param("et") ProcessDefinition processDefinition); + /** * query process definition by code list * @@ -59,6 +69,7 @@ public interface ProcessDefinitionMapper extends BaseMapper { * @param code code * @return delete result */ + @CacheEvict int deleteByCode(@Param("code") long code); /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 5b74f46b85..5b3ea752ec 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -25,11 +25,16 @@ import org.apache.ibatis.annotations.Param; import java.util.List; import java.util.Map; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; + import com.baomidou.mybatisplus.core.mapper.BaseMapper; /** * process task relation mapper interface */ +@CacheConfig(cacheNames = "processTaskRelation", keyGenerator = "cacheKeyGenerator") public interface ProcessTaskRelationMapper extends BaseMapper { /** @@ -39,9 +44,16 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryByProcessCode(@Param("projectCode") long projectCode, @Param("processCode") long processCode); + /** + * update + */ + @CacheEvict + int updateById(@Param("et") ProcessTaskRelation processTaskRelation); + /** * process task relation by taskCode * @@ -65,6 +77,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper { /** @@ -48,9 +53,16 @@ public interface TaskDefinitionLogMapper extends BaseMapper { * @param version version * @return task definition log */ + @Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion") TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version); + /** + * update + */ + @CacheEvict + int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog); + /** * @param taskDefinitions taskDefinition list * @return list @@ -72,6 +84,7 @@ public interface TaskDefinitionLogMapper extends BaseMapper { * @param version task definition version * @return delete result */ + @CacheEvict int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version); /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java index 89b62378c0..843122fe36 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java @@ -14,29 +14,50 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.Tenant; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; + import org.apache.ibatis.annotations.Param; -import java.util.List; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; /** * tenant mapper interface */ +@CacheConfig(cacheNames = "tenant", keyGenerator = "cacheKeyGenerator") public interface TenantMapper extends BaseMapper { /** * query tenant by id + * * @param tenantId tenantId * @return tenant */ + @Cacheable(sync = true) Tenant queryById(@Param("tenantId") int tenantId); + /** + * delete by id + */ + @CacheEvict + int deleteById(int id); + + /** + * update + */ + @CacheEvict + int updateById(@Param("et") Tenant tenant); + /** * query tenant by code + * * @param tenantCode tenantCode * @return tenant */ @@ -44,6 +65,7 @@ public interface TenantMapper extends BaseMapper { /** * tenant page + * * @param page page * @param searchVal searchVal * @return tenant IPage @@ -53,6 +75,7 @@ public interface TenantMapper extends BaseMapper { /** * check tenant exist + * * @param tenantCode tenantCode * @return true if exist else return null */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java index 4418363dc2..20fafdc1c6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java @@ -14,29 +14,57 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.User; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + import org.apache.ibatis.annotations.Param; import java.util.List; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.CacheEvict; +import org.springframework.cache.annotation.Cacheable; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + /** * user mapper interface */ +@CacheConfig(cacheNames = "user", keyGenerator = "cacheKeyGenerator") public interface UserMapper extends BaseMapper { + /** + * select by user id + */ + @Cacheable(sync = true) + User selectById(int id); + + /** + * delete by id + */ + @CacheEvict + int deleteById(int id); + + /** + * update + */ + @CacheEvict + int updateById(@Param("et") User user); + /** * query all general user + * * @return user list */ List queryAllGeneralUser(); /** * query user by name + * * @param userName userName * @return user */ @@ -44,6 +72,7 @@ public interface UserMapper extends BaseMapper { /** * query user by userName and password + * * @param userName userName * @param password password * @return user @@ -53,6 +82,7 @@ public interface UserMapper extends BaseMapper { /** * user page + * * @param page page * @param userName userName * @return user IPage @@ -62,6 +92,7 @@ public interface UserMapper extends BaseMapper { /** * query user detail by id + * * @param userId userId * @return user */ @@ -69,6 +100,7 @@ public interface UserMapper extends BaseMapper { /** * query user list by alertgroupId + * * @param alertgroupId alertgroupId * @return user list */ @@ -76,6 +108,7 @@ public interface UserMapper extends BaseMapper { /** * query user list by tenantId + * * @param tenantId tenantId * @return user list */ @@ -83,6 +116,7 @@ public interface UserMapper extends BaseMapper { /** * query user by userId + * * @param userId userId * @return user */ @@ -90,6 +124,7 @@ public interface UserMapper extends BaseMapper { /** * query user by token + * * @param token token * @return user */ @@ -97,6 +132,7 @@ public interface UserMapper extends BaseMapper { /** * query user by queue name + * * @param queueName queue name * @return user list */ @@ -104,13 +140,15 @@ public interface UserMapper extends BaseMapper { /** * check the user exist - * @param queueName queue name + * + * @param queue queue name * @return true if exist else return null */ Boolean existUser(@Param("queue") String queue); /** * update user with old queue + * * @param oldQueue old queue name * @param newQueue new queue name * @return update rows diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java index 64571b8fca..778d1aef55 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java @@ -17,16 +17,24 @@ package org.apache.dolphinscheduler.server.master.processor; +import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.Queue; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.Tenant; +import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; import com.google.common.base.Preconditions; @@ -39,11 +47,7 @@ public class CacheProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(CacheProcessor.class); - private CacheProcessorFactory cacheProcessorFactory; - - public CacheProcessor() { - this.cacheProcessorFactory = SpringApplicationContext.getBean(CacheProcessorFactory.class); - } + private CacheManager cacheManager; @Override public void process(Channel channel, Command command) { @@ -53,6 +57,120 @@ public class CacheProcessor implements NettyRequestProcessor { logger.info("received command : {}", cacheExpireCommand); - cacheProcessorFactory.getCacheProcessor(cacheExpireCommand.getCacheType()).cacheExpire(cacheExpireCommand.getUpdateObjClass(), cacheExpireCommand.getUpdateObjJson()); + this.cacheExpire(cacheExpireCommand); + } + + private void cacheExpire(CacheExpireCommand cacheExpireCommand) { + if (cacheManager == null) { + cacheManager = SpringApplicationContext.getBean(CacheManager.class); + } + + Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass()); + if (object == null) { + return; + } + + CacheType cacheType = cacheExpireCommand.getCacheType(); + switch (cacheType) { + case TENANT: + if (object instanceof Tenant) { + Tenant tenant = (Tenant) object; + tenantCacheExpire(tenant); + } + break; + case USER: + if (object instanceof User) { + User user = (User) object; + userCacheExpire(user); + } + break; + case QUEUE: + if (object instanceof Queue) { + Queue queue = (Queue) object; + queueCacheExpire(queue); + } + break; + case PROCESS_DEFINITION: + if (object instanceof ProcessDefinition) { + ProcessDefinition processDefinition = (ProcessDefinition) object; + processDefinitionCacheExpire(processDefinition); + } + break; + case TASK_DEFINITION: + if (object instanceof TaskDefinition) { + TaskDefinition taskDefinition = (TaskDefinition) object; + taskDefinitionCacheExpire(taskDefinition); + } + break; + case PROCESS_TASK_RELATION: + if (object instanceof ProcessTaskRelation) { + ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object; + processTaskRelationCacheExpire(processTaskRelation); + } + break; + default: + logger.error("no support cache type:{}", cacheType); + } + + // if delete operation, just send key + if (object instanceof String) { + Cache cache = cacheManager.getCache(cacheType.getCacheName()); + if (cache != null) { + cache.evict(object); + logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object); + } + } + } + + private void tenantCacheExpire(Tenant tenant) { + Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName()); + if (cache != null) { + cache.evict(tenant.getId()); + logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId()); + } + } + + private void userCacheExpire(User user) { + Cache cache = cacheManager.getCache(CacheType.USER.getCacheName()); + if (cache != null) { + cache.evict(user.getId()); + logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId()); + } + } + + private void queueCacheExpire(Queue queue) { + Cache cache = cacheManager.getCache(CacheType.USER.getCacheName()); + if (cache != null) { + cache.clear(); + logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName()); + } + } + + private void processDefinitionCacheExpire(ProcessDefinition processDefinition) { + Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName()); + if (cache != null) { + cache.evict(processDefinition.getCode()); + cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion()); + logger.info("cache evict, type:{}, key:{}", + CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion()); + } + } + + private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) { + Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName()); + if (cache != null) { + cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode()); + logger.info("cache evict, type:{}, key:{}", + CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode()); + } + } + + private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) { + Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName()); + if (cache != null) { + cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion()); + logger.info("cache evict, type:{}, key:{}", + CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion()); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 2b8287b1c7..6e05da90d5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -118,12 +118,6 @@ public class MasterSchedulerService extends Thread { */ ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - /** - * key:code-version - * value: processDefinition - */ - HashMap processDefinitionCacheMaps = new HashMap<>(); - private StateWheelExecuteThread stateWheelExecuteThread; /** @@ -195,10 +189,6 @@ public class MasterSchedulerService extends Thread { return; } - if (!masterConfig.isCacheProcessDefinition() && processDefinitionCacheMaps.size() > 0) { - processDefinitionCacheMaps.clear(); - } - List processInstances = command2ProcessInstance(commands); if (CollectionUtils.isEmpty(processInstances)) { return; @@ -245,8 +235,7 @@ public class MasterSchedulerService extends Thread { try { ProcessInstance processInstance = processService.handleCommand(logger, getLocalAddress(), - command, - processDefinitionCacheMaps); + command); if (processInstance != null) { processInstances[index] = processInstance; logger.info("handle command command {} end, create process instance {}", diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 059c371ae4..abc887227f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -737,19 +737,21 @@ public class WorkflowExecuteThread implements Runnable { completeTaskMap.clear(); errorTaskMap.clear(); - List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); - for (TaskInstance task : validTaskInstanceList) { - validTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); - taskInstanceMap.put(task.getId(), task); - - if (task.isTaskComplete()) { - completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); - } - if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { - continue; - } - if (task.getState().typeIsFailure() && !task.taskCanRetry()) { - errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) { + List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); + for (TaskInstance task : validTaskInstanceList) { + validTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + taskInstanceMap.put(task.getId(), task); + + if (task.isTaskComplete()) { + completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + } + if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), dag)) { + continue; + } + if (task.getState().typeIsFailure() && !task.taskCanRetry()) { + errorTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); + } } } diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml b/dolphinscheduler-server/src/main/resources/application-master.yaml index 86796c1195..866b2000ed 100644 --- a/dolphinscheduler-server/src/main/resources/application-master.yaml +++ b/dolphinscheduler-server/src/main/resources/application-master.yaml @@ -19,12 +19,15 @@ spring: name: master-server cache: # default enable cache, you can disable by `type: none` - type: caffeine + type: none cache-names: - tenant - user + - processDefinition + - processTaskRelation + - taskDefinition caffeine: - spec: maximumSize=100,expireAfterWrite=60s,recordStats + spec: maximumSize=100,expireAfterWrite=300s,recordStats master: listen-port: 5678 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java index 02f32160c4..6f1907ccec 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java @@ -22,19 +22,17 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; -import org.apache.dolphinscheduler.service.cache.processor.impl.CacheProcessorFactory; -import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; import io.netty.channel.Channel; @@ -47,21 +45,20 @@ public class CacheProcessorTest { private CacheProcessor cacheProcessor; - @InjectMocks - private TenantCacheProcessorImpl tenantCacheProcessor; - @Mock private Channel channel; @Mock - private CacheProcessorFactory cacheProcessorFactory; + private CacheManager cacheManager; + + @Mock + private Cache cache; @Before public void before() { PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor); - PowerMockito.when(SpringApplicationContext.getBean(CacheProcessorFactory.class)).thenReturn(cacheProcessorFactory); - Mockito.when(cacheProcessorFactory.getCacheProcessor(CacheType.TENANT)).thenReturn(tenantCacheProcessor); + PowerMockito.when(SpringApplicationContext.getBean(CacheManager.class)).thenReturn(cacheManager); + Mockito.when(cacheManager.getCache(CacheType.TENANT.getCacheName())).thenReturn(cache); cacheProcessor = new CacheProcessor(); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java similarity index 94% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java index 3b051b6c81..09c5571497 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/CacheNotifyService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/CacheNotifyService.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.cache.service; +package org.apache.dolphinscheduler.service.cache; import org.apache.dolphinscheduler.remote.command.Command; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java similarity index 61% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java index 1d93b42ea8..2a036542a4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProcessor.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheKeyGenerator.java @@ -15,12 +15,22 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.cache.processor; +package org.apache.dolphinscheduler.service.cache.impl; -import org.apache.dolphinscheduler.dao.entity.User; +import java.lang.reflect.Method; -public interface UserCacheProcessor extends BaseCacheProcessor { - void update(int userId); +import org.springframework.cache.interceptor.KeyGenerator; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; - User selectById(int userId); +/** + * custom cache key generator + */ +@Component +public class CacheKeyGenerator implements KeyGenerator { + + @Override + public Object generate(Object target, Method method, Object... params) { + return StringUtils.arrayToDelimitedString(params, "_"); + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java similarity index 82% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java index 1aa15c2788..ffa9299ffc 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/service/impl/CacheNotifyServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/impl/CacheNotifyServiceImpl.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.service.cache.service.impl; +package org.apache.dolphinscheduler.service.cache.impl; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.service.cache.service.CacheNotifyService; +import org.apache.dolphinscheduler.service.cache.CacheNotifyService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections4.CollectionUtils; @@ -114,19 +114,22 @@ public class CacheNotifyServiceImpl implements CacheNotifyService { @Override public void notifyMaster(Command command) { logger.info("send result, command:{}", command.toString()); + try { + List serverList = registryClient.getServerList(NodeType.MASTER); + if (CollectionUtils.isEmpty(serverList)) { + return; + } - List serverList = registryClient.getServerList(NodeType.MASTER); - if (CollectionUtils.isEmpty(serverList)) { - return; - } - - for (Server server : serverList) { - Host host = new Host(server.getHost(), server.getPort()); - NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host); - if (nettyRemoteChannel == null) { - continue; + for (Server server : serverList) { + Host host = new Host(server.getHost(), server.getPort()); + NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(host); + if (nettyRemoteChannel == null) { + continue; + } + nettyRemoteChannel.writeAndFlush(command); } - nettyRemoteChannel.writeAndFlush(command); + } catch (Exception e) { + logger.error("notify master error", e); } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java deleted file mode 100644 index 2b0dc8be8d..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/BaseCacheProcessor.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -public interface BaseCacheProcessor { - void cacheExpire(Class updateObjClass, String updateObjJson); -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java deleted file mode 100644 index 4b438eb0ac..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessor.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -public interface QueueCacheProcessor extends BaseCacheProcessor { - public void expireAllUserCache(); -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java deleted file mode 100644 index c6b8000547..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessor.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -import org.apache.dolphinscheduler.dao.entity.Tenant; - -public interface TenantCacheProcessor extends BaseCacheProcessor { - void update(int tenantId); - - Tenant queryById(int tenantId); -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java deleted file mode 100644 index 16a6ccd9ad..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/CacheProcessorFactory.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor.impl; - -import org.apache.dolphinscheduler.common.enums.CacheType; -import org.apache.dolphinscheduler.service.cache.processor.BaseCacheProcessor; -import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor; -import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; -import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.annotation.PostConstruct; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class CacheProcessorFactory { - - @Autowired - private TenantCacheProcessor tenantCacheProcessor; - - @Autowired - private UserCacheProcessor userCacheProcessor; - - @Autowired - private QueueCacheProcessor queueCacheProcessor; - - Map cacheProcessorMap = new ConcurrentHashMap<>(); - - @PostConstruct - private void init() { - cacheProcessorMap.put(CacheType.TENANT, tenantCacheProcessor); - cacheProcessorMap.put(CacheType.USER, userCacheProcessor); - cacheProcessorMap.put(CacheType.QUEUE, queueCacheProcessor); - } - - public BaseCacheProcessor getCacheProcessor(CacheType cacheType) { - return cacheProcessorMap.get(cacheType); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java deleted file mode 100644 index 174d59fba9..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/QueueCacheProcessorImpl.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor.impl; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Queue; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.QueueCacheProcessor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.cache.annotation.CacheEvict; -import org.springframework.stereotype.Component; - -@Component -public class QueueCacheProcessorImpl implements QueueCacheProcessor { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - @Override - @CacheEvict(cacheNames = "user", allEntries = true) - public void expireAllUserCache() { - // just evict cache - logger.debug("expire all user cache"); - } - - @Override - public void cacheExpire(Class updateObjClass, String updateObjJson) { - Queue updateQueue = (Queue) JSONUtils.parseObject(updateObjJson, updateObjClass); - if (updateQueue == null) { - return; - } - SpringApplicationContext.getBean(QueueCacheProcessor.class).expireAllUserCache(); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java deleted file mode 100644 index 3ca801400b..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/TenantCacheProcessorImpl.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor.impl; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.mapper.TenantMapper; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cache.annotation.CacheConfig; -import org.springframework.cache.annotation.CacheEvict; -import org.springframework.cache.annotation.Cacheable; -import org.springframework.stereotype.Component; - -@Component -@CacheConfig(cacheNames = "tenant") -public class TenantCacheProcessorImpl implements TenantCacheProcessor { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - @Autowired - private TenantMapper tenantMapper; - - @Override - @CacheEvict - public void update(int tenantId) { - // just evict cache - } - - @Override - @Cacheable(sync = true) - public Tenant queryById(int tenantId) { - logger.debug("tenant cache proxy, tenantId:{}", tenantId); - return tenantMapper.queryById(tenantId); - } - - @Override - public void cacheExpire(Class updateObjClass, String updateObjJson) { - Tenant updateTenant = (Tenant) JSONUtils.parseObject(updateObjJson, updateObjClass); - if (updateTenant == null) { - return; - } - SpringApplicationContext.getBean(TenantCacheProcessor.class).update(updateTenant.getId()); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java deleted file mode 100644 index fb25fb57be..0000000000 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/cache/processor/impl/UserCacheProcessorImpl.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor.impl; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cache.annotation.CacheConfig; -import org.springframework.cache.annotation.CacheEvict; -import org.springframework.cache.annotation.Cacheable; -import org.springframework.stereotype.Component; - -@Component -@CacheConfig(cacheNames = "user") -public class UserCacheProcessorImpl implements UserCacheProcessor { - - @Autowired - private UserMapper userMapper; - - @Override - @CacheEvict - public void update(int userId) { - // just evict cache - } - - @Override - @Cacheable(sync = true) - public User selectById(int userId) { - return userMapper.selectById(userId); - } - - @Override - public void cacheExpire(Class updateObjClass, String updateObjJson) { - User user = (User) JSONUtils.parseObject(updateObjJson, updateObjClass); - if (user == null) { - return; - } - SpringApplicationContext.getBean(UserCacheProcessor.class).update(user.getId()); - } -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 9143154067..999200279f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -100,15 +100,15 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.utils.DagHelper; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.TenantCacheProcessor; -import org.apache.dolphinscheduler.service.cache.processor.UserCacheProcessor; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.quartz.cron.CronUtils; @@ -155,7 +155,7 @@ public class ProcessService { ExecutionStatus.READY_STOP.ordinal()}; @Autowired - private UserCacheProcessor userCacheProcessor; + private UserMapper userMapper; @Autowired private ProcessDefinitionMapper processDefineMapper; @@ -194,7 +194,7 @@ public class ProcessService { private ErrorCommandMapper errorCommandMapper; @Autowired - private TenantCacheProcessor tenantCacheProcessor; + private TenantMapper tenantMapper; @Autowired private ProjectMapper projectMapper; @@ -233,8 +233,8 @@ public class ProcessService { * @return process instance */ @Transactional - public ProcessInstance handleCommand(Logger logger, String host, Command command, HashMap processDefinitionCacheMaps) { - ProcessInstance processInstance = constructProcessInstance(command, host, processDefinitionCacheMaps); + public ProcessInstance handleCommand(Logger logger, String host, Command command) { + ProcessInstance processInstance = constructProcessInstance(command, host); // cannot construct process instance, return null if (processInstance == null) { logger.error("scan command, command parameter is error: {}", command); @@ -739,7 +739,7 @@ public class ProcessService { public Tenant getTenantForProcess(int tenantId, int userId) { Tenant tenant = null; if (tenantId >= 0) { - tenant = tenantCacheProcessor.queryById(tenantId); + tenant = tenantMapper.queryById(tenantId); } if (userId == 0) { @@ -747,8 +747,8 @@ public class ProcessService { } if (tenant == null) { - User user = userCacheProcessor.selectById(userId); - tenant = tenantCacheProcessor.queryById(user.getTenantId()); + User user = userMapper.selectById(userId); + tenant = tenantMapper.queryById(user.getTenantId()); } return tenant; } @@ -794,19 +794,12 @@ public class ProcessService { * @param host host * @return process instance */ - private ProcessInstance constructProcessInstance(Command command, String host, HashMap processDefinitionCacheMaps) { + private ProcessInstance constructProcessInstance(Command command, String host) { ProcessInstance processInstance; ProcessDefinition processDefinition; CommandType commandType = command.getCommandType(); - String key = String.format("%d-%d", command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); - if (processDefinitionCacheMaps.containsKey(key)) { - processDefinition = processDefinitionCacheMaps.get(key); - } else { - processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); - if (processDefinition != null) { - processDefinitionCacheMaps.put(key, processDefinition); - } - } + + processDefinition = this.findProcessDefinition(command.getProcessDefinitionCode(), command.getProcessDefinitionVersion()); if (processDefinition == null) { logger.error("cannot find the work process define! define code : {}", command.getProcessDefinitionCode()); return null; @@ -1964,11 +1957,11 @@ public class ProcessService { return StringUtils.EMPTY; } int userId = resourceList.get(0).getUserId(); - User user = userCacheProcessor.selectById(userId); + User user = userMapper.selectById(userId); if (Objects.isNull(user)) { return StringUtils.EMPTY; } - Tenant tenant = tenantCacheProcessor.queryById(user.getTenantId()); + Tenant tenant = tenantMapper.queryById(user.getTenantId()); if (Objects.isNull(tenant)) { return StringUtils.EMPTY; } @@ -2038,7 +2031,7 @@ public class ProcessService { if (processInstance == null) { return queue; } - User executor = userCacheProcessor.selectById(processInstance.getExecutorId()); + User executor = userMapper.selectById(processInstance.getExecutorId()); if (executor != null) { queue = executor.getQueue(); } @@ -2149,7 +2142,7 @@ public class ProcessService { * @return User */ public User getUserById(int userId) { - return userCacheProcessor.selectById(userId); + return userMapper.selectById(userId); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java index 83f1f00cef..04d1f785d0 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java @@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; -import org.apache.dolphinscheduler.service.cache.service.impl.CacheNotifyServiceImpl; +import org.apache.dolphinscheduler.service.cache.impl.CacheNotifyServiceImpl; import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.ArrayList; diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java deleted file mode 100644 index ee61564512..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/QueueCacheProcessorTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Queue; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.impl.QueueCacheProcessorImpl; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -/** - * tenant cache proxy test - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class}) -public class QueueCacheProcessorTest { - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @InjectMocks - private QueueCacheProcessorImpl queueCacheProcessor; - - @Before - public void before() { - PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(QueueCacheProcessor.class)).thenReturn(queueCacheProcessor); - } - - @Test - public void testCacheExpire() { - Queue queue = new Queue(); - queue.setId(100); - queueCacheProcessor.cacheExpire(Queue.class, JSONUtils.toJsonString(queue)); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java deleted file mode 100644 index 4ecc970ccd..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/TenantCacheProcessorTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.mapper.TenantMapper; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.impl.TenantCacheProcessorImpl; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -/** - * tenant cache proxy test - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class}) -public class TenantCacheProcessorTest { - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @InjectMocks - private TenantCacheProcessorImpl tenantCacheProcessor; - - @Mock - private TenantMapper tenantMapper; - - @Before - public void before() { - PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(TenantCacheProcessor.class)).thenReturn(tenantCacheProcessor); - } - - @Test - public void testQueryById() { - Tenant tenant1 = new Tenant(); - tenant1.setId(100); - tenant1.setDescription("test1"); - - Mockito.when(tenantMapper.queryById(100)).thenReturn(tenant1); - Assert.assertEquals(tenant1, tenantCacheProcessor.queryById(100)); - } - - @Test - public void testCacheExpire() { - Tenant tenant1 = new Tenant(); - tenant1.setId(100); - tenant1.setDescription("test1"); - tenantCacheProcessor.cacheExpire(Tenant.class, JSONUtils.toJsonString(tenant1)); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java deleted file mode 100644 index f786f43c97..0000000000 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/processor/UserCacheProxyTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.service.cache.processor; - -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.UserMapper; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -/** - * tenant cache proxy test - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({SpringApplicationContext.class}) -public class UserCacheProxyTest { - - @Rule - public final ExpectedException exception = ExpectedException.none(); - - @InjectMocks - private UserCacheProcessorImpl userCacheProcessor; - - @Mock - private UserMapper userMapper; - - @Before - public void before() { - PowerMockito.mockStatic(SpringApplicationContext.class); - PowerMockito.when(SpringApplicationContext.getBean(UserCacheProcessor.class)).thenReturn(userCacheProcessor); - } - - @Test - public void testQueryById() { - User user1 = new User(); - user1.setId(100); - - Mockito.when(userMapper.selectById(100)).thenReturn(user1); - Assert.assertEquals(user1, userCacheProcessor.selectById(100)); - } - - @Test - public void testCacheExpire() { - User user = new User(); - user.setId(100); - userCacheProcessor.cacheExpire(User.class, JSONUtils.toJsonString(user)); - } -} diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 2117b86781..510e3a0098 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -64,7 +64,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; -import org.apache.dolphinscheduler.service.cache.processor.impl.UserCacheProcessorImpl; +import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.exceptions.ServiceException; import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest; @@ -116,7 +116,7 @@ public class ProcessServiceTest { @Mock private ProcessInstanceMapper processInstanceMapper; @Mock - private UserCacheProcessorImpl userCacheProcessor; + private UserMapper userMapper; @Mock private TaskInstanceMapper taskInstanceMapper; @Mock @@ -134,8 +134,6 @@ public class ProcessServiceTest { @Mock private TaskGroupQueueMapper taskGroupQueueMapper; - private HashMap processDefinitionCacheMaps = new HashMap<>(); - @Test public void testCreateSubCommand() { ProcessInstance parentInstance = new ProcessInstance(); @@ -263,7 +261,7 @@ public class ProcessServiceTest { command.setCommandType(CommandType.REPEAT_RUNNING); command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\"" + CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}"); - Assert.assertNull(processService.handleCommand(logger, host, command, processDefinitionCacheMaps)); + Assert.assertNull(processService.handleCommand(logger, host, command)); int definitionVersion = 1; long definitionCode = 123; @@ -298,7 +296,7 @@ public class ProcessServiceTest { Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition)); Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); - Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); + Assert.assertNotNull(processService.handleCommand(logger, host, command1)); Command command2 = new Command(); command2.setId(2); @@ -308,7 +306,7 @@ public class ProcessServiceTest { command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS); command2.setProcessInstanceId(processInstanceId); Mockito.when(commandMapper.deleteById(2)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command2, processDefinitionCacheMaps)); + Assert.assertNotNull(processService.handleCommand(logger, host, command2)); Command command3 = new Command(); command3.setId(3); @@ -318,7 +316,7 @@ public class ProcessServiceTest { command3.setCommandParam("{\"WaitingThreadInstanceId\":222}"); command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS); Mockito.when(commandMapper.deleteById(3)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command3, processDefinitionCacheMaps)); + Assert.assertNotNull(processService.handleCommand(logger, host, command3)); Command command4 = new Command(); command4.setId(4); @@ -328,7 +326,7 @@ public class ProcessServiceTest { command4.setCommandType(CommandType.REPEAT_RUNNING); command4.setProcessInstanceId(processInstanceId); Mockito.when(commandMapper.deleteById(4)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command4, processDefinitionCacheMaps)); + Assert.assertNotNull(processService.handleCommand(logger, host, command4)); Command command5 = new Command(); command5.setId(5); @@ -342,7 +340,7 @@ public class ProcessServiceTest { command5.setCommandType(CommandType.START_PROCESS); command5.setDryRun(Constants.DRY_RUN_FLAG_NO); Mockito.when(commandMapper.deleteById(5)).thenReturn(1); - ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5, processDefinitionCacheMaps); + ProcessInstance processInstance1 = processService.handleCommand(logger, host, command5); Assert.assertTrue(processInstance1.getGlobalParams().contains("\"testStartParam1\"")); ProcessDefinition processDefinition1 = new ProcessDefinition(); @@ -367,7 +365,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2); Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1); Mockito.when(commandMapper.deleteById(1)).thenReturn(1); - Assert.assertNotNull(processService.handleCommand(logger, host, command1, processDefinitionCacheMaps)); + Assert.assertNotNull(processService.handleCommand(logger, host, command1)); Command command6 = new Command(); command6.setId(6); @@ -378,7 +376,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists); Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true); Mockito.when(commandMapper.deleteById(6)).thenReturn(1); - ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6, processDefinitionCacheMaps); + ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6); Assert.assertTrue(processInstance6 != null); processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD); @@ -397,7 +395,7 @@ public class ProcessServiceTest { command7.setProcessDefinitionVersion(1); Mockito.when(commandMapper.deleteById(7)).thenReturn(1); Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(11L, Constants.RUNNING_PROCESS_STATE, 224)).thenReturn(null); - ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7, processDefinitionCacheMaps); + ProcessInstance processInstance8 = processService.handleCommand(logger, host, command7); Assert.assertTrue(processInstance8 == null); ProcessDefinition processDefinition2 = new ProcessDefinition(); @@ -421,7 +419,7 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndStatusAndNextId(12L, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists); Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1); Mockito.when(commandMapper.deleteById(9)).thenReturn(1); - ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9, processDefinitionCacheMaps); + ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9); Assert.assertTrue(processInstance10 == null); } @@ -462,14 +460,14 @@ public class ProcessServiceTest { Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance); // will throw exception when command id is 0 and delete fail - processService.handleCommand(logger, host, command1, processDefinitionCacheMaps); + processService.handleCommand(logger, host, command1); } @Test public void testGetUserById() { User user = new User(); user.setId(123); - Mockito.when(userCacheProcessor.selectById(123)).thenReturn(user); + Mockito.when(userMapper.selectById(123)).thenReturn(user); Assert.assertEquals(user, processService.getUserById(123)); }