Browse Source

[Improvement-7213][MasterServer] execute thread pool code optimization (#7258)

* threadpool optimization

* threadpool params

* rebase dev

* ut check fix

* add return

* rebase dev

* event loop

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
41bf1a955e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  3. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  4. 22
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
  5. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  6. 146
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  7. 52
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  8. 177
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  9. 108
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  10. 181
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  11. 8
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -345,8 +345,16 @@ public final class Constants {
*/
public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *";
/**
* sleep 1000ms
*/
public static final int SLEEP_TIME_MILLIS = 1000;
/**
* short sleep 100ms
*/
public static final int SLEEP_TIME_MILLIS_SHORT = 100;
/**
* one second mils
*/

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -34,6 +34,9 @@ import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import javax.annotation.PostConstruct;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
@ -45,8 +48,6 @@ import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.PostConstruct;
@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
@EnableTransactionManagement
@ -68,9 +69,6 @@ public class MasterServer implements IStoppable {
@Autowired
private MasterSchedulerService masterSchedulerService;
@Autowired
private EventExecuteService eventExecuteService;
@Autowired
private Scheduler scheduler;
@ -89,6 +87,9 @@ public class MasterServer implements IStoppable {
@Autowired
private CacheProcessor cacheProcessor;
@Autowired
private EventExecuteService eventExecuteService;
public static void main(String[] args) {
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
@ -117,11 +118,11 @@ public class MasterServer implements IStoppable {
this.masterRegistryClient.start();
this.masterRegistryClient.setRegistryStoppable(this);
this.eventExecuteService.init();
this.eventExecuteService.start();
this.masterSchedulerService.init();
this.masterSchedulerService.start();
this.eventExecuteService.start();
this.scheduler.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import java.util.ArrayList;
import java.util.List;
@ -63,6 +64,9 @@ public class StateEventResponseService {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@PostConstruct
public void start() {
this.responseWorker = new StateEventResponseWorker();
@ -141,7 +145,7 @@ public class StateEventResponseService {
break;
default:
}
workflowExecuteThread.addStateEvent(stateEvent);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
writeResponse(stateEvent, ExecutionStatus.SUCCESS);
} catch (Exception e) {
logger.error("persist event queue error, event: {}", stateEvent, e);
@ -149,10 +153,6 @@ public class StateEventResponseService {
}
public void addEvent2WorkflowExecute(StateEvent stateEvent) {
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
}
public BlockingQueue<StateEvent> getEventQueue() {
return eventQueue;
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
}

22
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
@ -74,6 +75,9 @@ public class TaskResponseService {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@PostConstruct
public void start() {
this.taskResponseWorker = new TaskResponseWorker();
@ -164,20 +168,16 @@ public class TaskResponseService {
throw new IllegalArgumentException("invalid event type : " + event);
}
if (workflowExecuteThread != null) {
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThread.addStateEvent(stateEvent);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
/**
* handle ack event
* @param taskResponseEvent
* @param taskInstance
*/
private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
Channel channel = taskResponseEvent.getChannel();
@ -206,8 +206,6 @@ public class TaskResponseService {
/**
* handle result event
* @param taskResponseEvent
* @param taskInstance
*/
private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
Channel channel = taskResponseEvent.getChannel();

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -96,7 +95,7 @@ public class MasterRegistryClient {
private ScheduledExecutorService heartBeatExecutor;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
/**
* master startup time, ms
@ -298,6 +297,24 @@ public class MasterRegistryClient {
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
// only failover the task owned myself if worker down.
@ -375,16 +392,12 @@ public class MasterRegistryClient {
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
if (workflowExecuteThreadNotify == null) {
return;
}
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadNotify.addStateEvent(stateEvent);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
/**

146
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -18,27 +18,9 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@ -46,48 +28,19 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@Service
public class EventExecuteService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
/**
* dolphinscheduler database interface
*/
@Autowired
private ProcessService processService;
@Autowired
private MasterConfig masterConfig;
private ExecutorService eventExecService;
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
*
* workflow exec service
*/
private StateEventCallbackService stateEventCallbackService;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
private ConcurrentHashMap<String, WorkflowExecuteThread> eventHandlerMap = new ConcurrentHashMap();
ListeningExecutorService listeningExecutorService;
public void init() {
eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getExecThreads());
listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService);
this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
}
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Override
public synchronized void start() {
@ -95,20 +48,13 @@ public class EventExecuteService extends Thread {
super.start();
}
public void close() {
eventExecService.shutdown();
logger.info("event service stopped...");
}
@Override
public void run() {
logger.info("Event service started");
while (Stopper.isRunning()) {
try {
eventHandler();
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (Exception e) {
logger.error("Event service thread error", e);
}
@ -117,89 +63,7 @@ public class EventExecuteService extends Thread {
private void eventHandler() {
for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) {
if (workflowExecuteThread.eventSize() == 0
|| StringUtils.isEmpty(workflowExecuteThread.getKey())
|| !workflowExecuteThread.isStart()
|| eventHandlerMap.containsKey(workflowExecuteThread.getKey())) {
continue;
}
int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
logger.info("handle process instance : {} , events count:{}",
processInstanceId,
workflowExecuteThread.eventSize());
logger.info("already exists handler process size:{}", this.eventHandlerMap.size());
eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
if (workflowExecuteThread.workFlowFinish()) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged();
logger.info("process instance {} finished.", processInstanceId);
}
if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread);
}
eventHandlerMap.remove(workflowExecuteThread.getKey());
}
private void notifyProcessChanged() {
if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) {
return;
}
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(processInstanceId);
for (ProcessInstance processInstance : fatherMaps.keySet()) {
String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) {
notifyMyself(processInstance, fatherMaps.get(processInstance));
} else {
notifyProcess(processInstance, fatherMaps.get(processInstance));
}
}
}
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
return;
}
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId());
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workflowExecuteThreadNotify.addStateEvent(stateEvent);
}
private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) {
String host = processInstance.getHost();
if (StringUtils.isEmpty(host)) {
logger.info("process {} host is empty, cannot notify task {} now.",
processInstance.getId(), taskInstance.getId());
return;
}
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
logger.info("notify process {} task {} state change, host:{}",
processInstance.getId(), taskInstance.getId(), host);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId()
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
}
@Override
public void onFailure(Throwable throwable) {
logger.info("handle events {} failed.", processInstanceId);
logger.info("handle events failed.", throwable);
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
workflowExecuteThreadPool.executeEvent(workflowExecuteThread);
}
}
}

52
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -40,10 +39,8 @@ import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,6 +63,10 @@ public class MasterSchedulerService extends Thread {
*/
@Autowired
private ProcessService processService;
/**
* task processor factory
*/
@Autowired
private TaskProcessorFactory taskProcessorFactory;
@ -95,28 +96,15 @@ public class MasterSchedulerService extends Thread {
private ThreadPoolExecutor masterPrepareExecService;
/**
* master exec service
* workflow exec service
*/
private ThreadPoolExecutor masterExecService;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
* process timeout check list
*/
ConcurrentHashMap<Integer, ProcessInstance> processTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task time out check list
*/
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task retry check list
*/
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
/**
@ -124,15 +112,8 @@ public class MasterSchedulerService extends Thread {
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
this.processInstanceExecCacheManager,
masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
}
@Override
@ -143,16 +124,6 @@ public class MasterSchedulerService extends Thread {
}
public void close() {
masterExecService.shutdown();
boolean terminated = false;
try {
terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
if (!terminated) {
logger.warn("masterExecService shutdown without terminated, increase await time");
}
nettyRemotingClient.close();
logger.info("master schedule service stopped...");
}
@ -205,15 +176,14 @@ public class MasterSchedulerService extends Thread {
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList
, taskRetryCheckList
, stateWheelExecuteThread
, taskProcessorFactory);
this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
masterExecService.execute(workflowExecuteThread);
workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
}
}

177
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -25,47 +25,57 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.hadoop.util.ThreadUtil;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 1. timeout check wheel
* 2. dependent task check wheel
*/
@Component
public class StateWheelExecuteThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
private ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList;
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
* process timeout check list
*/
private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ProcessInstanceExecCacheManager processInstanceExecCacheManager,
int stateCheckIntervalSecs) {
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
this.processInstanceExecCacheManager = processInstanceExecCacheManager;
this.stateCheckIntervalSecs = stateCheckIntervalSecs;
}
/**
* task time out check list, key is taskInstanceId, value is processInstanceId
*/
private ConcurrentHashMap<Integer, Integer> taskInstanceTimeoutCheckList = new ConcurrentHashMap<>();
/**
* task retry check list, key is taskInstanceId, value is processInstanceId
*/
private ConcurrentHashMap<Integer, Integer> taskInstanceRetryCheckList = new ConcurrentHashMap<>();
@Autowired
private MasterConfig masterConfig;
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Override
public void run() {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
checkTask4Timeout();
@ -74,30 +84,83 @@ public class StateWheelExecuteThread extends Thread {
} catch (Exception e) {
logger.error("state wheel thread check error:", e);
}
ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs);
ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);
}
}
public void addProcess4TimeoutCheck(ProcessInstance processInstance) {
this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance);
processInstanceTimeoutCheckList.add(processInstance.getId());
}
public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
public void addTask4TimeoutCheck(TaskInstance taskInstance) {
this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance);
if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
}
}
public void removeTask4TimeoutCheck(TaskInstance taskInstance) {
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
}
public void addTask4RetryCheck(TaskInstance taskInstance) {
this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance);
if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (taskInstance.taskCanRetry()) {
taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId());
}
}
public void checkTask4Timeout() {
public void removeTask4RetryCheck(TaskInstance taskInstance) {
taskInstanceRetryCheckList.remove(taskInstance.getId());
}
private void checkTask4Timeout() {
if (taskInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) {
for (Entry<Integer, Integer> entry : taskInstanceTimeoutCheckList.entrySet()) {
int processInstanceId = entry.getValue();
int taskInstanceId = entry.getKey();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
processInstanceId, taskInstanceId);
taskInstanceTimeoutCheckList.remove(taskInstanceId);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
if (taskInstance == null) {
continue;
}
if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain) {
long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addTaskTimeoutEvent(taskInstance);
taskInstanceTimeoutCheckList.remove(taskInstance.getId());
}
@ -109,8 +172,21 @@ public class StateWheelExecuteThread extends Thread {
if (taskInstanceRetryCheckList.isEmpty()) {
return;
}
for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) {
for (Entry<Integer, Integer> entry : taskInstanceRetryCheckList.entrySet()) {
int processInstanceId = entry.getValue();
int taskInstanceId = entry.getKey();
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}",
processInstanceId, taskInstanceId);
taskInstanceRetryCheckList.remove(taskInstanceId);
continue;
}
TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId);
if (taskInstance == null) {
continue;
}
if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) {
addTaskStateChangeEvent(taskInstance);
taskInstanceRetryCheckList.remove(taskInstance.getId());
@ -125,49 +201,50 @@ public class StateWheelExecuteThread extends Thread {
if (processInstanceTimeoutCheckList.isEmpty()) {
return;
}
for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) {
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (0 >= timeRemain) {
for (Integer processInstanceId : processInstanceTimeoutCheckList) {
if (processInstanceId == null) {
continue;
}
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
if (workflowExecuteThread == null) {
logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId);
processInstanceTimeoutCheckList.remove(processInstanceId);
continue;
}
ProcessInstance processInstance = workflowExecuteThread.getProcessInstance();
if (processInstance == null) {
continue;
}
long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT);
if (timeRemain < 0) {
addProcessTimeoutEvent(processInstance);
processInstanceTimeoutCheckList.remove(processInstance.getId());
}
}
}
private boolean addTaskStateChangeEvent(TaskInstance taskInstance) {
private void addTaskStateChangeEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
addEvent(stateEvent);
return true;
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private boolean addTaskTimeoutEvent(TaskInstance taskInstance) {
private void addTaskTimeoutEvent(TaskInstance taskInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_TIMEOUT);
stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskInstance.getId());
addEvent(stateEvent);
return true;
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private boolean addProcessTimeoutEvent(ProcessInstance processInstance) {
private void addProcessTimeoutEvent(ProcessInstance processInstance) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.PROCESS_TIMEOUT);
stateEvent.setProcessInstanceId(processInstance.getId());
addEvent(stateEvent);
return true;
}
private void addEvent(StateEvent stateEvent) {
if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) {
return;
}
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
workflowExecuteThread.addStateEvent(stateEvent);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
}

108
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -96,7 +96,7 @@ import com.google.common.collect.Lists;
/**
* master exec thread,split dag
*/
public class WorkflowExecuteThread implements Runnable {
public class WorkflowExecuteThread {
/**
* logger of WorkflowExecuteThread
@ -203,16 +203,6 @@ public class WorkflowExecuteThread implements Runnable {
*/
private List<Date> complementListDate = Lists.newLinkedList();
/**
* task timeout check list
*/
private ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList;
/**
* task retry check list
*/
private ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList;
/**
* state event queue
*/
@ -223,6 +213,11 @@ public class WorkflowExecuteThread implements Runnable {
*/
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* state wheel execute thread
*/
private StateWheelExecuteThread stateWheelExecuteThread;
/**
* constructor of WorkflowExecuteThread
*
@ -231,7 +226,7 @@ public class WorkflowExecuteThread implements Runnable {
* @param nettyExecutorManager nettyExecutorManager
* @param processAlertManager processAlertManager
* @param masterConfig masterConfig
* @param taskTimeoutCheckList taskTimeoutCheckList
* @param stateWheelExecuteThread stateWheelExecuteThread
* @param taskProcessorFactory taskProcessorFactory
*/
public WorkflowExecuteThread(ProcessInstance processInstance
@ -239,32 +234,17 @@ public class WorkflowExecuteThread implements Runnable {
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
, MasterConfig masterConfig
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList
, StateWheelExecuteThread stateWheelExecuteThread
, TaskProcessorFactory taskProcessorFactory) {
this.processService = processService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
this.processAlertManager = processAlertManager;
this.taskTimeoutCheckList = taskTimeoutCheckList;
this.taskRetryCheckList = taskRetryCheckList;
this.stateWheelExecuteThread = stateWheelExecuteThread;
this.taskProcessorFactory = taskProcessorFactory;
}
@Override
public void run() {
try {
if (!this.isStart()) {
startProcess();
} else {
handleEvents();
}
} catch (Exception e) {
logger.error("handler error:", e);
}
}
/**
* the process start nodes are submitted completely.
*/
@ -272,9 +252,14 @@ public class WorkflowExecuteThread implements Runnable {
return this.isStart;
}
private void handleEvents() {
/**
* handle event
*/
public void handleEvents() {
if (!isStart) {
return;
}
while (!this.stateEvents.isEmpty()) {
try {
StateEvent stateEvent = this.stateEvents.peek();
if (stateEventHandler(stateEvent)) {
@ -282,7 +267,6 @@ public class WorkflowExecuteThread implements Runnable {
}
} catch (Exception e) {
logger.error("state handle error:", e);
}
}
}
@ -457,8 +441,8 @@ public class WorkflowExecuteThread implements Runnable {
task.getRetryTimes(),
task.getMaxRetryTimes(),
task.getRetryInterval());
this.addTimeoutCheck(task);
this.addRetryCheck(task);
stateWheelExecuteThread.addTask4TimeoutCheck(task);
stateWheelExecuteThread.addTask4RetryCheck(task);
} else {
submitStandByTask();
}
@ -467,8 +451,8 @@ public class WorkflowExecuteThread implements Runnable {
completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId());
activeTaskProcessorMaps.remove(task.getId());
taskTimeoutCheckList.remove(task.getId());
taskRetryCheckList.remove(task.getId());
stateWheelExecuteThread.removeTask4TimeoutCheck(task);
stateWheelExecuteThread.removeTask4RetryCheck(task);
if (task.getState().typeIsSuccess()) {
processInstance.setVarPool(task.getVarPool());
@ -660,13 +644,21 @@ public class WorkflowExecuteThread implements Runnable {
return false;
}
private void startProcess() throws Exception {
if (this.taskInstanceMap.size() == 0) {
/**
* process start handle
*/
public void startProcess() {
if (this.taskInstanceMap.size() > 0) {
return;
}
try {
isStart = false;
buildFlowDag();
initTaskQueue();
submitPostNode(null);
isStart = true;
} catch (Exception e) {
logger.error("start process error, process instance id:{}", processInstance.getId(), e);
}
}
@ -837,8 +829,8 @@ public class WorkflowExecuteThread implements Runnable {
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(taskInstance);
if (taskProcessor.taskState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
@ -871,42 +863,6 @@ public class WorkflowExecuteThread implements Runnable {
}
}
private void addTimeoutCheck(TaskInstance taskInstance) {
if (taskTimeoutCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance);
}
}
private void addRetryCheck(TaskInstance taskInstance) {
if (taskRetryCheckList.containsKey(taskInstance.getId())) {
return;
}
TaskDefinition taskDefinition = taskInstance.getTaskDefine();
if (taskDefinition == null) {
logger.error("taskDefinition is null, taskId:{}", taskInstance.getId());
return;
}
if (taskInstance.taskCanRetry()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
if (taskInstance.isDependTask() || taskInstance.isSubProcess()) {
this.taskRetryCheckList.put(taskInstance.getId(), taskInstance);
}
}
/**
* find task instance in db.
* in case submit more than one same name task in the same time.

181
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.lang.StringUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class);
@Autowired
private MasterConfig masterConfig;
@Autowired
private ProcessService processService;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
@Autowired
private StateEventCallbackService stateEventCallbackService;
/**
* multi-thread filter, avoid handling workflow at the same time
*/
private ConcurrentHashMap<String, WorkflowExecuteThread> multiThreadFilterMap = new ConcurrentHashMap();
@PostConstruct
private void init() {
this.setDaemon(true);
this.setThreadNamePrefix("Workflow-Execute-Thread-");
this.setMaxPoolSize(masterConfig.getExecThreads());
this.setCorePoolSize(masterConfig.getExecThreads());
}
/**
* submit state event
*/
public void submitStateEvent(StateEvent stateEvent) {
WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.error("workflowExecuteThread is null, processInstanceId:{}", stateEvent.getProcessInstanceId());
return;
}
workflowExecuteThread.addStateEvent(stateEvent);
}
/**
* start workflow
*/
public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) {
submit(workflowExecuteThread::startProcess);
}
/**
* execute workflow
*/
public void executeEvent(WorkflowExecuteThread workflowExecuteThread) {
if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
return;
}
int processInstanceId = workflowExecuteThread.getProcessInstance().getId();
ListenableFuture future = this.submitListenable(() -> {
workflowExecuteThread.handleEvents();
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);
});
future.addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
logger.error("handle events {} failed", processInstanceId, ex);
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
}
@Override
public void onSuccess(Object result) {
if (workflowExecuteThread.workFlowFinish()) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("process instance {} finished.", processInstanceId);
}
multiThreadFilterMap.remove(workflowExecuteThread.getKey());
}
});
}
/**
* notify process change
*/
private void notifyProcessChanged(ProcessInstance finishProcessInstance) {
if (Flag.NO == finishProcessInstance.getIsSubProcess()) {
return;
}
Map<ProcessInstance, TaskInstance> fatherMaps = processService.notifyProcessList(finishProcessInstance.getId());
for (ProcessInstance processInstance : fatherMaps.keySet()) {
String address = NetUtils.getAddr(masterConfig.getListenPort());
if (processInstance.getHost().equalsIgnoreCase(address)) {
this.notifyMyself(processInstance, fatherMaps.get(processInstance));
} else {
this.notifyProcess(finishProcessInstance, processInstance, fatherMaps.get(processInstance));
}
}
}
/**
* notify myself
*/
private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) {
logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId());
if (!processInstanceExecCacheManager.contains(processInstance.getId())) {
return;
}
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
this.submitStateEvent(stateEvent);
}
/**
* notify process's master
*/
private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) {
String host = processInstance.getHost();
if (StringUtils.isEmpty(host)) {
logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId());
return;
}
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId()
);
stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command());
}
}

8
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -84,6 +85,8 @@ public class WorkflowExecuteThreadTest {
private TaskProcessorFactory taskProcessorFactory;
private StateWheelExecuteThread stateWheelExecuteThread;
@Before
public void init() throws Exception {
processService = mock(ProcessService.class);
@ -107,9 +110,8 @@ public class WorkflowExecuteThreadTest {
processDefinition.setGlobalParamList(Collections.emptyList());
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList, taskProcessorFactory));
stateWheelExecuteThread = mock(StateWheelExecuteThread.class);
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);

Loading…
Cancel
Save