diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index 5f526115d5..04d4dc5621 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -17,11 +17,12 @@ package org.apache.dolphinscheduler.server.master.runner; +import lombok.NonNull; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; -import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -34,20 +35,16 @@ import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent; import org.apache.dolphinscheduler.server.master.runner.task.TaskInstanceKey; import org.apache.dolphinscheduler.service.utils.LoggerUtils; - -import java.util.Optional; -import java.util.concurrent.ConcurrentLinkedQueue; - -import javax.annotation.PostConstruct; - -import lombok.NonNull; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; +import javax.annotation.PostConstruct; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; + /** * Check thread * 1. timeout task check @@ -404,10 +401,7 @@ public class StateWheelExecuteThread extends BaseDaemonThread { .type(StateEventType.TASK_STATE_CHANGE) .status(TaskExecutionStatus.RUNNING_EXECUTION) .build(); - // will skip submit check event if existed, avoid event stacking - if (!workflowExecuteThreadPool.existStateEvent(stateEvent)) { - workflowExecuteThreadPool.submitStateEvent(stateEvent); - } + workflowExecuteThreadPool.submitStateEvent(stateEvent); } private void addProcessStopEvent(ProcessInstance processInstance) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 50bfc2099c..8bbd0bb064 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -88,7 +88,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.dolphinscheduler.service.utils.LoggerUtils; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -292,19 +291,19 @@ public class WorkflowExecuteRunnable implements Callable { } catch (StateEventHandleError stateEventHandleError) { logger.error("State event handle error, will remove this event: {}", stateEvent, stateEventHandleError); this.stateEvents.remove(stateEvent); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (StateEventHandleException stateEventHandleException) { logger.error("State event handle error, will retry this event: {}", stateEvent, stateEventHandleException); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } catch (Exception e) { // we catch the exception here, since if the state event handle failed, the state event will still keep // in the stateEvents queue. logger.error("State event handle error, get a unknown exception, will retry this event: {}", stateEvent, e); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS_SHORT); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); } finally { LoggerUtils.removeWorkflowAndTaskInstanceIdMDC(); } @@ -323,18 +322,6 @@ public class WorkflowExecuteRunnable implements Callable { return key; } - public boolean existStateEvent(StateEvent stateEvent) { - if (CollectionUtils.isNotEmpty(this.stateEvents)) { - Optional optional = this.stateEvents.stream() - .filter(e -> e.getProcessInstanceId() == stateEvent.getProcessInstanceId() - && Objects.equals(e.getTaskInstanceId(), stateEvent.getTaskInstanceId()) - && e.getType() == stateEvent.getType()) - .findFirst(); - return optional.isPresent(); - } - return false; - } - public boolean addStateEvent(StateEvent stateEvent) { if (processInstance.getId() != stateEvent.getProcessInstanceId()) { logger.info("state event would be abounded :{}", stateEvent); @@ -619,8 +606,7 @@ public class WorkflowExecuteRunnable implements Callable { Date scheduleDate = processInstance.getScheduleTime(); if (scheduleDate == null) { if (CollectionUtils.isEmpty(complementListDate)) { - logger.info("complementListDate is empty, process complement end. process id:{}", - processInstance.getId()); + logger.info("complementListDate is empty, process complement end. process id:{}", processInstance.getId()); return true; } @@ -845,8 +831,7 @@ public class WorkflowExecuteRunnable implements Callable { task.getTaskCode(), task.getState()); if (validTaskMap.containsKey(task.getTaskCode())) { - logger.warn( - "Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", + logger.warn("Have same taskCode taskInstance when init task queue, need to check taskExecutionStatus, taskCode:{}", task.getTaskCode()); int oldTaskInstanceId = validTaskMap.get(task.getTaskCode()); TaskInstance oldTaskInstance = taskInstanceMap.get(oldTaskInstanceId); @@ -995,8 +980,7 @@ public class WorkflowExecuteRunnable implements Callable { taskInstance.getProcessInstanceId(), taskInstance.getTaskGroupPriority()); if (!acquireTaskGroup) { - logger.info( - "Submitted task will not be dispatch right now because the first time to try to acquire" + + logger.info("Submitted task will not be dispatch right now because the first time to try to acquire" + " task group failed, taskInstanceName: {}, taskGroupId: {}", taskInstance.getName(), taskGroupId); return Optional.of(taskInstance); @@ -1005,8 +989,7 @@ public class WorkflowExecuteRunnable implements Callable { boolean dispatchSuccess = taskProcessor.action(TaskAction.DISPATCH); if (!dispatchSuccess) { - logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), - taskInstance.getName()); + logger.error("Dispatch standby process {} task {} failed", processInstance.getName(), taskInstance.getName()); return Optional.empty(); } taskProcessor.action(TaskAction.RUN); @@ -1463,10 +1446,9 @@ public class WorkflowExecuteRunnable implements Callable { List nextTaskList = DagHelper.parseConditionTask(dependNodeName, skipTaskNodeMap, dag, getCompleteTaskInstanceMap()); if (!nextTaskList.contains(nextNodeName)) { - logger.info( - "DependTask is a condition task, and its next condition branch does not hava current task, " + - "dependTaskCode: {}, currentTaskCode: {}", - dependNodeName, nextNodeName); + logger.info("DependTask is a condition task, and its next condition branch does not hava current task, " + + "dependTaskCode: {}, currentTaskCode: {}", dependNodeName, nextNodeName + ); return false; } } else { @@ -1842,8 +1824,7 @@ public class WorkflowExecuteRunnable implements Callable { TaskInstance retryTask = processService.findTaskInstanceById(task.getId()); if (retryTask != null && retryTask.getState().isForceSuccess()) { task.setState(retryTask.getState()); - logger.info( - "Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", + logger.info("Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", task.getName(), task.getId()); removeTaskFromStandbyList(task); completeTaskMap.put(task.getTaskCode(), task.getId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 510e27297a..e26df60244 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner; +import com.google.common.base.Strings; +import lombok.NonNull; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.utils.NetUtils; @@ -32,14 +34,6 @@ import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.event.TaskStateEvent; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.utils.LoggerUtils; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import javax.annotation.PostConstruct; - -import lombok.NonNull; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -48,7 +42,9 @@ import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; -import com.google.common.base.Strings; +import javax.annotation.PostConstruct; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Used to execute {@link WorkflowExecuteRunnable}. @@ -86,17 +82,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { this.setCorePoolSize(masterConfig.getExecThreads()); } - public boolean existStateEvent(StateEvent stateEvent) { - WorkflowExecuteRunnable workflowExecuteThread = - processInstanceExecCacheManager.getByProcessInstanceId(stateEvent.getProcessInstanceId()); - if (workflowExecuteThread == null) { - logger.warn("Submit state event error, cannot from workflowExecuteThread from cache manager, stateEvent:{}", - stateEvent); - return false; - } - return workflowExecuteThread.existStateEvent(stateEvent); - } - /** * submit state event */