Browse Source

[Feature-6963][MasterServer] unified cache manager (#7187)

* unified cache

* reduce db select

* checkstyle

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
a80a7b3fcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java
  2. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java
  3. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java
  4. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java
  5. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  6. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java
  7. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java
  8. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
  9. 18
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java
  10. 5
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java
  11. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  12. 141
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java
  13. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  14. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  15. 8
      dolphinscheduler-server/src/main/resources/application-master.yaml
  16. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java
  17. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  18. 2
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java

@ -22,6 +22,8 @@ import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.service.cache.CacheNotifyService;
import org.apache.dolphinscheduler.service.cache.impl.CacheKeyGenerator;
import org.apache.commons.lang3.StringUtils;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -34,6 +36,8 @@ import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
@ -50,7 +54,12 @@ import org.springframework.stereotype.Component;
@Component
public class CacheEvictAspect {
private static final String UPDATE_BY_ID = "updateById";
private static final Logger logger = LoggerFactory.getLogger(CacheEvictAspect.class);
/**
* symbol of spring el
*/
private static final String EL_SYMBOL = "#";
@Autowired
private CacheKeyGenerator cacheKeyGenerator;
@ -77,17 +86,18 @@ public class CacheEvictAspect {
CacheType cacheType = getCacheType(cacheConfig, cacheEvict);
if (cacheType != null) {
// todo use springEL is better
if (method.getName().equalsIgnoreCase(UPDATE_BY_ID) && args.length == 1) {
Object updateObj = args[0];
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, updateObj).convert2Command());
} else if (!cacheEvict.key().isEmpty()) {
List<Name> paramsList = getParamAnnotationsByType(method, Name.class);
String key = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args));
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command());
String cacheKey;
if (cacheEvict.key().isEmpty()) {
cacheKey = (String) cacheKeyGenerator.generate(target, method, args);
} else {
Object key = cacheKeyGenerator.generate(target, method, args);
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, key).convert2Command());
cacheKey = cacheEvict.key();
List<Name> paramsList = getParamAnnotationsByType(method, Name.class);
if (cacheEvict.key().contains(EL_SYMBOL)) {
cacheKey = parseKey(cacheEvict.key(), paramsList.stream().map(o -> o.value()).collect(Collectors.toList()), Arrays.asList(args));
}
}
if (StringUtils.isNotEmpty(cacheKey)) {
cacheNotifyService.notifyMaster(new CacheExpireCommand(cacheType, cacheKey).convert2Command());
}
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionMapper.java

