From a80a7b3fcf6154764ef82b3b6536c8ee54f90444 Mon Sep 17 00:00:00 2001 From: wind Date: Tue, 7 Dec 2021 15:55:22 +0800 Subject: [PATCH] [Feature-6963][MasterServer] unified cache manager (#7187) * unified cache * reduce db select * checkstyle Co-authored-by: caishunfeng <534328519@qq.com> --- .../api/aspect/CacheEvictAspect.java | 32 ++-- .../dao/mapper/ProcessDefinitionMapper.java | 9 +- .../dao/mapper/ProcessTaskRelationMapper.java | 43 +++--- .../dao/mapper/ScheduleMapper.java | 2 +- .../dao/mapper/TaskDefinitionLogMapper.java | 11 +- .../dao/mapper/TenantMapper.java | 5 +- .../dao/mapper/UserMapper.java | 5 +- .../dao/mapper/WorkerGroupMapper.java | 10 +- .../remote/command/CacheExpireCommand.java | 18 +-- .../command/cache/CacheExpireCommandTest.java | 5 +- .../server/master/config/MasterConfig.java | 9 -- .../master/processor/CacheProcessor.java | 143 +----------------- .../master/runner/EventExecuteService.java | 7 +- .../master/runner/WorkflowExecuteThread.java | 44 +++++- .../main/resources/application-master.yaml | 8 +- .../master/processor/CacheProcessorTest.java | 2 +- .../service/process/ProcessService.java | 15 +- .../service/cache/CacheNotifyServiceTest.java | 2 +- 18 files changed, 138 insertions(+), 232 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java index 336be21d9a..39d697545c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java @@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.service.cache.CacheNotifyService; import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator; +import org.apache.commons.lang3.StringUtils; + import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.ArrayList; @@ -34,6 +36,8 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; @@ -50,7 +54,12 @@ import org.springframework.stereotype.Component; @Component public class CacheEvictAspect { - private static final String UPDATE_BY_ID = "updateById"; + private static final Logger logger = LoggerFactory.getLogger(CacheEvictAspect.class); + + /** + * symbol of spring el + */ + private static final String EL_SYMBOL = "#"; @Autowired private CacheKeyGenerator cacheKeyGenerator; @@ -77,17 +86,18 @@ public class CacheEvictAspect { CacheType cacheType = getCacheType(cacheConfig, cacheEvict); if (cacheType != null) { - // todo use springEL is better - if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1) { - Object updateObj = args[0]; - cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, updateObj).convert2Command()); - } else if (!cacheEvict.key().isEmpty()) { - List paramsList = getParamAnnotationsByType(method, Name.class); - String key = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args)); - cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command()); + String cacheKey; + if (cacheEvict.key().isEmpty()) { + cacheKey = (String) cacheKeyGenerator.generate(target, method, args); } else { - Object key = cacheKeyGenerator.generate(target, method, args); - cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command()); + cacheKey = cacheEvict.key(); + List paramsList = getParamAnnotationsByType(method, Name.class); + if (cacheEvict.key().contains(EL_SYMBOL)) { + cacheKey = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args)); + } + } + if (StringUtils.isNotEmpty(cacheKey)) { + cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, cacheKey).convert2Command()); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java index ca287e25c6..954b55ba12 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; @@ -52,8 +53,8 @@ public interface ProcessDefinitionMapper extends BaseMapper { /** * update */ - @CacheEvict - int updateById(@Param("et") ProcessDefinition processDefinition); + @CacheEvict(key = "#processDefinition.code") + int updateById(@Name("processDefinition") @Param("et") ProcessDefinition processDefinition); /** * query process definition by code list @@ -69,8 +70,8 @@ public interface ProcessDefinitionMapper extends BaseMapper { * @param code code * @return delete result */ - @CacheEvict - int deleteByCode(@Param("code") long code); + @CacheEvict(key = "#code") + int deleteByCode(@Name("code") @Param("code") long code); /** * verify process definition by name diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 5b3ea752ec..010b8e4d28 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param; import java.util.List; import java.util.Map; +import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; @@ -51,8 +52,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryUpstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); @@ -110,7 +111,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryDownstreamByCode(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode); @@ -118,24 +119,24 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode,@Param("preTaskCodes") Long[] preTaskCodes); + List queryUpstreamByCodes(@Param("projectCode") long projectCode, @Param("taskCode") long taskCode, @Param("preTaskCodes") Long[] preTaskCodes); /** * count upstream by codes * * @param projectCode projectCode - * @param taskCode taskCode - * @param processDefinitionCodes processDefinitionCodes + * @param taskCode taskCode + * @param processDefinitionCodes processDefinitionCodes * @return upstream count list group by process definition code */ List> countUpstreamByCodeGroupByProcessDefinitionCode(@Param("projectCode") long projectCode, - @Param("processDefinitionCodes") Long[] processDefinitionCodes, - @Param("taskCode") long taskCode); + @Param("processDefinitionCodes") Long[] processDefinitionCodes, + @Param("taskCode") long taskCode); /** * batch update process task relation pre task @@ -148,10 +149,10 @@ public interface ProcessTaskRelationMapper extends BaseMapper queryByCode(@Param("projectCode") long projectCode, @@ -162,7 +163,7 @@ public interface ProcessTaskRelationMapper extends BaseMapper { @CacheEvict(key = "#entity.processDefinitionCode") int insert(@Name("entity") Schedule entity); - @CacheEvict + @CacheEvict(key = "#entity.processDefinitionCode") int updateById(@Name("entity") @Param("et")Schedule entity); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java index 75e1b29c6d..30ee855e8a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java @@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param; import java.util.Collection; import java.util.List; +import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; @@ -53,15 +54,15 @@ public interface TaskDefinitionLogMapper extends BaseMapper { * @param version version * @return task definition log */ - @Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion") + @Cacheable(sync = true, key = "#code + '_' + #version") TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version); /** * update */ - @CacheEvict - int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog); + @CacheEvict(key = "#taskDefinitionLog.code + '_' + #taskDefinitionLog.version") + int updateById(@Name("taskDefinitionLog") @Param("et") TaskDefinitionLog taskDefinitionLog); /** * @param taskDefinitions taskDefinition list @@ -84,8 +85,8 @@ public interface TaskDefinitionLogMapper extends BaseMapper { * @param version task definition version * @return delete result */ - @CacheEvict - int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version); + @CacheEvict(key = "#code + '_' #version") + int deleteByCodeAndVersion(@Name("code") @Param("code") long code, @Name("version") @Param("version") int version); /** * query the paging task definition version list by pagination info diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java index 843122fe36..0def8b85fc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.ibatis.annotations.Param; +import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; @@ -52,8 +53,8 @@ public interface TenantMapper extends BaseMapper { /** * update */ - @CacheEvict - int updateById(@Param("et") Tenant tenant); + @CacheEvict(key = "#tenant.id") + int updateById(@Name("tenant") @Param("et") Tenant tenant); /** * query tenant by code diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java index 1d5ad8f328..460c9542ec 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java @@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Param; import java.util.List; +import org.springframework.boot.context.properties.bind.Name; import org.springframework.cache.annotation.CacheConfig; import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; @@ -52,8 +53,8 @@ public interface UserMapper extends BaseMapper { /** * update */ - @CacheEvict - int updateById(@Param("et") User user); + @CacheEvict(key = "#user.id") + int updateById(@Name("user") @Param("et") User user); /** * query all general user diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java index b932c0b026..bfe01f83b3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java @@ -37,22 +37,24 @@ public interface WorkerGroupMapper extends BaseMapper { /** * query all worker group + * * @return worker group list */ - @Cacheable(sync = true, key = "'all'") + @Cacheable(sync = true, key = "all") List queryAllWorkerGroup(); - @CacheEvict + @CacheEvict(key = "all") int deleteById(Integer id); - @CacheEvict + @CacheEvict(key = "all") int insert(WorkerGroup entity); - @CacheEvict + @CacheEvict(key = "all") int updateById(@Param("et") WorkerGroup entity); /** * query worer grouop by name + * * @param name name * @return worker group list */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java index 26170af13b..a32d4fce13 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java @@ -28,29 +28,23 @@ import java.io.Serializable; public class CacheExpireCommand implements Serializable { private CacheType cacheType; - private Class updateObjClass; - private String updateObjJson; + private String cacheKey; public CacheExpireCommand() { super(); } - public CacheExpireCommand(CacheType cacheType, Object updateObj) { + public CacheExpireCommand(CacheType cacheType, String cacheKey) { this.cacheType = cacheType; - this.updateObjClass = updateObj.getClass(); - this.updateObjJson = JSONUtils.toJsonString(updateObj); + this.cacheKey = cacheKey; } public CacheType getCacheType() { return cacheType; } - public Class getUpdateObjClass() { - return updateObjClass; - } - - public String getUpdateObjJson() { - return updateObjJson; + public String getCacheKey() { + return cacheKey; } /** @@ -68,6 +62,6 @@ public class CacheExpireCommand implements Serializable { @Override public String toString() { - return String.format("CacheExpireCommand{CacheType=%s, updateObjClass=%s, updateObjJson=%s}", cacheType, updateObjClass, updateObjJson); + return String.format("CacheExpireCommand{CacheType=%s, cacheKey=%s}", cacheType, cacheKey); } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java index 1c91d15fb0..23512341ee 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java @@ -29,10 +29,7 @@ public class CacheExpireCommandTest { @Test public void testConvert2Command() { - CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, 1); - Assert.assertEquals(Integer.class, cacheExpireCommand.getUpdateObjClass()); - Assert.assertEquals("1", cacheExpireCommand.getUpdateObjJson()); - + CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, "1"); Command command = cacheExpireCommand.convert2Command(); Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType()); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index d4a9958203..88ecfbde57 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -39,7 +39,6 @@ public class MasterConfig { private int stateWheelInterval; private double maxCpuLoadAvg; private double reservedMemory; - private boolean cacheProcessDefinition; public int getListenPort() { return listenPort; @@ -136,12 +135,4 @@ public class MasterConfig { public void setReservedMemory(double reservedMemory) { this.reservedMemory = reservedMemory; } - - public boolean isCacheProcessDefinition() { - return cacheProcessDefinition; - } - - public void setCacheProcessDefinition(boolean cacheProcessDefinition) { - this.cacheProcessDefinition = cacheProcessDefinition; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java index c66a57b5d4..6db7f65d7f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java @@ -19,14 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.common.enums.CacheType; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; -import org.apache.dolphinscheduler.dao.entity.Queue; -import org.apache.dolphinscheduler.dao.entity.Schedule; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.Tenant; -import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.remote.command.CacheExpireCommand; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -67,142 +59,15 @@ public class CacheProcessor implements NettyRequestProcessor { cacheManager = SpringApplicationContext.getBean(CacheManager.class); } - Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass()); - if (object == null) { + if (cacheExpireCommand.getCacheKey().isEmpty()) { return; } CacheType cacheType = cacheExpireCommand.getCacheType(); - switch (cacheType) { - case TENANT: - if (object instanceof Tenant) { - Tenant tenant = (Tenant) object; - tenantCacheExpire(tenant); - } - break; - case USER: - if (object instanceof User) { - User user = (User) object; - userCacheExpire(user); - } - break; - case QUEUE: - if (object instanceof Queue) { - Queue queue = (Queue) object; - queueCacheExpire(queue); - } - break; - case PROCESS_DEFINITION: - if (object instanceof ProcessDefinition) { - ProcessDefinition processDefinition = (ProcessDefinition) object; - processDefinitionCacheExpire(processDefinition); - } - break; - case TASK_DEFINITION: - if (object instanceof TaskDefinition) { - TaskDefinition taskDefinition = (TaskDefinition) object; - taskDefinitionCacheExpire(taskDefinition); - } - break; - case PROCESS_TASK_RELATION: - if (object instanceof ProcessTaskRelation) { - ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object; - processTaskRelationCacheExpire(processTaskRelation); - } - break; - case WORKER_GROUP: - if (object instanceof WorkerGroup) { - WorkerGroup workerGroup = (WorkerGroup) object; - workerGroupCacheExpire(workerGroup); - } - break; - case SCHEDULE: - if (object instanceof Schedule) { - Schedule schedule = (Schedule) object; - scheduleCacheExpire(schedule); - } - break; - default: - logger.error("no support cache type:{}", cacheType); - } - - // if delete operation, just send key - if (object instanceof String) { - Cache cache = cacheManager.getCache(cacheType.getCacheName()); - if (cache != null) { - cache.evict(object); - logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object); - } - } - } - - private void tenantCacheExpire(Tenant tenant) { - Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName()); - if (cache != null) { - cache.evict(tenant.getId()); - logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId()); - } - } - - private void userCacheExpire(User user) { - Cache cache = cacheManager.getCache(CacheType.USER.getCacheName()); - if (cache != null) { - cache.evict(user.getId()); - logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId()); - } - } - - private void queueCacheExpire(Queue queue) { - Cache cache = cacheManager.getCache(CacheType.USER.getCacheName()); - if (cache != null) { - cache.clear(); - logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName()); - } - } - - private void processDefinitionCacheExpire(ProcessDefinition processDefinition) { - Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName()); - if (cache != null) { - cache.evict(processDefinition.getCode()); - cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion()); - logger.info("cache evict, type:{}, key:{}", - CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion()); - } - } - - private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) { - Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName()); - if (cache != null) { - cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode()); - logger.info("cache evict, type:{}, key:{}", - CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode()); - } - } - - private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) { - Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName()); - if (cache != null) { - cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion()); - logger.info("cache evict, type:{}, key:{}", - CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion()); - } - } - - private void workerGroupCacheExpire(WorkerGroup workerGroup) { - Cache cache = cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName()); - if (cache != null) { - cache.evict("all"); - logger.info("cache evict, type:{}, key:{}", - CacheType.WORKER_GROUP.getCacheName(), "all"); - } - } - - private void scheduleCacheExpire(Schedule schedule) { - Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName()); + Cache cache = cacheManager.getCache(cacheType.getCacheName()); if (cache != null) { - cache.evict(schedule.getProcessDefinitionCode()); - logger.info("cache evict, type:{}, key:{}", - CacheType.SCHEDULE.getCacheName(), schedule.getProcessDefinitionCode()); + cache.evict(cacheExpireCommand.getCacheKey()); + logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), cacheExpireCommand.getCacheKey()); } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index c40618ab1c..3da043c50b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.thread.Stopper; @@ -146,9 +147,11 @@ public class EventExecuteService extends Thread { } private void notifyProcessChanged() { - Map fatherMaps - = processService.notifyProcessList(processInstanceId, 0); + if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) { + return; + } + Map fatherMaps = processService.notifyProcessList(processInstanceId); for (ProcessInstance processInstance : fatherMaps.keySet()) { String address = NetUtils.getAddr(masterConfig.getListenPort()); if (processInstance.getHost().equalsIgnoreCase(address)) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 9116ce5771..fe60ec61d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -51,9 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; import org.apache.dolphinscheduler.dao.entity.ProjectUser; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.DagHelper; @@ -84,6 +86,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -682,11 +685,10 @@ public class WorkflowExecuteThread implements Runnable { ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser); } - List taskInstances = processService.findValidTaskListByProcessId(processInstance.getId()); - ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); - processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser); - //release task group - processService.releaseAllTaskGroup(processInstance.getId()); + if (checkTaskQueue()) { + //release task group + processService.releaseAllTaskGroup(processInstance.getId()); + } } public void checkSerialProcess(ProcessDefinition processDefinition) { @@ -725,8 +727,10 @@ public class WorkflowExecuteThread implements Runnable { processInstance.setProcessDefinition(processDefinition); List recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam()); - List taskNodeList = - processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList()); + + List processTaskRelations = processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()); + List taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations); + List taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs); forbiddenTaskMap.clear(); taskNodeList.forEach(taskNode -> { @@ -759,7 +763,7 @@ public class WorkflowExecuteThread implements Runnable { completeTaskMap.clear(); errorTaskMap.clear(); - if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) { + if (!isNewProcessInstance()) { List validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId()); for (TaskInstance task : validTaskInstanceList) { validTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); @@ -1625,4 +1629,28 @@ public class WorkflowExecuteThread implements Runnable { TaskDependType depNodeType) throws Exception { return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); } + + /** + * check task queue + */ + private boolean checkTaskQueue() { + AtomicBoolean result = new AtomicBoolean(false); + taskInstanceMap.forEach((id, taskInstance) -> { + if (taskInstance != null && taskInstance.getTaskGroupId() > 0) { + result.set(true); + } + }); + return result.get(); + } + + /** + * is new process instance + */ + private boolean isNewProcessInstance() { + if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) { + return true; + } else { + return false; + } + } } diff --git a/dolphinscheduler-server/src/main/resources/application-master.yaml b/dolphinscheduler-server/src/main/resources/application-master.yaml index 866b2000ed..12ed516369 100644 --- a/dolphinscheduler-server/src/main/resources/application-master.yaml +++ b/dolphinscheduler-server/src/main/resources/application-master.yaml @@ -18,7 +18,7 @@ spring: application: name: master-server cache: - # default enable cache, you can disable by `type: none` + # default unable cache, you can disable by `type: caffeine` type: none cache-names: - tenant @@ -26,6 +26,8 @@ spring: - processDefinition - processTaskRelation - taskDefinition + - workerGroup + - schedule caffeine: spec: maximumSize=100,expireAfterWrite=300s,recordStats @@ -37,8 +39,6 @@ master: pre-exec-threads: 10 # master execute thread number to limit process instances in parallel exec-threads: 100 - # master execute task number in parallel per process instance - exec-task-num: 20 # master dispatch task number per batch dispatch-task-number: 3 # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight @@ -54,8 +54,6 @@ master: max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 - # master cache process definition, default: true - cache-process-definition: true server: port: 5679 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java index 6f1907ccec..5c177ca94e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java @@ -66,7 +66,7 @@ public class CacheProcessorTest { public void testProcess() { Tenant tenant = new Tenant(); tenant.setId(1); - CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, tenant); + CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, "1"); Command command = cacheExpireCommand.convert2Command(); cacheProcessor.process(channel, command); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c3ee5c873f..e11a1089c9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2418,6 +2418,19 @@ public class ProcessService { return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet); } + public List getTaskDefineLogListByRelation(List processTaskRelations) { + List taskDefinitionLogs = com.google.common.collect.Lists.newArrayList(); + for (ProcessTaskRelation processTaskRelation : processTaskRelations) { + if (processTaskRelation.getPreTaskCode() > 0) { + taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion())); + } + if (processTaskRelation.getPostTaskCode() > 0) { + taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion())); + } + } + return taskDefinitionLogs; + } + /** * find task definition by code and version */ @@ -2499,7 +2512,7 @@ public class ProcessService { return taskNodeList; } - public Map notifyProcessList(int processId, int taskId) { + public Map notifyProcessList(int processId) { HashMap processTaskMap = new HashMap<>(); //find sub tasks ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java index 04d1f785d0..a3dafb677b 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java @@ -61,7 +61,7 @@ public class CacheNotifyServiceTest { public void testNotifyMaster() { User user1 = new User(); user1.setId(100); - Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, user1).convert2Command(); + Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, "100").convert2Command(); NettyServerConfig serverConfig = new NettyServerConfig();