Browse Source

[Improvement-6540][server] Optimize the code related to the processInstanceExecMaps object (#6556)

* [DS-6540][refactor-server] Optimize the code related to the processInstanceExecMaps object
3.0.0/version-upgrade
ououtt 3 years ago committed by GitHub
parent
commit
a3982f5025
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 66
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java
  3. 65
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java
  4. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java
  5. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  6. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  7. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  8. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  9. 12
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  10. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  11. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  12. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  13. 78
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java
  14. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
  15. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
  16. 1
      pom.xml

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -30,13 +30,10 @@ import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
@ -100,8 +97,6 @@ public class MasterServer implements IStoppable {
@Autowired @Autowired
private EventExecuteService eventExecuteService; private EventExecuteService eventExecuteService;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
/** /**
* master server startup, not use web service * master server startup, not use web service
* *
@ -121,27 +116,21 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort()); serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
TaskAckProcessor ackProcessor = new TaskAckProcessor(); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
ackProcessor.init(processInstanceExecMaps); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, new StateEventProcessor());
this.nettyRemotingServer.start(); this.nettyRemotingServer.start();
// self tolerant // self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps); this.masterRegistryClient.init();
this.masterRegistryClient.start(); this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this); this.masterRegistryClient.setRegistryStoppable(this);
this.eventExecuteService.init(this.processInstanceExecMaps); this.eventExecuteService.init();
this.eventExecuteService.start(); this.eventExecuteService.start();
// scheduler start // scheduler start
this.masterSchedulerService.init(this.processInstanceExecMaps); this.masterSchedulerService.init();
this.masterSchedulerService.start(); this.masterSchedulerService.start();

66
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ProcessInstanceExecCacheManager.java vendored

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import java.util.Collection;
/**
* cache of process instance id and WorkflowExecuteThread
*/
public interface ProcessInstanceExecCacheManager {
/**
* get WorkflowExecuteThread by process instance id
*
* @param processInstanceId processInstanceId
* @return WorkflowExecuteThread
*/
WorkflowExecuteThread getByProcessInstanceId(int processInstanceId);
/**
* judge the process instance does it exist
*
* @param processInstanceId processInstanceId
* @return true - if process instance id exists in cache
*/
boolean contains(int processInstanceId);
/**
* remove cache by process instance id
*
* @param processInstanceId processInstanceId
*/
void removeByProcessInstanceId(int processInstanceId);
/**
* cache
*
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
*
* @return all WorkflowExecuteThread in cache
*/
Collection<WorkflowExecuteThread> getAll();
}

65
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImpl.java vendored

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableList;
/**
* cache of process instance id and WorkflowExecuteThread
*/
@Component
public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecCacheManager {
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps = new ConcurrentHashMap<>();
@Override
public WorkflowExecuteThread getByProcessInstanceId(int processInstanceId) {
return processInstanceExecMaps.get(processInstanceId);
}
@Override
public boolean contains(int processInstanceId) {
return processInstanceExecMaps.containsKey(processInstanceId);
}
@Override
public void removeByProcessInstanceId(int processInstanceId) {
processInstanceExecMaps.remove(processInstanceId);
}
@Override
public void cache(int processInstanceId, WorkflowExecuteThread workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
}
@Override
public Collection<WorkflowExecuteThread> getAll() {
return ImmutableList.copyOf(processInstanceExecMaps.values());
}
}

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/StateEventProcessor.java

@ -26,11 +26,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.StateEventResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -51,10 +48,6 @@ public class StateEventProcessor implements NettyRequestProcessor {
stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class); stateEventResponseService = SpringApplicationContext.getBean(StateEventResponseService.class);
} }
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.stateEventResponseService.init(processInstanceExecMaps);
}
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType())); Preconditions.checkArgument(CommandType.STATE_EVENT_REQUEST == command.getType(), String.format("invalid command type: %s", command.getType()));

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -28,11 +28,8 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -62,10 +59,6 @@ public class TaskAckProcessor implements NettyRequestProcessor {
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}
/** /**
* task ack process * task ack process
* *

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -27,11 +27,8 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -61,10 +58,6 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}
/** /**
* task final result response * task final result response
* need master process , state persistence * need master process , state persistence

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -21,12 +21,12 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -34,6 +34,7 @@ import javax.annotation.PreDestroy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -59,13 +60,8 @@ public class StateEventResponseService {
*/ */
private Thread responseWorker; private Thread responseWorker;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper; @Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}
@PostConstruct @PostConstruct
public void start() { public void start() {
@ -131,12 +127,12 @@ public class StateEventResponseService {
private void persist(StateEvent stateEvent) { private void persist(StateEvent stateEvent) {
try { try {
if (!this.processInstanceMapper.containsKey(stateEvent.getProcessInstanceId())) { if (!this.processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
writeResponse(stateEvent, ExecutionStatus.FAILURE); writeResponse(stateEvent, ExecutionStatus.FAILURE);
return; return;
} }
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(stateEvent.getProcessInstanceId()); WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent); workflowExecuteThread.addStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS); writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) { } catch (Exception e) {

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -25,13 +25,13 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -71,13 +71,8 @@ public class TaskResponseService {
*/ */
private Thread taskResponseWorker; private Thread taskResponseWorker;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper; @Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceMapper) {
if (this.processInstanceMapper == null) {
this.processInstanceMapper = processInstanceMapper;
}
}
@PostConstruct @PostConstruct
public void start() { public void start() {
@ -194,7 +189,7 @@ public class TaskResponseService {
default: default:
throw new IllegalArgumentException("invalid event type : " + event); throw new IllegalArgumentException("invalid event type : " + event);
} }
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) { if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent(); StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());

12
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@ -48,7 +49,6 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -92,7 +92,8 @@ public class MasterRegistryClient {
*/ */
private ScheduledExecutorService heartBeatExecutor; private ScheduledExecutorService heartBeatExecutor;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; @Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/** /**
* master startup time, ms * master startup time, ms
@ -101,11 +102,10 @@ public class MasterRegistryClient {
private String localNodePath; private String localNodePath;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { public void init() {
this.startupTime = System.currentTimeMillis(); this.startupTime = System.currentTimeMillis();
this.registryClient = RegistryClient.getInstance(); this.registryClient = RegistryClient.getInstance();
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
this.processInstanceExecMaps = processInstanceExecMaps;
} }
public void start() { public void start() {
@ -308,10 +308,10 @@ public class MasterRegistryClient {
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance); processService.saveTaskInstance(taskInstance);
if (!processInstanceExecMaps.containsKey(processInstance.getId())) { if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
return; return;
} }
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
StateEvent stateEvent = new StateEvent(); StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setType(StateEventType.TASK_STATE_CHANGE);

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -72,17 +73,16 @@ public class EventExecuteService extends Thread {
*/ */
private StateEventCallbackService stateEventCallbackService; private StateEventCallbackService stateEventCallbackService;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap(); private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap();
ListeningExecutorService listeningExecutorService; ListeningExecutorService listeningExecutorService;
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { public void init() {
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads()); eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getMasterExecThreads());
this.processInstanceExecMaps = processInstanceExecMaps;
listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService);
this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@ -115,7 +115,7 @@ public class EventExecuteService extends Thread {
} }
private void eventHandler() { private void eventHandler() {
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecMaps.values()) { for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
if (workflowExecuteThread.eventSize() == 0 if (workflowExecuteThread.eventSize() == 0
|| StringUtils.isEmpty(workflowExecuteThread.getKey()) || StringUtils.isEmpty(workflowExecuteThread.getKey())
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
@ -132,13 +132,13 @@ public class EventExecuteService extends Thread {
@Override @Override
public void onSuccess(Object o) { public void onSuccess(Object o) {
if (workflowExecuteThread.workFlowFinish()) { if (workflowExecuteThread.workFlowFinish()) {
processInstanceExecMaps.remove(processInstanceId); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(); notifyProcessChanged();
logger.info("process instance {} finished.", processInstanceId); logger.info("process instance {} finished.", processInstanceId);
} }
if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) {
processInstanceExecMaps.remove(processInstanceId); processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
processInstanceExecMaps.put(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread);
} }
eventHandlerMap.remove(workflowExecuteThread.getKey()); eventHandlerMap.remove(workflowExecuteThread.getKey());
@ -160,10 +160,10 @@ public class EventExecuteService extends Thread {
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
if (!processInstanceExecMaps.containsKey(processInstance.getId())) { if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
return; return;
} }
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId()); WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
StateEvent stateEvent = new StateEvent(); StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setType(StateEventType.TASK_STATE_CHANGE);

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
@ -94,10 +95,9 @@ public class MasterSchedulerService extends Thread {
*/ */
private ThreadPoolExecutor masterExecService; private ThreadPoolExecutor masterExecService;
/** @Autowired
* process instance execution list private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
*/
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
/** /**
* process timeout check list * process timeout check list
*/ */
@ -119,15 +119,14 @@ public class MasterSchedulerService extends Thread {
/** /**
* constructor of MasterSchedulerService * constructor of MasterSchedulerService
*/ */
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) { public void init() {
this.processInstanceExecMaps = processInstanceExecMaps;
this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
taskTimeoutCheckList, taskTimeoutCheckList,
this.processInstanceExecMaps, this.processInstanceExecCacheManager,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
} }
@ -203,7 +202,7 @@ public class MasterSchedulerService extends Thread {
, masterConfig , masterConfig
, taskTimeoutCheckList); , taskTimeoutCheckList);
this.processInstanceExecMaps.put(processInstance.getId(), workflowExecuteThread); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) { if (processInstance.getTimeout() > 0) {
this.processTimeoutCheckList.put(processInstance.getId(), processInstance); this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
} }

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.hadoop.util.ThreadUtil; import org.apache.hadoop.util.ThreadUtil;
@ -44,17 +45,17 @@ public class StateWheelExecuteThread extends Thread {
ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList; ConcurrentHashMap<Integer, ProcessInstance> processInstanceCheckList;
ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList; ConcurrentHashMap<Integer, TaskInstance> taskInstanceCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps; private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private int stateCheckIntervalSecs; private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances, public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstances,
ConcurrentHashMap<Integer, TaskInstance> taskInstances, ConcurrentHashMap<Integer, TaskInstance> taskInstances,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps, ProcessInstanceExecCacheManager processInstanceExecCacheManager,
int stateCheckIntervalSecs) { int stateCheckIntervalSecs) {
this.processInstanceCheckList = processInstances; this.processInstanceCheckList = processInstances;
this.taskInstanceCheckList = taskInstances; this.taskInstanceCheckList = taskInstances;
this.processInstanceExecMaps = processInstanceExecMaps; this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.stateCheckIntervalSecs = stateCheckIntervalSecs; this.stateCheckIntervalSecs = stateCheckIntervalSecs;
} }
@ -121,10 +122,10 @@ public class StateWheelExecuteThread extends Thread {
private void putEvent(StateEvent stateEvent) { private void putEvent(StateEvent stateEvent) {
if (!processInstanceExecMaps.containsKey(stateEvent.getProcessInstanceId())) { if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
return; return;
} }
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecMaps.get(stateEvent.getProcessInstanceId()); WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent); workflowExecuteThread.addStateEvent(stateEvent);
} }

78
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java vendored

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProcessInstanceExecCacheManagerImplTest {
@InjectMocks
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Mock
private WorkflowExecuteThread workflowExecuteThread;
@Before
public void before() {
Mockito.when(workflowExecuteThread.getKey()).thenReturn("workflowExecuteThread1");
processInstanceExecCacheManager.cache(1, workflowExecuteThread);
}
@Test
public void testGetByProcessInstanceId() {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertEquals("workflowExecuteThread1", workflowExecuteThread.getKey());
}
@Test
public void testContains() {
Assert.assertTrue(processInstanceExecCacheManager.contains(1));
}
@Test
public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null);
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testRemoveByProcessInstanceId() {
processInstanceExecCacheManager.removeByProcessInstanceId(1);
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(1);
Assert.assertNull(workflowExecuteThread);
}
@Test
public void testGetAll() {
Collection<WorkflowExecuteThread> workflowExecuteThreads = processInstanceExecCacheManager.getAll();
Assert.assertEquals(1, workflowExecuteThreads.size());
}
}

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date; import java.util.Date;
@ -52,6 +53,9 @@ public class TaskResponseServiceTest {
private TaskInstance taskInstance; private TaskInstance taskInstance;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Before @Before
public void before() { public void before() {
taskRspService.start(); taskRspService.start();

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
@ -66,6 +67,9 @@ public class MasterRegistryClientTest {
@Mock @Mock
private ProcessService processService; private ProcessService processService;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Before @Before
public void before() throws Exception { public void before() throws Exception {
PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class)); PowerMockito.suppress(PowerMockito.constructor(RegistryClient.class));

1
pom.xml

@ -1046,6 +1046,7 @@
<include>**/server/worker/task/sql/SqlTaskTest.java</include> <include>**/server/worker/task/sql/SqlTaskTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include> <include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/worker/runner/WorkerManagerThreadTest.java</include> <include>**/server/worker/runner/WorkerManagerThreadTest.java</include>
<include>**/server/master/cache/impl/ProcessInstanceExecCacheManagerImplTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include> <include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include> <include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/registry/RegistryClientTest.java</include> <include>**/service/registry/RegistryClientTest.java</include>

Loading…
Cancel
Save