From b40a73713af154e6880d45be4b4ed5e093315300 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 11 Oct 2023 20:50:15 +0800 Subject: [PATCH] fix duplicate event (#15008) --- .../runner/StateWheelExecuteThread.java | 20 +++++---- .../runner/WorkflowExecuteRunnable.java | 41 ++++++++++++++----- .../runner/WorkflowExecuteThreadPool.java | 25 ++++++++--- 3 files changed, 63 insertions(+), 23 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 04d4dc5621..5f526115d5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -17,12 +17,11 @@ package org.apache.dolphinscheduler.server.master.runner; -import lombok.NonNull; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -35,16 +34,20 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.dolphinscheduler.service.utils.LoggerUtils; + +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; + +import javax.annotation.PostConstruct; + +import lombok.NonNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedQueue; - /** * Check thread * 1. timeout task check @@ -401,7 +404,10 @@ public class StateWheelExecuteThread extends BaseDaemonThread { .type(StateEventType.TASK_STATE_CHANGE) .status(TaskExecutionStatus.RUNNING_EXECUTION) .build(); - workflowExecuteThreadPool.submitStateEvent(stateEvent); + // will skip submit check event if existed, avoid event stacking + if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) { + workflowExecuteThreadPool.submitStateEvent(stateEvent); + } } private void addProcessStopEvent(ProcessInstance processInstance) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 8bbd0bb064..50bfc2099c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -88,6 +88,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.dolphinscheduler.service.utils.LoggerUtils; + import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -291,19 +292,19 @@ public class WorkflowExecuteRunnable implements Callable { } catch (StateEventHandleError stateEventHandleError) { logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); this.stateEvents.remove(stateEvent); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); } catch (StateEventHandleException stateEventHandleException) { logger.error("State event handle error, will retry this event: {}", stateEvent, stateEventHandleException); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); } catch (Exception e) { // we catch the exception here, since if the state event handle failed, the state event will still keep // in the stateEvents queue. logger.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent, e); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } @@ -322,6 +323,18 @@ public class WorkflowExecuteRunnable implements Callable { return key; } + public boolean existStateEvent(StateEvent stateEvent) { + if (CollectionUtils.isNotEmpty(this.stateEvents)) { + Optional optional = this.stateEvents.stream() + .filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId() + && Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId()) + && e.getType() == stateEvent.getType()) + .findFirst(); + return optional.isPresent(); + } + return false; + } + public boolean addStateEvent(StateEvent stateEvent) { if (processInstance.getId() != stateEvent.getProcessInstanceId()) { logger.info("state event would be abounded :{}", stateEvent); @@ -606,7 +619,8 @@ public class WorkflowExecuteRunnable implements Callable { Date scheduleDate = processInstance.getScheduleTime(); if (scheduleDate == null) { if (CollectionUtils.isEmpty(complementListDate)) { - logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId()); + logger.info("complementListDate is empty, process complement end. process id:{}", + processInstance.getId()); return true; } @@ -831,7 +845,8 @@ public class WorkflowExecuteRunnable implements Callable { task.getTaskCode(), task.getState()); if (validTaskMap.containsKey(task.getTaskCode())) { - logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", + logger.warn( + "Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", task.getTaskCode()); int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); @@ -980,7 +995,8 @@ public class WorkflowExecuteRunnable implements Callable { taskInstance.getProcessInstanceId(), taskInstance.getTaskGroupPriority()); if (!acquireTaskGroup) { - logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" + + logger.info( + "Submitted task will not be dispatch right now because the first time to try to acquire" + " task group failed, taskInstanceName: {}, taskGroupId: {}", taskInstance.getName(), taskGroupId); return Optional.of(taskInstance); @@ -989,7 +1005,8 @@ public class WorkflowExecuteRunnable implements Callable { boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); if (!dispatchSuccess) { - logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName()); + logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), + taskInstance.getName()); return Optional.empty(); } taskProcessor.action(TaskAction.RUN); @@ -1446,9 +1463,10 @@ public class WorkflowExecuteRunnable implements Callable { List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); if (!nextTaskList.contains(nextNodeName)) { - logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " + - "dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName - ); + logger.info( + "DependTask is a condition task, and its next condition branch does not hava current task, " + + "dependTaskCode: {}, currentTaskCode: {}", + dependNodeName, nextNodeName); return false; } } else { @@ -1824,7 +1842,8 @@ public class WorkflowExecuteRunnable implements Callable { TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); if (retryTask != null && retryTask.getState().isForceSuccess()) { task.setState(retryTask.getState()); - logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", + logger.info( + "Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", task.getName(), task.getId()); removeTaskFromStandbyList(task); completeTaskMap.put(task.getTaskCode(), task.getId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index e26df60244..510e27297a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner; -import com.google.common.base.Strings; -import lombok.NonNull; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.NetUtils; @@ -34,6 +32,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.utils.LoggerUtils; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.annotation.PostConstruct; + +import lombok.NonNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,9 +48,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import javax.annotation.PostConstruct; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import com.google.common.base.Strings; /** * Used to execute {@link WorkflowExecuteRunnable}. @@ -82,6 +86,17 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { this.setCorePoolSize(masterConfig.getExecThreads()); } + public boolean existStateEvent(StateEvent stateEvent) { + WorkflowExecuteRunnable workflowExecuteThread = + processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); + if (workflowExecuteThread == null) { + logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", + stateEvent); + return false; + } + return workflowExecuteThread.existStateEvent(stateEvent); + } + /** * submit state event */