Browse Source

Revert "fix duplicate event (#14986)" (#15006)

This reverts commit 7a38b87c9a.
3.1.9-release
caishunfeng 12 months ago committed by GitHub
parent
commit
160fde19aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java
  2. 41
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

20
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -17,11 +17,12 @@
package org.apache.dolphinscheduler.server.master.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -34,20 +35,16 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Check thread
* 1. timeout task check
@ -404,10 +401,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
.type(StateEventType.TASK_STATE_CHANGE)
.status(TaskExecutionStatus.RUNNING_EXECUTION)
.build();
// will skip submit check event if existed, avoid event stacking
if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) {
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
private void addProcessStopEvent(ProcessInstance processInstance) {

41
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -88,7 +88,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.dolphinscheduler.service.utils.DagHelper;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@ -292,19 +291,19 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
} catch (StateEventHandleError stateEventHandleError) {
logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError);
this.stateEvents.remove(stateEvent);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (StateEventHandleException stateEventHandleException) {
logger.error("State event handle error, will retry this event: {}",
stateEvent,
stateEventHandleException);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
// we catch the exception here, since if the state event handle failed, the state event will still keep
// in the stateEvents queue.
logger.error("State event handle error, get a unknown exception, will retry this event: {}",
stateEvent,
e);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
@ -323,18 +322,6 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
return key;
}
public boolean existStateEvent(StateEvent stateEvent) {
if (CollectionUtils.isNotEmpty(this.stateEvents)) {
Optional<StateEvent> optional = this.stateEvents.stream()
.filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId()
&& Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId())
&& e.getType() == stateEvent.getType())
.findFirst();
return optional.isPresent();
}
return false;
}
public boolean addStateEvent(StateEvent stateEvent) {
if (processInstance.getId() != stateEvent.getProcessInstanceId()) {
logger.info("state event would be abounded :{}", stateEvent);
@ -619,8 +606,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
Date scheduleDate = processInstance.getScheduleTime();
if (scheduleDate == null) {
if (CollectionUtils.isEmpty(complementListDate)) {
logger.info("complementListDate is empty, process complement end. process id:{}",
processInstance.getId());
logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId());
return true;
}
@ -845,8 +831,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
task.getTaskCode(),
task.getState());
if (validTaskMap.containsKey(task.getTaskCode())) {
logger.warn(
"Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}",
task.getTaskCode());
int oldTaskInstanceId = validTaskMap.get(task.getTaskCode());
TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId);
@ -995,8 +980,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
taskInstance.getProcessInstanceId(),
taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info(
"Submitted task will not be dispatch right now because the first time to try to acquire" +
logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" +
" task group failed, taskInstanceName: {}, taskGroupId: {}",
taskInstance.getName(), taskGroupId);
return Optional.of(taskInstance);
@ -1005,8 +989,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH);
if (!dispatchSuccess) {
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(),
taskInstance.getName());
logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName());
return Optional.empty();
}
taskProcessor.action(TaskAction.RUN);
@ -1463,10 +1446,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
List<String> nextTaskList =
DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap());
if (!nextTaskList.contains(nextNodeName)) {
logger.info(
"DependTask is a condition task, and its next condition branch does not hava current task, " +
"dependTaskCode: {}, currentTaskCode: {}",
dependNodeName, nextNodeName);
logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " +
"dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName
);
return false;
}
} else {
@ -1842,8 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskInstance retryTask = processService.findTaskInstanceById(task.getId());
if (retryTask != null && retryTask.getState().isForceSuccess()) {
task.setState(retryTask.getState());
logger.info(
"Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}",
task.getName(), task.getId());
removeTaskFromStandbyList(task);
completeTaskMap.put(task.getTaskCode(), task.getId());

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -32,14 +34,6 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent;
import org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.utils.LoggerUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -48,7 +42,9 @@ import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import com.google.common.base.Strings;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Used to execute {@link WorkflowExecuteRunnable}.
@ -86,17 +82,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
this.setCorePoolSize(masterConfig.getExecThreads());
}
public boolean existStateEvent(StateEvent stateEvent) {
WorkflowExecuteRunnable workflowExecuteThread =
processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId());
if (workflowExecuteThread == null) {
logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}",
stateEvent);
return false;
}
return workflowExecuteThread.existStateEvent(stateEvent);
}
/**
* submit state event
*/

Loading…
Cancel
Save