From d810c487fd8b015faefb72dca92ed41fb719b60d Mon Sep 17 00:00:00 2001
From: Wenjun Ruan
Date: Thu, 25 May 2023 14:42:32 +0800
Subject: [PATCH] Fix taskGroupQueue doesn't removed from inQueue where wakeup failed (#14200)

* Fix taskGroupQueue doesn't removed from inQueue where wakeup failed
* Fix avaliable taskGroup is 0 the TaskGroupQueue doesn't out queue
---
Review note (amendment to the change as originally committed):

WorkflowExecuteRunnable#releaseTaskGroup logged "no need to release
taskGroup" when taskInstance.getTaskGroupId() <= 0 but then fell through to
processService.releaseTaskGroup(taskInstance). The code removed by this
patch did nothing at all unless getTaskGroupId() > 0, so a `return;` is
added to the guard to keep that behaviour.

The first hunk header (+496,33 -> +496,34) and the diffstat (43 -> 44,
87 -> 88 insertions) are adjusted for the extra line. The post-image blob id
on the WorkflowExecuteRunnable.java index line was not regenerated.

 .../runner/WorkflowExecuteRunnable.java       | 44 ++++++++++-------
 .../runner/operator/TaskTimeoutOperator.java  | 35 +++++++++++++-
 .../service/process/ProcessServiceImpl.java   | 46 +++++++++++--------
 3 files changed, 88 insertions(+), 37 deletions(-)

diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 7f77a5c99f..c7b7a37bac 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -496,24 +496,34 @@ public class WorkflowExecuteRunnable implements Callable {
      *
      */
     public void releaseTaskGroup(TaskInstance taskInstance) throws RemotingException, InterruptedException {
-        if (taskInstance.getTaskGroupId() > 0) {
-            TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance);
-            if (nextTaskInstance != null) {
-                if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
-                    TaskStateEvent nextEvent = TaskStateEvent.builder()
-                            .processInstanceId(processInstance.getId())
-                            .taskInstanceId(nextTaskInstance.getId())
-                            .type(StateEventType.WAKE_UP_TASK_GROUP)
-                            .build();
-                    this.stateEvents.add(nextEvent);
-                } else {
-                    ProcessInstance processInstance =
-                            this.processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
-                    this.masterRpcClient.sendSyncCommand(Host.of(processInstance.getHost()),
-                            new TaskWakeupRequest(processInstance.getId(), nextTaskInstance.getId()).convert2Command());
-                }
-            }
+        // todo: use Integer
+        if (taskInstance.getTaskGroupId() <= 0) {
+            log.info("The current TaskInstance: {} doesn't use taskGroup, no need to release taskGroup",
+                    taskInstance.getName());
+            return;
+        }
+        TaskInstance nextTaskInstance = processService.releaseTaskGroup(taskInstance);
+        if (nextTaskInstance == null) {
+            log.info(
+                    "The current TaskInstance: {} is the last taskInstance in the taskGroup, no need to wakeup next taskInstance",
+                    taskInstance.getName());
+            return;
+        }
+        if (nextTaskInstance.getProcessInstanceId() == taskInstance.getProcessInstanceId()) {
+            TaskStateEvent nextEvent = TaskStateEvent.builder()
+                    .processInstanceId(processInstance.getId())
+                    .taskInstanceId(nextTaskInstance.getId())
+                    .type(StateEventType.WAKE_UP_TASK_GROUP)
+                    .build();
+            stateEvents.add(nextEvent);
+        } else {
+            ProcessInstance processInstance =
+                    processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId());
+            masterRpcClient.sendSyncCommand(
+                    Host.of(processInstance.getHost()),
+                    new TaskWakeupRequest(processInstance.getId(), nextTaskInstance.getId()).convert2Command());
         }
+        log.info("Success send wakeup message to next taskInstance: {}", nextTaskInstance.getId());
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
index 17de1e9539..146c3b894d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java
@@ -19,9 +19,18 @@ package org.apache.dolphinscheduler.server.master.runner.operator;
 
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
 import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
+import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient;
 import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Date;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +43,9 @@ public class TaskTimeoutOperator implements TaskOperator {
     @Autowired
     private TaskInstanceDao taskInstanceDao;
 
+    @Autowired
+    private MasterRpcClient masterRpcClient;
+
     @Override
     public void handle(DefaultTaskExecuteRunnable taskExecuteRunnable) {
         // Right now, if the task is running in worker, the timeout strategy will be handled at worker side.
@@ -48,8 +60,27 @@ public class TaskTimeoutOperator implements TaskOperator {
                     taskInstance.getName(), taskTimeoutStrategy.name());
             return;
         }
-        taskExecuteRunnable.kill();
-        log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName());
+        try {
+            timeoutTaskInstanceInDB(taskInstance);
+            killRemoteTaskInstanceInThreadPool(taskInstance);
+            log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName());
+        } catch (Exception ex) {
+            log.error("TaskInstance timeout {} failed", taskInstance.getName(), ex);
+        }
     }
