Browse Source

fix duplicate event (#15008)

3.1.9-release
caishunfeng 12 months ago committed by GitHub
parent
commit
b40a73713a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  2. 41
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

18
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -17,12 +17,11 @@
package org.apache.dolphinscheduler.server.master.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -35,16 +34,20 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Check thread
* 1. timeout task check
@ -401,8 +404,11 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
.type(StateEventType.TASK_STATE_CHANGE)
.status(TaskExecutionStatus.RUNNING_EXECUTION)
.build();
// will skip submit check event if existed, avoid event stacking
if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
}
private void addProcessStopEvent(ProcessInstance processInstance) {
WorkflowStateEvent stateEvent = WorkflowStateEvent.builder()

41
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -88,6 +88,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -291,19 +292,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} catch (StateEventHandleError stateEventHandleError) {
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event: {}",
stateEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep
// in the stateEvents queue.
logger.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
@ -322,6 +323,18 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return key;
}
public boolean existStateEvent(StateEvent stateEvent) {
if (CollectionUtils.isNotEmpty(this.stateEvents)) {
Optional<StateEvent> optional = this.stateEvents.stream()
.filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId()
&& Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId())
&& e.getType() == stateEvent.getType())
.findFirst();
return optional.isPresent();
}
return false;
}
public boolean addStateEvent(StateEvent stateEvent) {
if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.info("state event would be abounded :{}", stateEvent);
@ -606,7 +619,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
Date scheduleDate = processInstance.getScheduleTime();
if (scheduleDate == null) {
if (CollectionUtils.isEmpty(complementListDate)) {
logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId());
logger.info("complementListDate is empty, process complement end. process id:{}",
processInstance.getId());
return true;
}
@ -831,7 +845,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
logger.warn(
"Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
task.getTaskCode());
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
@ -980,7 +995,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" +
logger.info(
"Submitted task will not be dispatch right now because the first time to try to acquire" +
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
@ -989,7 +1005,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName());
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
@ -1446,9 +1463,10 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " +
"dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName
);
logger.info(
"DependTask is a condition task, and its next condition branch does not hava current task, " +
"dependTaskCode: {}, currentTaskCode: {}",
dependNodeName, nextNodeName);
return false;
}
} else {
@ -1824,7 +1842,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
logger.info(
"Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -34,6 +32,14 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -42,9 +48,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Strings;
/**
* Used to execute {@link WorkflowExecuteRunnable}.
@ -82,6 +86,17 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
this.setCorePoolSize(masterConfig.getExecThreads());
}
public boolean existStateEvent(StateEvent stateEvent) {
WorkflowExecuteRunnable workflowExecuteThread =
processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
stateEvent);
return false;
}
return workflowExecuteThread.existStateEvent(stateEvent);
}
/**
* submit state event
*/

Loading…
Cancel
Save