Browse Source

Fix task wake up failed will block the event (#13466)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
12dd60fa46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  2. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  6. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -25,7 +25,7 @@ public enum StateEventType {
TASK_STATE_CHANGE(1, "task state change"), TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"), PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"), TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"), WAKE_UP_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry"), TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked"); PROCESS_BLOCKED(6, "process blocked");

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java

@ -32,16 +32,18 @@ public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
@Override @Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) throws StateEventHandleFailure { StateEvent stateEvent) {
logger.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId()); logger.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId());
if (!workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) { if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
throw new StateEventHandleFailure("Task state event handle failed due to robing taskGroup resource failed"); logger.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
} else {
logger.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
} }
return true; return true;
} }
@Override @Override
public StateEventType getEventType() { public StateEventType getEventType() {
return StateEventType.WAIT_TASK_GROUP; return StateEventType.WAKE_UP_TASK_GROUP;
} }
} }

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java

@ -58,7 +58,7 @@ public class TaskEventProcessor implements NettyRequestProcessor {
.processInstanceId(taskEventChangeCommand.getProcessInstanceId()) .processInstanceId(taskEventChangeCommand.getProcessInstanceId())
.taskInstanceId(taskEventChangeCommand.getTaskInstanceId()) .taskInstanceId(taskEventChangeCommand.getTaskInstanceId())
.key(taskEventChangeCommand.getKey()) .key(taskEventChangeCommand.getKey())
.type(StateEventType.WAIT_TASK_GROUP) .type(StateEventType.WAKE_UP_TASK_GROUP)
.build(); .build();
try { try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(), LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -482,7 +482,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskStateEvent nextEvent = TaskStateEvent.builder() TaskStateEvent nextEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId()) .processInstanceId(processInstance.getId())
.taskInstanceId(nextTaskInstance.getId()) .taskInstanceId(nextTaskInstance.getId())
.type(StateEventType.WAIT_TASK_GROUP) .type(StateEventType.WAKE_UP_TASK_GROUP)
.build(); .build();
this.stateEvents.add(nextEvent); this.stateEvents.add(nextEvent);
} else { } else {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -109,7 +109,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
return; return;
} }
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) { if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
logger.warn("The workflow has been executed by another thread"); logger.debug("The workflow has been executed by another thread");
return; return;
} }
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread); multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -2637,6 +2637,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getId(), taskId); processInstance.getId(), taskId);
Host host = new Host(processInstance.getHost()); Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType)); stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType));
logger.info("Success send command to master: {}, command: {}", host, taskEventChangeCommand);
} }
@Override @Override

Loading…
Cancel
Save