+
+    private void timeoutTaskInstanceInDB(TaskInstance taskInstance) {
+        taskInstance.setState(TaskExecutionStatus.FAILURE);
+        taskInstance.setEndTime(new Date());
+        taskInstanceDao.updateTaskInstance(taskInstance);
+    }
+
+    private void killRemoteTaskInstanceInThreadPool(TaskInstance taskInstance) throws RemotingException {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return;
+        }
+        TaskKillRequest killCommand = new TaskKillRequest(taskInstance.getId());
+        masterRpcClient.send(Host.of(taskInstance.getHost()), killCommand.convert2Command());
+    }
 
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index d0f56f329f..485e357509 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -2334,14 +2334,17 @@ public class ProcessServiceImpl implements ProcessService {
         TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupId);
         if (taskGroup == null) {
             // we don't throw exception here, to avoid the task group has been deleted during workflow running
+            log.warn("The taskGroup is not exist no need to acquire taskGroup, taskGroupId: {}", taskGroupId);
             return true;
         }
         // if task group is not applicable
         if (taskGroup.getStatus() == Flag.NO.getCode()) {
+            log.warn("The taskGroup status is {}, no need to acquire taskGroup, taskGroupId: {}", taskGroup.getStatus(),
+                    taskGroupId);
             return true;
         }
         // Create a waiting taskGroupQueue, after acquire resource, we can update the status to ACQUIRE_SUCCESS
-        TaskGroupQueue taskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstanceId);
+        TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstanceId);
         if (taskGroupQueue == null) {
             taskGroupQueue = insertIntoTaskGroupQueue(
                     taskInstanceId,
@@ -2350,14 +2353,12 @@ public class ProcessServiceImpl implements ProcessService {
                     workflowInstanceId,
                     taskGroupPriority,
                     TaskGroupQueueStatus.WAIT_QUEUE);
+            log.info("Insert TaskGroupQueue: {} successfully", taskGroupQueue.getId());
         } else {
             log.info("The task queue is already exist, taskId: {}", taskInstanceId);
             if (taskGroupQueue.getStatus() == TaskGroupQueueStatus.ACQUIRE_SUCCESS) {
                 return true;
             }
-            taskGroupQueue.setInQueue(Flag.NO.getCode());
-            taskGroupQueue.setStatus(TaskGroupQueueStatus.WAIT_QUEUE);
-            this.taskGroupQueueMapper.updateById(taskGroupQueue);
         }
         // check if there already exist higher priority tasks
         List highPriorityTasks = taskGroupQueueMapper.queryHighPriorityTasks(
@@ -2368,14 +2369,15 @@ public class ProcessServiceImpl implements ProcessService {
             return false;
         }
         // try to get taskGroup
-        int count = taskGroupMapper.selectAvailableCountById(taskGroupId);
-        if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
-            log.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
-            return true;
+        int availableTaskGroupCount = taskGroupMapper.selectAvailableCountById(taskGroupId);
+        if (availableTaskGroupCount < 1) {
+            log.info(
+                    "Failed to acquire taskGroup, there is no avaliable taskGroup, taskInstanceId: {}, taskGroupId: {}",
+                    taskInstanceId, taskGroupId);
+            taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
+            return false;
         }
-        log.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskInstanceId, taskGroupId);
-        this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
-        return false;
+        return robTaskGroupResource(taskGroupQueue);
     }
 
     /**
@@ -2387,10 +2389,13 @@ public class ProcessServiceImpl implements ProcessService {
         for (int i = 0; i < 10; i++) {
             TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
             if (taskGroup.getGroupSize() <= taskGroup.getUseSize()) {
+                // remove
+                taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
                 log.info("The current task Group is full, taskGroup: {}", taskGroup);
                 return false;
             }
-            int affectedCount = taskGroupMapper.robTaskGroupResource(taskGroup.getId(),
+            int affectedCount = taskGroupMapper.robTaskGroupResource(
+                    taskGroup.getId(),
                     taskGroup.getUseSize(),
                     taskGroupQueue.getId(),
                     TaskGroupQueueStatus.WAIT_QUEUE.getCode());
@@ -2404,6 +2409,7 @@ public class ProcessServiceImpl implements ProcessService {
             }
         }
         log.info("Failed to rob taskGroup, taskGroupQueue: {}", taskGroupQueue);
+        taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
         return false;
     }
 
@@ -2431,10 +2437,11 @@ public class ProcessServiceImpl implements ProcessService {
         do {
             taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
             if (taskGroup == null) {
-                log.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
+                log.error("The taskGroup is not exist no need to release taskGroup, taskGroupId: {}",
+                        taskInstance.getTaskGroupId());
                 return null;
             }
-            thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
+            thisTaskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
             if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
                 log.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
                 return null;
@@ -2458,20 +2465,22 @@ public class ProcessServiceImpl implements ProcessService {
         changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
         TaskGroupQueue taskGroupQueue;
         do {
-            taskGroupQueue = this.taskGroupQueueMapper.queryTheHighestPriorityTasks(taskGroup.getId(),
+            taskGroupQueue = taskGroupQueueMapper.queryTheHighestPriorityTasks(
+                    taskGroup.getId(),
                     TaskGroupQueueStatus.WAIT_QUEUE.getCode(),
                     Flag.NO.getCode(),
                     Flag.NO.getCode());
             if (taskGroupQueue == null) {
-                log.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
+                log.info("There is no taskGroupQueue need to be wakeup taskGroup: {}", taskGroup.getId());
                 return null;
             }
-        } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
+        } while (this.taskGroupQueueMapper.updateInQueueCAS(
+                Flag.NO.getCode(),
                 Flag.YES.getCode(),
                 taskGroupQueue.getId()) != 1);
         log.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}",
                 taskInstance.getTaskGroupId(), taskGroupQueue.getId());
-        return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
+        return taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
     }
 
     /**
@@ -2505,6 +2514,7 @@ public class ProcessServiceImpl implements ProcessService {
                 .processId(workflowInstanceId)
                 .priority(taskGroupPriority)
                 .status(status)
+                .forceStart(Flag.NO.getCode())
                 .inQueue(Flag.NO.getCode())
                 .createTime(now)
                 .updateTime(now)