@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -52,8 +53,8 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
/**
* update
*/
@CacheEvict
int updateById(@Param("et") ProcessDefinition processDefinition);
@CacheEvict(key = "#processDefinition.code")
int updateById(@Name("processDefinition") @Param("et") ProcessDefinition processDefinition);
/**
* query process definition by code list
@ -69,8 +70,8 @@ public interface ProcessDefinitionMapper extends BaseMapper<ProcessDefinition> {
* @param code code
* @return delete result
*/
@CacheEvict
int deleteByCode(@Param("code") long code);
@CacheEvict(key = "#code")
int deleteByCode(@Name("code") @Param("code") long code);
/**
* verify process definition by name

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java

@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -51,8 +52,8 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
/**
* update
*/
@CacheEvict
int updateById(@Param("et") ProcessTaskRelation processTaskRelation);
@CacheEvict(key = "#processTaskRelation.projectCode + '_' + #processTaskRelation.processDefinitionCode")
int updateById(@Name("processTaskRelation") @Param("et") ProcessTaskRelation processTaskRelation);
/**
* process task relation by taskCode
@ -77,9 +78,9 @@ public interface ProcessTaskRelationMapper extends BaseMapper<ProcessTaskRelatio
* @param processCode processCode
* @return int
*/
@CacheEvict
int deleteByCode(@Param("projectCode") long projectCode,
@Param("processCode") long processCode);
@CacheEvict(key = "#projectCode + '_' + #processCode")
int deleteByCode(@Name("projectCode") @Param("projectCode") long projectCode,
@Name("processCode") @Param("processCode") long processCode);
/**
* batch insert process task relation

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ScheduleMapper.java

@ -79,6 +79,6 @@ public interface ScheduleMapper extends BaseMapper<Schedule> {
@CacheEvict(key = "#entity.processDefinitionCode")
int insert(@Name("entity") Schedule entity);
@CacheEvict
@CacheEvict(key = "#entity.processDefinitionCode")
int updateById(@Name("entity") @Param("et")Schedule entity);
}

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -25,6 +25,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.Collection;
import java.util.List;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -53,15 +54,15 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version version
* @return task definition log
*/
@Cacheable(sync = true, key = "#taskCode + '_' + #taskDefinitionVersion")
@Cacheable(sync = true, key = "#code + '_' + #version")
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
@Param("version") int version);
/**
* update
*/
@CacheEvict
int updateById(@Param("et") TaskDefinitionLog taskDefinitionLog);
@CacheEvict(key = "#taskDefinitionLog.code + '_' + #taskDefinitionLog.version")
int updateById(@Name("taskDefinitionLog") @Param("et") TaskDefinitionLog taskDefinitionLog);
/**
* @param taskDefinitions taskDefinition list
@ -84,8 +85,8 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @param version task definition version
* @return delete result
*/
@CacheEvict
int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
@CacheEvict(key = "#code + '_' #version")
int deleteByCodeAndVersion(@Name("code") @Param("code") long code, @Name("version") @Param("version") int version);
/**
* query the paging task definition version list by pagination info

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.ibatis.annotations.Param;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -52,8 +53,8 @@ public interface TenantMapper extends BaseMapper<Tenant> {
/**
* update
*/
@CacheEvict
int updateById(@Param("et") Tenant tenant);
@CacheEvict(key = "#tenant.id")
int updateById(@Name("tenant") @Param("et") Tenant tenant);
/**
* query tenant by code

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java

@ -23,6 +23,7 @@ import org.apache.ibatis.annotations.Param;
import java.util.List;
import org.springframework.boot.context.properties.bind.Name;
import org.springframework.cache.annotation.CacheConfig;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
@ -52,8 +53,8 @@ public interface UserMapper extends BaseMapper<User> {
/**
* update
*/
@CacheEvict
int updateById(@Param("et") User user);
@CacheEvict(key = "#user.id")
int updateById(@Name("user") @Param("et") User user);
/**
* query all general user

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java

@ -37,22 +37,24 @@ public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
/**
* query all worker group
*
* @return worker group list
*/
@Cacheable(sync = true, key = "'all'")
@Cacheable(sync = true, key = "all")
List<WorkerGroup> queryAllWorkerGroup();
@CacheEvict
@CacheEvict(key = "all")
int deleteById(Integer id);
@CacheEvict
@CacheEvict(key = "all")
int insert(WorkerGroup entity);
@CacheEvict
@CacheEvict(key = "all")
int updateById(@Param("et") WorkerGroup entity);
/**
* query worer grouop by name
*
* @param name name
* @return worker group list
*/

18
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CacheExpireCommand.java

