Wenjun Ruan
1 year ago
committed by
GitHub
13 changed files with 0 additions and 418 deletions
@ -1,42 +0,0 @@
|
||||
# Cache |
||||
|
||||
## Purpose |
||||
|
||||
Due to the large database read operations during the master-server scheduling process. Such as read tables like `tenant`, `user`, `processDefinition`, etc. Operations stress read pressure to the DB, and slow down the entire core scheduling process. |
||||
|
||||
By considering this part of the business data is a high-read and low-write scenario, a cache module is introduced to reduce the DB read pressure and speed up the core scheduling process. |
||||
|
||||
## Cache Settings |
||||
|
||||
```yaml |
||||
spring: |
||||
cache: |
||||
# default disable cache, you can enable by `type: caffeine` |
||||
type: none |
||||
cache-names: |
||||
- tenant |
||||
- user |
||||
- processDefinition |
||||
- processTaskRelation |
||||
- taskDefinition |
||||
caffeine: |
||||
spec: maximumSize=100,expireAfterWrite=300s,recordStats |
||||
``` |
||||
|
||||
The cache module uses [spring-cache](https://spring.io/guides/gs/caching/), so you can set cache config like whether to enable cache (`none` to disable by default), cache types in the spring `application.yaml` directly. |
||||
|
||||
Currently, implements the config of [caffeine](https://github.com/ben-manes/caffeine), you can assign cache configs like cache size, expire time, etc. |
||||
|
||||
## Cache Read |
||||
|
||||
The cache module adopts the `@Cacheable` annotation from spring-cache and you can annotate the annotation in the related mapper layer. Refer to the `TenantMapper`. |
||||
|
||||
## Cache Evict |
||||
|
||||
The business data updates come from the api-server, and the cache side is in the master-server. Then it is necessary to monitor the data updates from the api-server (use aspect point cut interceptor `@CacheEvict`), and notify the master-server of `cacheEvictCommand` when processing a cache eviction. |
||||
|
||||
Note: the final strategy for cache update comes from the expiration strategy configuration in caffeine, therefore configure it under the business scenarios; |
||||
|
||||
The sequence diagram shows below: |
||||
|
||||
<img src="../../../img/cache-evict.png" alt="cache-evict" style="zoom: 67%;" /> |
@ -1,42 +0,0 @@
|
||||
### 缓存 |
||||
|
||||
#### 缓存目的 |
||||
|
||||
由于在master-server调度过程中,会产生大量的数据库读取操作,如tenant,user,processDefinition等,一方面对DB产生很大的读压力,另一方面则会使整个核心调度流程变得缓慢; |
||||
|
||||
考虑到这部分业务数据是读多写少的场景,故引入了缓存模块,以减少DB读压力,加快核心调度流程; |
||||
|
||||
#### 缓存设置 |
||||
|
||||
```yaml |
||||
spring: |
||||
cache: |
||||
# default enable cache, you can disable by `type: none` |
||||
type: none |
||||
cache-names: |
||||
- tenant |
||||
- user |
||||
- processDefinition |
||||
- processTaskRelation |
||||
- taskDefinition |
||||
caffeine: |
||||
spec: maximumSize=100,expireAfterWrite=300s,recordStats |
||||
``` |
||||
|
||||
缓存模块采用[spring-cache](https://spring.io/guides/gs/caching/)机制,可直接在spring配置文件中配置是否开启缓存(默认`none`关闭), 缓存类型; |
||||
|
||||
目前采用[caffeine](https://github.com/ben-manes/caffeine)进行缓存管理,可自由设置缓存相关配置,如缓存大小、过期时间等; |
||||
|
||||
#### 缓存读取 |
||||
|
||||
缓存采用spring-cache的注解,配置在相关的mapper层,可参考如:`TenantMapper`. |
||||
|
||||
#### 缓存更新 |
||||
|
||||
业务数据的更新来自于api-server, 而缓存端在master-server, 故需要对api-server的数据更新做监听(aspect切面拦截`@CacheEvict`),当需要进行缓存驱逐时会通知master-server,master-server接收到cacheEvictCommand后进行缓存驱逐; |
||||
|
||||
需要注意的是:缓存更新的兜底策略来自于用户在caffeine中的过期策略配置,请结合业务进行配置; |
||||
|
||||
时序图如下图所示: |
||||
|
||||
<img src="../../../img/cache-evict.png" alt="cache-evict" style="zoom: 67%;" /> |
@ -1,159 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.api.aspect; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.CacheType; |
||||
import org.apache.dolphinscheduler.common.model.Server; |
||||
import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; |
||||
import org.apache.dolphinscheduler.extract.master.IMasterCacheService; |
||||
import org.apache.dolphinscheduler.extract.master.transportor.CacheExpireRequest; |
||||
import org.apache.dolphinscheduler.registry.api.RegistryClient; |
||||
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; |
||||
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator; |
||||
|
||||
import org.apache.commons.collections4.CollectionUtils; |
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.lang.reflect.Method; |
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint; |
||||
import org.aspectj.lang.annotation.Around; |
||||
import org.aspectj.lang.annotation.Aspect; |
||||
import org.aspectj.lang.annotation.Pointcut; |
||||
import org.aspectj.lang.reflect.MethodSignature; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.cache.annotation.CacheConfig; |
||||
import org.springframework.cache.annotation.CacheEvict; |
||||
import org.springframework.expression.EvaluationContext; |
||||
import org.springframework.expression.spel.standard.SpelExpressionParser; |
||||
import org.springframework.expression.spel.support.StandardEvaluationContext; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
/** |
||||
* aspect for cache evict |
||||
*/ |
||||
@Aspect |
||||
@Component |
||||
@Slf4j |
||||
public class CacheEvictAspect { |
||||
|
||||
/** |
||||
* symbol of spring el |
||||
*/ |
||||
private static final String EL_SYMBOL = "#"; |
||||
|
||||
/** |
||||
* prefix of spring el |
||||
*/ |
||||
private static final String P = "p"; |
||||
|
||||
@Autowired |
||||
private CacheKeyGenerator cacheKeyGenerator; |
||||
|
||||
@Autowired |
||||
private RegistryClient registryClient; |
||||
|
||||
@Pointcut("@annotation(org.springframework.cache.annotation.CacheEvict)") |
||||
public void cacheEvictPointCut() { |
||||
// Do nothing because of it's a pointcut
|
||||
} |
||||
|
||||
@Around("cacheEvictPointCut()") |
||||
public Object doAround(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { |
||||
MethodSignature sign = (MethodSignature) proceedingJoinPoint.getSignature(); |
||||
Method method = sign.getMethod(); |
||||
Object target = proceedingJoinPoint.getTarget(); |
||||
Object[] args = proceedingJoinPoint.getArgs(); |
||||
|
||||
Object result = proceedingJoinPoint.proceed(); |
||||
|
||||
CacheConfig cacheConfig = method.getDeclaringClass().getAnnotation(CacheConfig.class); |
||||
CacheEvict cacheEvict = method.getAnnotation(CacheEvict.class); |
||||
|
||||
CacheType cacheType = getCacheType(cacheConfig, cacheEvict); |
||||
if (cacheType != null) { |
||||
String cacheKey; |
||||
if (cacheEvict.key().isEmpty()) { |
||||
cacheKey = (String) cacheKeyGenerator.generate(target, method, args); |
||||
} else { |
||||
cacheKey = cacheEvict.key(); |
||||
if (cacheEvict.key().contains(EL_SYMBOL)) { |
||||
cacheKey = parseKey(cacheEvict.key(), Arrays.asList(args)); |
||||
} |
||||
} |
||||
if (StringUtils.isNotEmpty(cacheKey)) { |
||||
notifyMaster(cacheType, cacheKey); |
||||
} |
||||
} |
||||
|
||||
return result; |
||||
} |
||||
|
||||
private CacheType getCacheType(CacheConfig cacheConfig, CacheEvict cacheEvict) { |
||||
String cacheName = null; |
||||
if (cacheEvict.cacheNames().length > 0) { |
||||
cacheName = cacheEvict.cacheNames()[0]; |
||||
} |
||||
if (cacheConfig.cacheNames().length > 0) { |
||||
cacheName = cacheConfig.cacheNames()[0]; |
||||
} |
||||
if (cacheName == null) { |
||||
return null; |
||||
} |
||||
for (CacheType cacheType : CacheType.values()) { |
||||
if (cacheType.getCacheName().equals(cacheName)) { |
||||
return cacheType; |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
private String parseKey(String key, List<Object> paramList) { |
||||
SpelExpressionParser spelParser = new SpelExpressionParser(); |
||||
EvaluationContext ctx = new StandardEvaluationContext(); |
||||
for (int i = 0; i < paramList.size(); i++) { |
||||
ctx.setVariable(P + i, paramList.get(i)); |
||||
} |
||||
Object obj = spelParser.parseExpression(key).getValue(ctx); |
||||
if (null == obj) { |
||||
throw new RuntimeException("parseKey error"); |
||||
} |
||||
return obj.toString(); |
||||
} |
||||
|
||||
private void notifyMaster(CacheType cacheType, String cacheKey) { |
||||
try { |
||||
List<Server> serverList = registryClient.getServerList(RegistryNodeType.MASTER); |
||||
if (CollectionUtils.isEmpty(serverList)) { |
||||
return; |
||||
} |
||||
for (Server server : serverList) { |
||||
IMasterCacheService masterCacheService = SingletonJdkDynamicRpcClientProxyFactory |
||||
.getProxyClient(server.getHost() + ":" + server.getPort(), IMasterCacheService.class); |
||||
masterCacheService.cacheExpire(new CacheExpireRequest(cacheType, cacheKey)); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("notify master error", e); |
||||
} |
||||
|
||||
} |
||||
} |
@ -1,30 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.extract.master; |
||||
|
||||
import org.apache.dolphinscheduler.extract.base.RpcMethod; |
||||
import org.apache.dolphinscheduler.extract.base.RpcService; |
||||
import org.apache.dolphinscheduler.extract.master.transportor.CacheExpireRequest; |
||||
|
||||
@RpcService |
||||
public interface IMasterCacheService { |
||||
|
||||
@RpcMethod |
||||
void cacheExpire(CacheExpireRequest cacheExpireRequest); |
||||
|
||||
} |
@ -1,34 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.extract.master.transportor; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.CacheType; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Data; |
||||
import lombok.NoArgsConstructor; |
||||
|
||||
@Data |
||||
@AllArgsConstructor |
||||
@NoArgsConstructor |
||||
public class CacheExpireRequest { |
||||
|
||||
private CacheType cacheType; |
||||
private String cacheKey; |
||||
|
||||
} |
@ -1,52 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.master.rpc; |
||||
|
||||
import org.apache.dolphinscheduler.common.enums.CacheType; |
||||
import org.apache.dolphinscheduler.extract.master.IMasterCacheService; |
||||
import org.apache.dolphinscheduler.extract.master.transportor.CacheExpireRequest; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.cache.Cache; |
||||
import org.springframework.cache.CacheManager; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
@Slf4j |
||||
@Service |
||||
public class MasterCacheServiceImpl implements IMasterCacheService { |
||||
|
||||
@Autowired |
||||
private CacheManager cacheManager; |
||||
|
||||
@Override |
||||
public void cacheExpire(CacheExpireRequest cacheExpireRequest) { |
||||
if (cacheExpireRequest.getCacheKey().isEmpty()) { |
||||
return; |
||||
} |
||||
|
||||
CacheType cacheType = cacheExpireRequest.getCacheType(); |
||||
Cache cache = cacheManager.getCache(cacheType.getCacheName()); |
||||
if (cache != null) { |
||||
cache.evict(cacheExpireRequest.getCacheKey()); |
||||
log.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), cacheExpireRequest.getCacheKey()); |
||||
} |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue