From 41bf1a955e2d76414ee5d8e3371fc163a813b435 Mon Sep 17 00:00:00 2001 From: wind Date: Thu, 16 Dec 2021 23:45:32 +0800 Subject: [PATCH] [Improvement-7213][MasterServer] execute thread pool code optimization (#7258) * threadpool optimization * threadpool params * rebase dev * ut check fix * add return * rebase dev * event loop Co-authored-by: caishunfeng <534328519@qq.com> --- .../dolphinscheduler/common/Constants.java | 8 + .../server/master/MasterServer.java | 15 +- .../queue/StateEventResponseService.java | 12 +- .../processor/queue/TaskResponseService.java | 22 +-- .../master/registry/MasterRegistryClient.java | 29 ++- .../master/runner/EventExecuteService.java | 146 +------------- .../master/runner/MasterSchedulerService.java | 52 ++--- .../runner/StateWheelExecuteThread.java | 177 ++++++++++++----- .../master/runner/WorkflowExecuteThread.java | 108 ++++------- .../runner/WorkflowExecuteThreadPool.java | 181 ++++++++++++++++++ .../master/WorkflowExecuteThreadTest.java | 8 +- 11 files changed, 414 insertions(+), 344 deletions(-) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 7d6428c4cc..6632e485ad 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -345,8 +345,16 @@ public final class Constants { */ public static final String DEFAULT_CRON_STRING = "0 0 0 * * ? *"; + /** + * sleep 1000ms + */ public static final int SLEEP_TIME_MILLIS = 1000; + /** + * short sleep 100ms + */ + public static final int SLEEP_TIME_MILLIS_SHORT = 100; + /** * one second mils */ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 41bc63a6dc..11b0b799a6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -34,6 +34,9 @@ import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import javax.annotation.PostConstruct; + import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.slf4j.Logger; @@ -45,8 +48,6 @@ import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.ComponentScan; import org.springframework.transaction.annotation.EnableTransactionManagement; -import javax.annotation.PostConstruct; - @SpringBootApplication @ComponentScan("org.apache.dolphinscheduler") @EnableTransactionManagement @@ -68,9 +69,6 @@ public class MasterServer implements IStoppable { @Autowired private MasterSchedulerService masterSchedulerService; - @Autowired - private EventExecuteService eventExecuteService; - @Autowired private Scheduler scheduler; @@ -89,6 +87,9 @@ public class MasterServer implements IStoppable { @Autowired private CacheProcessor cacheProcessor; + @Autowired + private EventExecuteService eventExecuteService; + public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); @@ -117,11 +118,11 @@ public class MasterServer implements IStoppable { this.masterRegistryClient.start(); this.masterRegistryClient.setRegistryStoppable(this); - this.eventExecuteService.init(); - this.eventExecuteService.start(); this.masterSchedulerService.init(); this.masterSchedulerService.start(); + this.eventExecuteService.start(); + this.scheduler.start(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 1db91c680e..11fc060a4d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import java.util.ArrayList; import java.util.List; @@ -63,6 +64,9 @@ public class StateEventResponseService { @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + @PostConstruct public void start() { this.responseWorker = new StateEventResponseWorker(); @@ -141,7 +145,7 @@ public class StateEventResponseService { break; default: } - workflowExecuteThread.addStateEvent(stateEvent); + workflowExecuteThreadPool.submitStateEvent(stateEvent); writeResponse(stateEvent, ExecutionStatus.SUCCESS); } catch (Exception e) { logger.error("persist event queue error, event: {}", stateEvent, e); @@ -149,10 +153,6 @@ public class StateEventResponseService { } public void addEvent2WorkflowExecute(StateEvent stateEvent) { - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); - workflowExecuteThread.addStateEvent(stateEvent); - } - public BlockingQueue getEventQueue() { - return eventQueue; + workflowExecuteThreadPool.submitStateEvent(stateEvent); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 7a0af9d667..8a5755cc08 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; @@ -74,6 +75,9 @@ public class TaskResponseService { @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + @PostConstruct public void start() { this.taskResponseWorker = new TaskResponseWorker(); @@ -164,20 +168,16 @@ public class TaskResponseService { throw new IllegalArgumentException("invalid event type : " + event); } - if (workflowExecuteThread != null) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); - stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId()); - stateEvent.setExecutionStatus(taskResponseEvent.getState()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - workflowExecuteThread.addStateEvent(stateEvent); - } + StateEvent stateEvent = new StateEvent(); + stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId()); + stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId()); + stateEvent.setExecutionStatus(taskResponseEvent.getState()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + workflowExecuteThreadPool.submitStateEvent(stateEvent); } /** * handle ack event - * @param taskResponseEvent - * @param taskInstance */ private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { Channel channel = taskResponseEvent.getChannel(); @@ -206,8 +206,6 @@ public class TaskResponseService { /** * handle result event - * @param taskResponseEvent - * @param taskInstance */ private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) { Channel channel = taskResponseEvent.getChannel(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 7619535e15..67c8e94d17 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -35,9 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.registry.api.ConnectionState; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -96,7 +95,7 @@ public class MasterRegistryClient { private ScheduledExecutorService heartBeatExecutor; @Autowired - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + private WorkflowExecuteThreadPool workflowExecuteThreadPool; /** * master startup time, ms @@ -298,6 +297,24 @@ public class MasterRegistryClient { continue; } processInstanceCacheMap.put(processInstance.getId(), processInstance); + taskInstance.setProcessInstance(processInstance); + + TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() + .buildTaskInstanceRelatedInfo(taskInstance) + .buildProcessInstanceRelatedInfo(processInstance) + .create(); + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskExecutionContext); + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processService.saveTaskInstance(taskInstance); + + StateEvent stateEvent = new StateEvent(); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(taskInstance.getState()); + workflowExecuteThreadPool.submitStateEvent(stateEvent); } // only failover the task owned myself if worker down. @@ -375,16 +392,12 @@ public class MasterRegistryClient { taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); processService.saveTaskInstance(taskInstance); - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); - if (workflowExecuteThreadNotify == null) { - return; - } StateEvent stateEvent = new StateEvent(); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(processInstance.getId()); stateEvent.setExecutionStatus(taskInstance.getState()); - workflowExecuteThreadNotify.addStateEvent(stateEvent); + workflowExecuteThreadPool.submitStateEvent(stateEvent); } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index 3da043c50b..6f6f718c2f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -18,27 +18,9 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.common.enums.StateEvent; -import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; -import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.commons.lang.StringUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -46,48 +28,19 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - @Service public class EventExecuteService extends Thread { private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class); - - /** - * dolphinscheduler database interface - */ - @Autowired - private ProcessService processService; - @Autowired - private MasterConfig masterConfig; - - private ExecutorService eventExecService; + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; /** - * + * workflow exec service */ - private StateEventCallbackService stateEventCallbackService; - @Autowired - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - - private ConcurrentHashMap eventHandlerMap = new ConcurrentHashMap(); - ListeningExecutorService listeningExecutorService; - - public void init() { - - eventExecService = ThreadUtils.newDaemonFixedThreadExecutor("MasterEventExecution", masterConfig.getExecThreads()); - - listeningExecutorService = MoreExecutors.listeningDecorator(eventExecService); - this.stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class); - - } + private WorkflowExecuteThreadPool workflowExecuteThreadPool; @Override public synchronized void start() { @@ -95,20 +48,13 @@ public class EventExecuteService extends Thread { super.start(); } - public void close() { - eventExecService.shutdown(); - logger.info("event service stopped..."); - } - @Override public void run() { logger.info("Event service started"); while (Stopper.isRunning()) { try { eventHandler(); - - TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS); - + TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); } catch (Exception e) { logger.error("Event service thread error", e); } @@ -117,89 +63,7 @@ public class EventExecuteService extends Thread { private void eventHandler() { for (WorkflowExecuteThread workflowExecuteThread : this.processInstanceExecCacheManager.getAll()) { - if (workflowExecuteThread.eventSize() == 0 - || StringUtils.isEmpty(workflowExecuteThread.getKey()) - || !workflowExecuteThread.isStart() - || eventHandlerMap.containsKey(workflowExecuteThread.getKey())) { - continue; - } - int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); - logger.info("handle process instance : {} , events count:{}", - processInstanceId, - workflowExecuteThread.eventSize()); - logger.info("already exists handler process size:{}", this.eventHandlerMap.size()); - eventHandlerMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); - ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread); - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(Object o) { - if (workflowExecuteThread.workFlowFinish()) { - processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); - notifyProcessChanged(); - logger.info("process instance {} finished.", processInstanceId); - } - if (workflowExecuteThread.getProcessInstance().getId() != processInstanceId) { - processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); - processInstanceExecCacheManager.cache(workflowExecuteThread.getProcessInstance().getId(), workflowExecuteThread); - - } - eventHandlerMap.remove(workflowExecuteThread.getKey()); - } - - private void notifyProcessChanged() { - if (Flag.NO == workflowExecuteThread.getProcessInstance().getIsSubProcess()) { - return; - } - - Map fatherMaps = processService.notifyProcessList(processInstanceId); - for (ProcessInstance processInstance : fatherMaps.keySet()) { - String address = NetUtils.getAddr(masterConfig.getListenPort()); - if (processInstance.getHost().equalsIgnoreCase(address)) { - notifyMyself(processInstance, fatherMaps.get(processInstance)); - } else { - notifyProcess(processInstance, fatherMaps.get(processInstance)); - } - } - } - - private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { - logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); - if (!processInstanceExecCacheManager.contains(processInstance.getId())) { - return; - } - WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecCacheManager.getByProcessInstanceId(processInstance.getId()); - StateEvent stateEvent = new StateEvent(); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(processInstance.getId()); - stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - workflowExecuteThreadNotify.addStateEvent(stateEvent); - } - - private void notifyProcess(ProcessInstance processInstance, TaskInstance taskInstance) { - String host = processInstance.getHost(); - if (StringUtils.isEmpty(host)) { - logger.info("process {} host is empty, cannot notify task {} now.", - processInstance.getId(), taskInstance.getId()); - return; - } - String address = host.split(":")[0]; - int port = Integer.parseInt(host.split(":")[1]); - logger.info("notify process {} task {} state change, host:{}", - processInstance.getId(), taskInstance.getId(), host); - StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( - processInstanceId, 0, workflowExecuteThread.getProcessInstance().getState(), processInstance.getId(), taskInstance.getId() - ); - stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); - } - - @Override - public void onFailure(Throwable throwable) { - logger.info("handle events {} failed.", processInstanceId); - logger.info("handle events failed.", throwable); - } - }; - Futures.addCallback(future, futureCallback, this.listeningExecutorService); + workflowExecuteThreadPool.executeEvent(workflowExecuteThread); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index e13de47feb..f3cdb4bc1c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -40,10 +39,8 @@ import org.apache.commons.collections4.CollectionUtils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +63,10 @@ public class MasterSchedulerService extends Thread { */ @Autowired private ProcessService processService; + + /** + * task processor factory + */ @Autowired private TaskProcessorFactory taskProcessorFactory; @@ -95,28 +96,15 @@ public class MasterSchedulerService extends Thread { private ThreadPoolExecutor masterPrepareExecService; /** - * master exec service + * workflow exec service */ - private ThreadPoolExecutor masterExecService; + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; - /** - * process timeout check list - */ - ConcurrentHashMap processTimeoutCheckList = new ConcurrentHashMap<>(); - - /** - * task time out check list - */ - ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - - /** - * task retry check list - */ - ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); - + @Autowired private StateWheelExecuteThread stateWheelExecuteThread; /** @@ -124,15 +112,8 @@ public class MasterSchedulerService extends Thread { */ public void init() { this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads()); - this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getExecThreads()); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - - stateWheelExecuteThread = new StateWheelExecuteThread(processTimeoutCheckList, - taskTimeoutCheckList, - taskRetryCheckList, - this.processInstanceExecCacheManager, - masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } @Override @@ -143,16 +124,6 @@ public class MasterSchedulerService extends Thread { } public void close() { - masterExecService.shutdown(); - boolean terminated = false; - try { - terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); - } - if (!terminated) { - logger.warn("masterExecService shutdown without terminated, increase await time"); - } nettyRemotingClient.close(); logger.info("master schedule service stopped..."); } @@ -205,15 +176,14 @@ public class MasterSchedulerService extends Thread { , nettyExecutorManager , processAlertManager , masterConfig - , taskTimeoutCheckList - , taskRetryCheckList + , stateWheelExecuteThread , taskProcessorFactory); this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread); if (processInstance.getTimeout() > 0) { - this.processTimeoutCheckList.put(processInstance.getId(), processInstance); + stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance); } - masterExecService.execute(workflowExecuteThread); + workflowExecuteThreadPool.startWorkflow(workflowExecuteThread); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index d697ab127c..4502b86cf9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -25,47 +25,57 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.hadoop.util.ThreadUtil; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; /** * 1. timeout check wheel * 2. dependent task check wheel */ +@Component public class StateWheelExecuteThread extends Thread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); - private ConcurrentHashMap processInstanceTimeoutCheckList; - private ConcurrentHashMap taskInstanceTimeoutCheckList; - private ConcurrentHashMap taskInstanceRetryCheckList; - private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + /** + * process timeout check list + */ + private ConcurrentLinkedQueue processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>(); - private int stateCheckIntervalSecs; - - public StateWheelExecuteThread(ConcurrentHashMap processInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceRetryCheckList, - ProcessInstanceExecCacheManager processInstanceExecCacheManager, - int stateCheckIntervalSecs) { - this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; - this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; - this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; - this.processInstanceExecCacheManager = processInstanceExecCacheManager; - this.stateCheckIntervalSecs = stateCheckIntervalSecs; - } + /** + * task time out check list, key is taskInstanceId, value is processInstanceId + */ + private ConcurrentHashMap taskInstanceTimeoutCheckList = new ConcurrentHashMap<>(); + + /** + * task retry check list, key is taskInstanceId, value is processInstanceId + */ + private ConcurrentHashMap taskInstanceRetryCheckList = new ConcurrentHashMap<>(); + + @Autowired + private MasterConfig masterConfig; + + @Autowired + private WorkflowExecuteThreadPool workflowExecuteThreadPool; + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; @Override public void run() { - - logger.info("state wheel thread start"); while (Stopper.isRunning()) { try { checkTask4Timeout(); @@ -74,30 +84,83 @@ public class StateWheelExecuteThread extends Thread { } catch (Exception e) { logger.error("state wheel thread check error:", e); } - ThreadUtil.sleepAtLeastIgnoreInterrupts(stateCheckIntervalSecs); + ThreadUtil.sleepAtLeastIgnoreInterrupts((long) masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); } } public void addProcess4TimeoutCheck(ProcessInstance processInstance) { - this.processInstanceTimeoutCheckList.put(processInstance.getId(), processInstance); + processInstanceTimeoutCheckList.add(processInstance.getId()); + } + + public void removeProcess4TimeoutCheck(ProcessInstance processInstance) { + processInstanceTimeoutCheckList.remove(processInstance.getId()); } public void addTask4TimeoutCheck(TaskInstance taskInstance) { - this.taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance); + if (taskInstanceTimeoutCheckList.containsKey(taskInstance.getId())) { + return; + } + TaskDefinition taskDefinition = taskInstance.getTaskDefine(); + if (taskDefinition == null) { + logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + return; + } + if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { + taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + } + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + taskInstanceTimeoutCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + } + } + + public void removeTask4TimeoutCheck(TaskInstance taskInstance) { + taskInstanceTimeoutCheckList.remove(taskInstance.getId()); } public void addTask4RetryCheck(TaskInstance taskInstance) { - this.taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance); + if (taskInstanceRetryCheckList.containsKey(taskInstance.getId())) { + return; + } + TaskDefinition taskDefinition = taskInstance.getTaskDefine(); + if (taskDefinition == null) { + logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); + return; + } + if (taskInstance.taskCanRetry()) { + taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + } + + if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { + taskInstanceRetryCheckList.put(taskInstance.getId(), taskInstance.getProcessInstanceId()); + } } - public void checkTask4Timeout() { + public void removeTask4RetryCheck(TaskInstance taskInstance) { + taskInstanceRetryCheckList.remove(taskInstance.getId()); + } + + private void checkTask4Timeout() { if (taskInstanceTimeoutCheckList.isEmpty()) { return; } - for (TaskInstance taskInstance : taskInstanceTimeoutCheckList.values()) { + for (Entry entry : taskInstanceTimeoutCheckList.entrySet()) { + int processInstanceId = entry.getValue(); + int taskInstanceId = entry.getKey(); + + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}", + processInstanceId, taskInstanceId); + taskInstanceTimeoutCheckList.remove(taskInstanceId); + continue; + } + TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + if (taskInstance == null) { + continue; + } if (TimeoutFlag.OPEN == taskInstance.getTaskDefine().getTimeoutFlag()) { - long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 >= timeRemain) { + long timeRemain = DateUtils.getRemainTime(taskInstance.getStartTime(), (long) taskInstance.getTaskDefine().getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); + if (timeRemain < 0) { addTaskTimeoutEvent(taskInstance); taskInstanceTimeoutCheckList.remove(taskInstance.getId()); } @@ -109,8 +172,21 @@ public class StateWheelExecuteThread extends Thread { if (taskInstanceRetryCheckList.isEmpty()) { return; } - - for (TaskInstance taskInstance : this.taskInstanceRetryCheckList.values()) { + for (Entry entry : taskInstanceRetryCheckList.entrySet()) { + int processInstanceId = entry.getValue(); + int taskInstanceId = entry.getKey(); + + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}, taskInstanceId:{}", + processInstanceId, taskInstanceId); + taskInstanceRetryCheckList.remove(taskInstanceId); + continue; + } + TaskInstance taskInstance = workflowExecuteThread.getTaskInstance(taskInstanceId); + if (taskInstance == null) { + continue; + } if (taskInstance.taskCanRetry() && taskInstance.retryTaskIntervalOverTime()) { addTaskStateChangeEvent(taskInstance); taskInstanceRetryCheckList.remove(taskInstance.getId()); @@ -125,49 +201,50 @@ public class StateWheelExecuteThread extends Thread { if (processInstanceTimeoutCheckList.isEmpty()) { return; } - for (ProcessInstance processInstance : this.processInstanceTimeoutCheckList.values()) { - - long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); - if (0 >= timeRemain) { + for (Integer processInstanceId : processInstanceTimeoutCheckList) { + if (processInstanceId == null) { + continue; + } + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId); + if (workflowExecuteThread == null) { + logger.warn("can not find workflowExecuteThread, this check event will remove, processInstanceId:{}", processInstanceId); + processInstanceTimeoutCheckList.remove(processInstanceId); + continue; + } + ProcessInstance processInstance = workflowExecuteThread.getProcessInstance(); + if (processInstance == null) { + continue; + } + long timeRemain = DateUtils.getRemainTime(processInstance.getStartTime(), (long) processInstance.getTimeout() * Constants.SEC_2_MINUTES_TIME_UNIT); + if (timeRemain < 0) { addProcessTimeoutEvent(processInstance); processInstanceTimeoutCheckList.remove(processInstance.getId()); } } } - private boolean addTaskStateChangeEvent(TaskInstance taskInstance) { + private void addTaskStateChangeEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_STATE_CHANGE); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); - addEvent(stateEvent); - return true; + workflowExecuteThreadPool.submitStateEvent(stateEvent); } - private boolean addTaskTimeoutEvent(TaskInstance taskInstance) { + private void addTaskTimeoutEvent(TaskInstance taskInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.TASK_TIMEOUT); stateEvent.setProcessInstanceId(taskInstance.getProcessInstanceId()); stateEvent.setTaskInstanceId(taskInstance.getId()); - addEvent(stateEvent); - return true; + workflowExecuteThreadPool.submitStateEvent(stateEvent); } - private boolean addProcessTimeoutEvent(ProcessInstance processInstance) { + private void addProcessTimeoutEvent(ProcessInstance processInstance) { StateEvent stateEvent = new StateEvent(); stateEvent.setType(StateEventType.PROCESS_TIMEOUT); stateEvent.setProcessInstanceId(processInstance.getId()); - addEvent(stateEvent); - return true; - } - - private void addEvent(StateEvent stateEvent) { - if (!processInstanceExecCacheManager.contains(stateEvent.getProcessInstanceId())) { - return; - } - WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); - workflowExecuteThread.addStateEvent(stateEvent); + workflowExecuteThreadPool.submitStateEvent(stateEvent); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 24d49e89a5..c20ddf791e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -96,7 +96,7 @@ import com.google.common.collect.Lists; /** * master exec thread,split dag */ -public class WorkflowExecuteThread implements Runnable { +public class WorkflowExecuteThread { /** * logger of WorkflowExecuteThread @@ -203,16 +203,6 @@ public class WorkflowExecuteThread implements Runnable { */ private List complementListDate = Lists.newLinkedList(); - /** - * task timeout check list - */ - private ConcurrentHashMap taskTimeoutCheckList; - - /** - * task retry check list - */ - private ConcurrentHashMap taskRetryCheckList; - /** * state event queue */ @@ -223,6 +213,11 @@ public class WorkflowExecuteThread implements Runnable { */ private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + /** + * state wheel execute thread + */ + private StateWheelExecuteThread stateWheelExecuteThread; + /** * constructor of WorkflowExecuteThread * @@ -231,7 +226,7 @@ public class WorkflowExecuteThread implements Runnable { * @param nettyExecutorManager nettyExecutorManager * @param processAlertManager processAlertManager * @param masterConfig masterConfig - * @param taskTimeoutCheckList taskTimeoutCheckList + * @param stateWheelExecuteThread stateWheelExecuteThread * @param taskProcessorFactory taskProcessorFactory */ public WorkflowExecuteThread(ProcessInstance processInstance @@ -239,32 +234,17 @@ public class WorkflowExecuteThread implements Runnable { , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager , MasterConfig masterConfig - , ConcurrentHashMap taskTimeoutCheckList - , ConcurrentHashMap taskRetryCheckList + , StateWheelExecuteThread stateWheelExecuteThread , TaskProcessorFactory taskProcessorFactory) { this.processService = processService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; this.processAlertManager = processAlertManager; - this.taskTimeoutCheckList = taskTimeoutCheckList; - this.taskRetryCheckList = taskRetryCheckList; + this.stateWheelExecuteThread = stateWheelExecuteThread; this.taskProcessorFactory = taskProcessorFactory; } - @Override - public void run() { - try { - if (!this.isStart()) { - startProcess(); - } else { - handleEvents(); - } - } catch (Exception e) { - logger.error("handler error:", e); - } - } - /** * the process start nodes are submitted completely. */ @@ -272,9 +252,14 @@ public class WorkflowExecuteThread implements Runnable { return this.isStart; } - private void handleEvents() { + /** + * handle event + */ + public void handleEvents() { + if (!isStart) { + return; + } while (!this.stateEvents.isEmpty()) { - try { StateEvent stateEvent = this.stateEvents.peek(); if (stateEventHandler(stateEvent)) { @@ -282,7 +267,6 @@ public class WorkflowExecuteThread implements Runnable { } } catch (Exception e) { logger.error("state handle error:", e); - } } } @@ -457,8 +441,8 @@ public class WorkflowExecuteThread implements Runnable { task.getRetryTimes(), task.getMaxRetryTimes(), task.getRetryInterval()); - this.addTimeoutCheck(task); - this.addRetryCheck(task); + stateWheelExecuteThread.addTask4TimeoutCheck(task); + stateWheelExecuteThread.addTask4RetryCheck(task); } else { submitStandByTask(); } @@ -467,8 +451,8 @@ public class WorkflowExecuteThread implements Runnable { completeTaskMap.put(Long.toString(task.getTaskCode()), task.getId()); activeTaskProcessorMaps.remove(task.getId()); - taskTimeoutCheckList.remove(task.getId()); - taskRetryCheckList.remove(task.getId()); + stateWheelExecuteThread.removeTask4TimeoutCheck(task); + stateWheelExecuteThread.removeTask4RetryCheck(task); if (task.getState().typeIsSuccess()) { processInstance.setVarPool(task.getVarPool()); @@ -660,13 +644,21 @@ public class WorkflowExecuteThread implements Runnable { return false; } - private void startProcess() throws Exception { - if (this.taskInstanceMap.size() == 0) { + /** + * process start handle + */ + public void startProcess() { + if (this.taskInstanceMap.size() > 0) { + return; + } + try { isStart = false; buildFlowDag(); initTaskQueue(); submitPostNode(null); isStart = true; + } catch (Exception e) { + logger.error("start process error, process instance id:{}", processInstance.getId(), e); } } @@ -837,8 +829,8 @@ public class WorkflowExecuteThread implements Runnable { activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor); taskProcessor.run(); - addTimeoutCheck(taskInstance); - addRetryCheck(taskInstance); + stateWheelExecuteThread.addTask4TimeoutCheck(taskInstance); + stateWheelExecuteThread.addTask4RetryCheck(taskInstance); if (taskProcessor.taskState().typeIsFinished()) { StateEvent stateEvent = new StateEvent(); @@ -871,42 +863,6 @@ public class WorkflowExecuteThread implements Runnable { } } - private void addTimeoutCheck(TaskInstance taskInstance) { - if (taskTimeoutCheckList.containsKey(taskInstance.getId())) { - return; - } - TaskDefinition taskDefinition = taskInstance.getTaskDefine(); - if (taskDefinition == null) { - logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); - return; - } - if (TimeoutFlag.OPEN == taskDefinition.getTimeoutFlag()) { - this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - this.taskTimeoutCheckList.put(taskInstance.getId(), taskInstance); - } - } - - private void addRetryCheck(TaskInstance taskInstance) { - if (taskRetryCheckList.containsKey(taskInstance.getId())) { - return; - } - TaskDefinition taskDefinition = taskInstance.getTaskDefine(); - if (taskDefinition == null) { - logger.error("taskDefinition is null, taskId:{}", taskInstance.getId()); - return; - } - - if (taskInstance.taskCanRetry()) { - this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); - } - - if (taskInstance.isDependTask() || taskInstance.isSubProcess()) { - this.taskRetryCheckList.put(taskInstance.getId(), taskInstance); - } - } - /** * find task instance in db. * in case submit more than one same name task in the same time. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java new file mode 100644 index 0000000000..45870558f4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.enums.StateEventType; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; +import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import org.apache.commons.lang.StringUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +@Component +public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowExecuteThreadPool.class); + + @Autowired + private MasterConfig masterConfig; + + @Autowired + private ProcessService processService; + + @Autowired + private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + + @Autowired + private StateEventCallbackService stateEventCallbackService; + + /** + * multi-thread filter, avoid handling workflow at the same time + */ + private ConcurrentHashMap multiThreadFilterMap = new ConcurrentHashMap(); + + @PostConstruct + private void init() { + this.setDaemon(true); + this.setThreadNamePrefix("Workflow-Execute-Thread-"); + this.setMaxPoolSize(masterConfig.getExecThreads()); + this.setCorePoolSize(masterConfig.getExecThreads()); + } + + /** + * submit state event + */ + public void submitStateEvent(StateEvent stateEvent) { + WorkflowExecuteThread workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + if (workflowExecuteThread == null) { + logger.error("workflowExecuteThread is null, processInstanceId:{}", stateEvent.getProcessInstanceId()); + return; + } + workflowExecuteThread.addStateEvent(stateEvent); + } + + /** + * start workflow + */ + public void startWorkflow(WorkflowExecuteThread workflowExecuteThread) { + submit(workflowExecuteThread::startProcess); + } + + /** + * execute workflow + */ + public void executeEvent(WorkflowExecuteThread workflowExecuteThread) { + if (!workflowExecuteThread.isStart() || workflowExecuteThread.eventSize() == 0) { + return; + } + if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { + return; + } + int processInstanceId = workflowExecuteThread.getProcessInstance().getId(); + ListenableFuture future = this.submitListenable(() -> { + workflowExecuteThread.handleEvents(); + multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); + }); + future.addCallback(new ListenableFutureCallback() { + @Override + public void onFailure(Throwable ex) { + logger.error("handle events {} failed", processInstanceId, ex); + multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + } + + @Override + public void onSuccess(Object result) { + if (workflowExecuteThread.workFlowFinish()) { + processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId); + notifyProcessChanged(workflowExecuteThread.getProcessInstance()); + logger.info("process instance {} finished.", processInstanceId); + } + multiThreadFilterMap.remove(workflowExecuteThread.getKey()); + } + }); + } + + /** + * notify process change + */ + private void notifyProcessChanged(ProcessInstance finishProcessInstance) { + if (Flag.NO == finishProcessInstance.getIsSubProcess()) { + return; + } + Map fatherMaps = processService.notifyProcessList(finishProcessInstance.getId()); + for (ProcessInstance processInstance : fatherMaps.keySet()) { + String address = NetUtils.getAddr(masterConfig.getListenPort()); + if (processInstance.getHost().equalsIgnoreCase(address)) { + this.notifyMyself(processInstance, fatherMaps.get(processInstance)); + } else { + this.notifyProcess(finishProcessInstance, processInstance, fatherMaps.get(processInstance)); + } + } + } + + /** + * notify myself + */ + private void notifyMyself(ProcessInstance processInstance, TaskInstance taskInstance) { + logger.info("notify process {} task {} state change", processInstance.getId(), taskInstance.getId()); + if (!processInstanceExecCacheManager.contains(processInstance.getId())) { + return; + } + StateEvent stateEvent = new StateEvent(); + stateEvent.setTaskInstanceId(taskInstance.getId()); + stateEvent.setType(StateEventType.TASK_STATE_CHANGE); + stateEvent.setProcessInstanceId(processInstance.getId()); + stateEvent.setExecutionStatus(ExecutionStatus.RUNNING_EXECUTION); + this.submitStateEvent(stateEvent); + } + + /** + * notify process's master + */ + private void notifyProcess(ProcessInstance finishProcessInstance, ProcessInstance processInstance, TaskInstance taskInstance) { + String host = processInstance.getHost(); + if (StringUtils.isEmpty(host)) { + logger.error("process {} host is empty, cannot notify task {} now", processInstance.getId(), taskInstance.getId()); + return; + } + String address = host.split(":")[0]; + int port = Integer.parseInt(host.split(":")[1]); + StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand( + finishProcessInstance.getId(), 0, finishProcessInstance.getState(), processInstance.getId(), taskInstance.getId() + ); + stateEventCallbackService.sendResult(address, port, stateEventChangeCommand.convert2Command()); + } +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index 936b70f74d..8db1bd970e 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.runner.StateWheelExecuteThread; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -84,6 +85,8 @@ public class WorkflowExecuteThreadTest { private TaskProcessorFactory taskProcessorFactory; + private StateWheelExecuteThread stateWheelExecuteThread; + @Before public void init() throws Exception { processService = mock(ProcessService.class); @@ -107,9 +110,8 @@ public class WorkflowExecuteThreadTest { processDefinition.setGlobalParamList(Collections.emptyList()); Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition); - ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); - ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList, taskProcessorFactory)); + stateWheelExecuteThread = mock(StateWheelExecuteThread.class); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, stateWheelExecuteThread, taskProcessorFactory)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true);