@ -28,29 +28,23 @@ import java.io.Serializable;
public class CacheExpireCommand implements Serializable {
private CacheType cacheType;
private Class updateObjClass;
private String updateObjJson;
private String cacheKey;
public CacheExpireCommand() {
super();
}
public CacheExpireCommand(CacheType cacheType, Object updateObj) {
public CacheExpireCommand(CacheType cacheType, String cacheKey) {
this.cacheType = cacheType;
this.updateObjClass = updateObj.getClass();
this.updateObjJson = JSONUtils.toJsonString(updateObj);
this.cacheKey = cacheKey;
}
public CacheType getCacheType() {
return cacheType;
}
public Class getUpdateObjClass() {
return updateObjClass;
}
public String getUpdateObjJson() {
return updateObjJson;
public String getCacheKey() {
return cacheKey;
}
/**
@ -68,6 +62,6 @@ public class CacheExpireCommand implements Serializable {
@Override
public String toString() {
return String.format("CacheExpireCommand{CacheType=%s, updateObjClass=%s, updateObjJson=%s}", cacheType, updateObjClass, updateObjJson);
return String.format("CacheExpireCommand{CacheType=%s, cacheKey=%s}", cacheType, cacheKey);
}
}

5
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/cache/CacheExpireCommandTest.java vendored

@ -29,10 +29,7 @@ public class CacheExpireCommandTest {
@Test
public void testConvert2Command() {
CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, 1);
Assert.assertEquals(Integer.class, cacheExpireCommand.getUpdateObjClass());
Assert.assertEquals("1", cacheExpireCommand.getUpdateObjJson());
CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, "1");
Command command = cacheExpireCommand.convert2Command();
Assert.assertEquals(CommandType.CACHE_EXPIRE, command.getType());
}

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -39,7 +39,6 @@ public class MasterConfig {
private int stateWheelInterval;
private double maxCpuLoadAvg;
private double reservedMemory;
private boolean cacheProcessDefinition;
public int getListenPort() {
return listenPort;
@ -136,12 +135,4 @@ public class MasterConfig {
public void setReservedMemory(double reservedMemory) {
this.reservedMemory = reservedMemory;
}
public boolean isCacheProcessDefinition() {
return cacheProcessDefinition;
}
public void setCacheProcessDefinition(boolean cacheProcessDefinition) {
this.cacheProcessDefinition = cacheProcessDefinition;
}
}

141
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessor.java

@ -19,14 +19,6 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.CacheType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.remote.command.CacheExpireCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -67,142 +59,15 @@ public class CacheProcessor implements NettyRequestProcessor {
cacheManager = SpringApplicationContext.getBean(CacheManager.class);
}
Object object = JSONUtils.parseObject(cacheExpireCommand.getUpdateObjJson(), cacheExpireCommand.getUpdateObjClass());
if (object == null) {
if (cacheExpireCommand.getCacheKey().isEmpty()) {
return;
}
CacheType cacheType = cacheExpireCommand.getCacheType();
switch (cacheType) {
case TENANT:
if (object instanceof Tenant) {
Tenant tenant = (Tenant) object;
tenantCacheExpire(tenant);
}
break;
case USER:
if (object instanceof User) {
User user = (User) object;
userCacheExpire(user);
}
break;
case QUEUE:
if (object instanceof Queue) {
Queue queue = (Queue) object;
queueCacheExpire(queue);
}
break;
case PROCESS_DEFINITION:
if (object instanceof ProcessDefinition) {
ProcessDefinition processDefinition = (ProcessDefinition) object;
processDefinitionCacheExpire(processDefinition);
}
break;
case TASK_DEFINITION:
if (object instanceof TaskDefinition) {
TaskDefinition taskDefinition = (TaskDefinition) object;
taskDefinitionCacheExpire(taskDefinition);
}
break;
case PROCESS_TASK_RELATION:
if (object instanceof ProcessTaskRelation) {
ProcessTaskRelation processTaskRelation = (ProcessTaskRelation) object;
processTaskRelationCacheExpire(processTaskRelation);
}
break;
case WORKER_GROUP:
if (object instanceof WorkerGroup) {
WorkerGroup workerGroup = (WorkerGroup) object;
workerGroupCacheExpire(workerGroup);
}
break;
case SCHEDULE:
if (object instanceof Schedule) {
Schedule schedule = (Schedule) object;
scheduleCacheExpire(schedule);
}
break;
default:
logger.error("no support cache type:{}", cacheType);
}
// if delete operation, just send key
if (object instanceof String) {
Cache cache = cacheManager.getCache(cacheType.getCacheName());
if (cache != null) {
cache.evict(object);
logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), object);
}
}
}
private void tenantCacheExpire(Tenant tenant) {
Cache cache = cacheManager.getCache(CacheType.TENANT.getCacheName());
if (cache != null) {
cache.evict(tenant.getId());
logger.info("cache evict, type:{}, key:{}", CacheType.TENANT.getCacheName(), tenant.getId());
}
}
private void userCacheExpire(User user) {
Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
if (cache != null) {
cache.evict(user.getId());
logger.info("cache evict, type:{}, key:{}", CacheType.USER.getCacheName(), user.getId());
}
}
private void queueCacheExpire(Queue queue) {
Cache cache = cacheManager.getCache(CacheType.USER.getCacheName());
if (cache != null) {
cache.clear();
logger.info("cache evict, type:{}, clear", CacheType.USER.getCacheName());
}
}
private void processDefinitionCacheExpire(ProcessDefinition processDefinition) {
Cache cache = cacheManager.getCache(CacheType.PROCESS_DEFINITION.getCacheName());
if (cache != null) {
cache.evict(processDefinition.getCode());
cache.evict(processDefinition.getCode() + "_" + processDefinition.getVersion());
logger.info("cache evict, type:{}, key:{}",
CacheType.PROCESS_DEFINITION.getCacheName(), processDefinition.getCode() + "_" + processDefinition.getVersion());
}
}
private void processTaskRelationCacheExpire(ProcessTaskRelation processTaskRelation) {
Cache cache = cacheManager.getCache(CacheType.PROCESS_TASK_RELATION.getCacheName());
if (cache != null) {
cache.evict(processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
logger.info("cache evict, type:{}, key:{}",
CacheType.PROCESS_TASK_RELATION.getCacheName(), processTaskRelation.getProjectCode() + "_" + processTaskRelation.getProcessDefinitionCode());
}
}
private void taskDefinitionCacheExpire(TaskDefinition taskDefinition) {
Cache cache = cacheManager.getCache(CacheType.TASK_DEFINITION.getCacheName());
if (cache != null) {
cache.evict(taskDefinition.getCode() + "_" + taskDefinition.getVersion());
logger.info("cache evict, type:{}, key:{}",
CacheType.TASK_DEFINITION.getCacheName(), taskDefinition.getCode() + "_" + taskDefinition.getVersion());
}
}
private void workerGroupCacheExpire(WorkerGroup workerGroup) {
Cache cache = cacheManager.getCache(CacheType.WORKER_GROUP.getCacheName());
if (cache != null) {
cache.evict("all");
logger.info("cache evict, type:{}, key:{}",
CacheType.WORKER_GROUP.getCacheName(), "all");
}
}
private void scheduleCacheExpire(Schedule schedule) {
Cache cache = cacheManager.getCache(CacheType.SCHEDULE.getCacheName());
if (cache != null) {
cache.evict(schedule.getProcessDefinitionCode());
logger.info("cache evict, type:{}, key:{}",
CacheType.SCHEDULE.getCacheName(), schedule.getProcessDefinitionCode());
cache.evict(cacheExpireCommand.getCacheKey());
logger.info("cache evict, type:{}, key:{}", cacheType.getCacheName(), cacheExpireCommand.getCacheKey());
}
}
}

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
@ -146,9 +147,11 @@ public class EventExecuteService extends Thread {
}
private void notifyProcessChanged() {
Map<ProcessInstance, TaskInstance> fatherMaps
= processService.notifyProcessList(processInstanceId, 0);
if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
return;
}
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(processInstanceId);
for (ProcessInstance processInstance : fatherMaps.keySet()) {
String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) {

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -51,9 +51,11 @@ import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.DagHelper;
@ -84,6 +86,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -682,12 +685,11 @@ public class WorkflowExecuteThread implements Runnable {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, getValidTaskList(), projectUser);
}
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(processInstance.getId());
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
processAlertManager.sendAlertProcessInstance(processInstance, taskInstances, projectUser);
if (checkTaskQueue()) {
//release task group
processService.releaseAllTaskGroup(processInstance.getId());
}
}
public void checkSerialProcess(ProcessDefinition processDefinition) {
int nextInstanceId = processInstance.getNextProcessInstanceId();
@ -725,8 +727,10 @@ public class WorkflowExecuteThread implements Runnable {
processInstance.setProcessDefinition(processDefinition);
List<TaskInstance> recoverNodeList = getStartTaskInstanceList(processInstance.getCommandParam());
List<TaskNode> taskNodeList =
processService.transformTask(processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode()), Lists.newArrayList());
List<ProcessTaskRelation> processTaskRelations = processService.findRelationByCode(processDefinition.getProjectCode(), processDefinition.getCode());
List<TaskDefinitionLog> taskDefinitionLogs = processService.getTaskDefineLogListByRelation(processTaskRelations);
List<TaskNode> taskNodeList = processService.transformTask(processTaskRelations, taskDefinitionLogs);
forbiddenTaskMap.clear();
taskNodeList.forEach(taskNode -> {
@ -759,7 +763,7 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.clear();
errorTaskMap.clear();
if (ExecutionStatus.SUBMITTED_SUCCESS != processInstance.getState()) {
if (!isNewProcessInstance()) {
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance task : validTaskInstanceList) {
validTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
@ -1625,4 +1629,28 @@ public class WorkflowExecuteThread implements Runnable {
TaskDependType depNodeType) throws Exception {
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
}
/**
* check task queue
*/
private boolean checkTaskQueue() {
AtomicBoolean result = new AtomicBoolean(false);
taskInstanceMap.forEach((id, taskInstance) -> {
if (taskInstance != null && taskInstance.getTaskGroupId() > 0) {
result.set(true);
}
});
return result.get();
}
/**
* is new process instance
*/
private boolean isNewProcessInstance() {
if (ExecutionStatus.RUNNING_EXECUTION == processInstance.getState() && processInstance.getRunTimes() == 1) {
return true;
} else {
return false;
}
}
}

8
dolphinscheduler-server/src/main/resources/application-master.yaml

@ -18,7 +18,7 @@ spring:
application:
name: master-server
cache:
# default enable cache, you can disable by `type: none`
# default unable cache, you can disable by `type: caffeine`
type: none
cache-names:
- tenant
@ -26,6 +26,8 @@ spring:
- processDefinition
- processTaskRelation
- taskDefinition
- workerGroup
- schedule
caffeine:
spec: maximumSize=100,expireAfterWrite=300s,recordStats
@ -37,8 +39,6 @@ master:
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
# master execute task number in parallel per process instance
exec-task-num: 20
# master dispatch task number per batch
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
@ -54,8 +54,6 @@ master:
max-cpu-load-avg: -1
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
# master cache process definition, default: true
cache-process-definition: true
server:
port: 5679

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/CacheProcessorTest.java

@ -66,7 +66,7 @@ public class CacheProcessorTest {
public void testProcess() {
Tenant tenant = new Tenant();
tenant.setId(1);
CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, tenant);
CacheExpireCommand cacheExpireCommand = new CacheExpireCommand(CacheType.TENANT, "1");
Command command = cacheExpireCommand.convert2Command();
cacheProcessor.process(channel, command);

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2418,6 +2418,19 @@ public class ProcessService {
return taskDefinitionLogMapper.queryByTaskDefinitions(taskDefinitionSet);
}
public List<TaskDefinitionLog> getTaskDefineLogListByRelation(List<ProcessTaskRelation> processTaskRelations) {
List<TaskDefinitionLog> taskDefinitionLogs = com.google.common.collect.Lists.newArrayList();
for (ProcessTaskRelation processTaskRelation : processTaskRelations) {
if (processTaskRelation.getPreTaskCode() > 0) {
taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(processTaskRelation.getPreTaskCode(), processTaskRelation.getPreTaskVersion()));
}
if (processTaskRelation.getPostTaskCode() > 0) {
taskDefinitionLogs.add((TaskDefinitionLog) this.findTaskDefinition(processTaskRelation.getPostTaskCode(), processTaskRelation.getPostTaskVersion()));
}
}
return taskDefinitionLogs;
}
/**
* find task definition by code and version
*/
@ -2499,7 +2512,7 @@ public class ProcessService {
return taskNodeList;
}
public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId, int taskId) {
public Map<ProcessInstance, TaskInstance> notifyProcessList(int processId) {
HashMap<ProcessInstance, TaskInstance> processTaskMap = new HashMap<>();
//find sub tasks
ProcessInstanceMap processInstanceMap = processInstanceMapMapper.queryBySubProcessId(processId);

2
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/cache/CacheNotifyServiceTest.java vendored

@ -61,7 +61,7 @@ public class CacheNotifyServiceTest {
public void testNotifyMaster() {
User user1 = new User();
user1.setId(100);
Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, user1).convert2Command();
Command cacheExpireCommand = new CacheExpireCommand(CacheType.USER, "100").convert2Command();
NettyServerConfig serverConfig = new NettyServerConfig();

Loading…
Cancel
Save