Browse Source

cherry-pick Fix task wake up failed will block the event processing

3.1.4-release
Wenjun Ruan 2 years ago committed by zhuangchong
parent
commit
36284f9b9c
  1. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java
  2. 12
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java
  4. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  5. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java
  6. 1
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

@ -25,7 +25,7 @@ public enum StateEventType {
TASK_STATE_CHANGE(1, "task state change"),
PROCESS_TIMEOUT(2, "process timeout"),
TASK_TIMEOUT(3, "task timeout"),
WAIT_TASK_GROUP(4, "wait task group"),
WAKE_UP_TASK_GROUP(4, "wait task group"),
TASK_RETRY(5, "task retry"),
PROCESS_BLOCKED(6, "process blocked");

12
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskWaitTaskGroupStateHandler.java

@ -30,13 +30,19 @@ public class TaskWaitTaskGroupStateHandler implements StateEventHandler {
private static final Logger logger = LoggerFactory.getLogger(TaskWaitTaskGroupStateHandler.class);
@Override
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable, StateEvent stateEvent) {
public boolean handleStateEvent(WorkflowExecuteRunnable workflowExecuteRunnable,
StateEvent stateEvent) {
logger.info("Handle task instance wait task group event, taskInstanceId: {}", stateEvent.getTaskInstanceId());
return workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent);
if (workflowExecuteRunnable.checkForceStartAndWakeUp(stateEvent)) {
logger.info("Success wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
} else {
logger.info("Failed to wake up task instance, taskInstanceId: {}", stateEvent.getTaskInstanceId());
}
return true;
}
@Override
public StateEventType getEventType() {
return StateEventType.WAIT_TASK_GROUP;
return StateEventType.WAKE_UP_TASK_GROUP;
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskEventProcessor.java

@ -56,7 +56,7 @@ public class TaskEventProcessor implements NettyRequestProcessor {
.processInstanceId(taskEventChangeCommand.getProcessInstanceId())
.taskInstanceId(taskEventChangeCommand.getTaskInstanceId())
.key(taskEventChangeCommand.getKey())
.type(StateEventType.WAIT_TASK_GROUP)
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(stateEvent.getProcessInstanceId(),

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -443,7 +443,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
TaskStateEvent nextEvent = TaskStateEvent.builder()
.processInstanceId(processInstance.getId())
.taskInstanceId(nextTaskInstance.getId())
.type(StateEventType.WAIT_TASK_GROUP)
.type(StateEventType.WAKE_UP_TASK_GROUP)
.build();
this.stateEvents.add(nextEvent);
} else {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java

@ -105,7 +105,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
return;
}
if (multiThreadFilterMap.containsKey(workflowExecuteThread.getKey())) {
logger.warn("The workflow has been executed by another thread");
logger.debug("The workflow has been executed by another thread");
return;
}
multiThreadFilterMap.put(workflowExecuteThread.getKey(), workflowExecuteThread);

1
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -3135,6 +3135,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getId(), taskId);
Host host = new Host(processInstance.getHost());
stateEventCallbackService.sendResult(host, taskEventChangeCommand.convert2Command(taskType));
logger.info("Success send command to master: {}, command: {}", host, taskEventChangeCommand);
}
@Override

Loading…
Cancel
Save