Browse Source

[Bug-7292][ApiServer] fix cache error when standalone (#7293)

3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
0f7e38ed3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
  2. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  3. 33
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  4. 28
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  5. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
  6. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  7. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
  8. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java

@ -24,12 +24,9 @@ import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Around;
@ -39,7 +36,6 @@ import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.expression.EvaluationContext; import org.springframework.expression.EvaluationContext;
@ -61,6 +57,11 @@ public class CacheEvictAspect {
*/ */
private static final String EL_SYMBOL = "#"; private static final String EL_SYMBOL = "#";
/**
* prefix of spring el
*/
private static final String P = "p";
@Autowired @Autowired
private CacheKeyGenerator cacheKeyGenerator; private CacheKeyGenerator cacheKeyGenerator;
@ -91,9 +92,8 @@ public class CacheEvictAspect {
cacheKey = (String) cacheKeyGenerator.generate(target, method, args); cacheKey = (String) cacheKeyGenerator.generate(target, method, args);
} else { } else {
cacheKey = cacheEvict.key(); cacheKey = cacheEvict.key();
List<Name> paramsList = getParamAnnotationsByType(method, Name.class);
if (cacheEvict.key().contains(EL_SYMBOL)) { if (cacheEvict.key().contains(EL_SYMBOL)) {
cacheKey = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args)); cacheKey = parseKey(cacheEvict.key(), Arrays.asList(args));
} }
} }
if (StringUtils.isNotEmpty(cacheKey)) { if (StringUtils.isNotEmpty(cacheKey)) {
@ -123,11 +123,11 @@ public class CacheEvictAspect {
return null; return null;
} }
private String parseKey(String key, List<String> paramNameList, List<Object> paramList) { private String parseKey(String key, List<Object> paramList) {
SpelExpressionParser spelParser = new SpelExpressionParser(); SpelExpressionParser spelParser = new SpelExpressionParser();
EvaluationContext ctx = new StandardEvaluationContext(); EvaluationContext ctx = new StandardEvaluationContext();
for (int i = 0; i < paramNameList.size(); i++) { for (int i = 0; i < paramList.size(); i++) {
ctx.setVariable("p" + i, paramList.get(i)); ctx.setVariable(P + i, paramList.get(i));
} }
Object obj = spelParser.parseExpression(key).getValue(ctx); Object obj = spelParser.parseExpression(key).getValue(ctx);
if (null == obj) { if (null == obj) {
@ -135,18 +135,4 @@ public class CacheEvictAspect {
} }
return obj.toString(); return obj.toString();
} }
private <T extends Annotation> List<T> getParamAnnotationsByType(Method method, Class<T> annotationClass) {
List<T> annotationsList = new ArrayList<>();
Annotation[][] annotations = method.getParameterAnnotations();
for (int i = 0; i < annotations.length; i++) {
Annotation[] annotationsI = annotations[i];
for (Annotation annotation : annotationsI) {
if (annotation.annotationType().equals(annotationClass)) {
annotationsList.add((T) annotation);
}
}
}
return annotationsList;
}
} }

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -33,9 +33,19 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/** /**
* process definition log mapper interface * process definition log mapper interface
*/ */
@CacheConfig(cacheNames = "processDefinition") @CacheConfig(cacheNames = "processDefinition", keyGenerator = "cacheKeyGenerator")
public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> { public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinitionLog> {
/**
* query the certain process definition version info by process definition code and version number
*
* @param code process definition code
* @param version version number
* @return the process definition version info
*/
@Cacheable(sync = true)
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
/** /**
* query process definition log by name * query process definition log by name
* *
@ -63,16 +73,6 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
*/ */
ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("code") long code); ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("code") long code);
/**
* query the certain process definition version info by process definition code and version number
*
* @param code process definition code
* @param version version number
* @return the process definition version info
*/
@Cacheable(sync = true, key = "#processDefinitionCode + '_' + #version")
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
/** /**
* query the paging process definition version list by pagination info * query the paging process definition version list by pagination info
* *

33
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -54,30 +53,30 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* update * update
*/ */
@CacheEvict(key = "#p0.code") @CacheEvict(key = "#p0.code")
int updateById(@Name("processDefinition") @Param("et") ProcessDefinition processDefinition); int updateById(@Param("et") ProcessDefinition processDefinition);
/** /**
* query process definition by code list * delete process definition by code
* *
* @param codes codes * @param code code
* @return process definition list * @return delete result
*/ */
List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long> codes); @CacheEvict
int deleteByCode(@Param("code") long code);
/** /**
* delete process definition by code * query process definition by code list
* *
* @param code code * @param codes codes
* @return delete result * @return process definition list
*/ */
@CacheEvict(key = "#code") List<ProcessDefinition> queryByCodes(@Param("codes") Collection<Long> codes);
int deleteByCode(@Name("code") @Param("code") long code);
/** /**
* verify process definition by name * verify process definition by name
* *
* @param projectCode projectCode * @param projectCode projectCode
* @param name name * @param name name
* @return process definition * @return process definition
*/ */
ProcessDefinition verifyByDefineName(@Param("projectCode") long projectCode, ProcessDefinition verifyByDefineName(@Param("projectCode") long projectCode,
@ -87,7 +86,7 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* query process definition by name * query process definition by name
* *
* @param projectCode projectCode * @param projectCode projectCode
* @param name name * @param name name
* @return process definition * @return process definition
*/ */
ProcessDefinition queryByDefineName(@Param("projectCode") long projectCode, ProcessDefinition queryByDefineName(@Param("projectCode") long projectCode,
@ -104,11 +103,11 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/** /**
* process definition page * process definition page
* *
* @param page page * @param page page
* @param searchVal searchVal * @param searchVal searchVal
* @param userId userId * @param userId userId
* @param projectCode projectCode * @param projectCode projectCode
* @param isAdmin isAdmin * @param isAdmin isAdmin
* @return process definition IPage * @return process definition IPage
*/ */
IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page, IPage<ProcessDefinition> queryDefineListPaging(IPage<ProcessDefinition> page,

28
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -25,7 +25,6 @@ import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -45,15 +44,25 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode * @param processCode processCode
* @return ProcessTaskRelation list * @return ProcessTaskRelation list
*/ */
@Cacheable(sync = true) @Cacheable(unless = "#result == null || #result.size() == 0")
List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode, List<ProcessTaskRelation> queryByProcessCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode); @Param("processCode") long processCode);
/** /**
* update * update
*/ */
@CacheEvict(key = "#processTaskRelation.projectCode + '_' + #processTaskRelation.processDefinitionCode") @CacheEvict(key = "#p0.projectCode + '_' + #p0.processDefinitionCode")
int updateById(@Name("processTaskRelation") @Param("et") ProcessTaskRelation processTaskRelation); int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
/**
* delete process task relation by processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return int
*/
@CacheEvict
int deleteByCode(@Param("projectCode") long projectCode, @Param("processCode") long processCode);
/** /**
* process task relation by taskCode * process task relation by taskCode
@ -71,17 +80,6 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
*/ */
List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") long taskCode); List<ProcessTaskRelation> queryByTaskCode(@Param("taskCode") long taskCode);
/**
* delete process task relation by processCode
*
* @param projectCode projectCode
* @param processCode processCode
* @return int
*/
@CacheEvict(key = "#projectCode + '_' + #processCode")
int deleteByCode(@Name("projectCode") @Param("projectCode") long projectCode,
@Name("processCode") @Param("processCode") long processCode);
/** /**
* batch insert process task relation * batch insert process task relation
* *

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.mapper; package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -22,7 +23,6 @@ import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -36,8 +36,24 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
@CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator") @CacheConfig(cacheNames = "schedule", keyGenerator = "cacheKeyGenerator")
public interface ScheduleMapper extends BaseMapper<Schedule> { public interface ScheduleMapper extends BaseMapper<Schedule> {
@CacheEvict(key = "#p0.processDefinitionCode")
int insert(Schedule entity);
@CacheEvict(key = "#p0.processDefinitionCode")
int updateById(@Param("et") Schedule entity);
/**
* query schedule list by process definition code
*
* @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
@Cacheable(sync = true)
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/** /**
* scheduler page * scheduler page
*
* @param page page * @param page page
* @param processDefinitionCode processDefinitionCode * @param processDefinitionCode processDefinitionCode
* @param searchVal searchVal * @param searchVal searchVal
@ -49,6 +65,7 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/** /**
* query schedule list by project name * query schedule list by project name
*
* @param projectName projectName * @param projectName projectName
* @return schedule list * @return schedule list
*/ */
@ -56,6 +73,7 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/** /**
* query schedule list by process definition codes * query schedule list by process definition codes
*
* @param processDefineCodes processDefineCodes * @param processDefineCodes processDefineCodes
* @return schedule list * @return schedule list
*/ */
@ -63,22 +81,9 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
/** /**
* query schedule list by process definition code * query schedule list by process definition code
*
* @param processDefinitionCode processDefinitionCode * @param processDefinitionCode processDefinitionCode
* @return schedule * @return schedule
*/ */
Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); Schedule queryByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
/**
* query schedule list by process definition code
* @param processDefinitionCode processDefinitionCode
* @return schedule list
*/
@Cacheable(sync = true)
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode);
@CacheEvict(key = "#entity.processDefinitionCode")
int insert(@Name("entity") Schedule entity);
@CacheEvict(key = "#entity.processDefinitionCode")
int updateById(@Name("entity") @Param("et")Schedule entity);
} }

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -25,7 +25,6 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -40,13 +39,6 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator") @CacheConfig(cacheNames = "taskDefinition", keyGenerator = "cacheKeyGenerator")
public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> { public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
/**
* query max version for definition
*
* @param code taskDefinitionCode
*/
Integer queryMaxVersionForDefinition(@Param("code") long code);
/** /**
* query task definition log * query task definition log
* *
@ -54,15 +46,31 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version version * @param version version
* @return task definition log * @return task definition log
*/ */
@Cacheable(sync = true, key = "#code + '_' + #version") @Cacheable(sync = true)
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
@Param("version") int version);
/** /**
* update * update
*/ */
@CacheEvict(key = "#taskDefinitionLog.code + '_' + #taskDefinitionLog.version") @CacheEvict(key = "#p0.code + '_' + #p0.version")
int updateById(@Name("taskDefinitionLog") @Param("et") TaskDefinitionLog taskDefinitionLog); int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
/**
* delete the certain task definition version by task definition code and version
*
* @param code task definition code
* @param version task definition version
* @return delete result
*/
@CacheEvict
int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**
* query max version for definition
*
* @param code taskDefinitionCode
*/
Integer queryMaxVersionForDefinition(@Param("code") long code);
/** /**
* @param taskDefinitions taskDefinition list * @param taskDefinitions taskDefinition list
@ -78,16 +86,6 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
*/ */
int batchInsert(@Param("taskDefinitionLogs") List<TaskDefinitionLog> taskDefinitionLogs); int batchInsert(@Param("taskDefinitionLogs") List<TaskDefinitionLog> taskDefinitionLogs);
/**
* delete the certain task definition version by task definition code and version
*
* @param code task definition code
* @param version task definition version
* @return delete result
*/
@CacheEvict(key = "#code + '_' #version")
int deleteByCodeAndVersion(@Name("code") @Param("code") long code, @Name("version") @Param("version") int version);
/** /**
* query the paging task definition version list by pagination info * query the paging task definition version list by pagination info
* *

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -53,8 +52,8 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/** /**
* update * update
*/ */
@CacheEvict(key = "#tenant.id") @CacheEvict(key = "#p0.id")
int updateById(@Name("tenant") @Param("et") Tenant tenant); int updateById(@Param("et") Tenant tenant);
/** /**
* query tenant by code * query tenant by code

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

@ -23,7 +23,6 @@ import org.apache.ibatis.annotations.Param;
import java.util.List; import java.util.List;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Cacheable;
@ -53,8 +52,8 @@ public interface UserMapper extends BaseMapper<User> {
/** /**
* update * update
*/ */
@CacheEvict(key = "#user.id") @CacheEvict(key = "#p0.id")
int updateById(@Name("user") @Param("et") User user); int updateById(@Param("et") User user);
/** /**
* query all general user * query all general user
@ -166,6 +165,7 @@ public interface UserMapper extends BaseMapper<User> {
/** /**
* query authed user list by projectId * query authed user list by projectId
*
* @param projectId projectId * @param projectId projectId
* @return user list * @return user list
*/ */

Loading…
Cancel
Save