From 9564bdeefb06333f464b205f32c6b9522edb412b Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 3 Jan 2023 09:52:03 +0800 Subject: [PATCH] cherry-pick Fix task group cannot release when kill task #13314 --- .../runner/WorkflowExecuteRunnable.java | 45 ++++++++++++------- .../runner/task/CommonTaskProcessor.java | 10 +++-- .../service/process/ProcessServiceImpl.java | 8 +++- .../plugin/task/api/utils/LogUtils.java | 45 ++++++++++++++++++- 4 files changed, 86 insertions(+), 22 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index bfe6903213..939f4baca0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -62,6 +62,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -341,23 +342,33 @@ public class WorkflowExecuteRunnable implements Callable { public boolean checkForceStartAndWakeUp(StateEvent stateEvent) { TaskGroupQueue taskGroupQueue = this.processService.loadTaskGroupQueue(stateEvent.getTaskInstanceId()); if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) { + logger.info("Begin to force start taskGroupQueue: {}", taskGroupQueue.getId()); TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getTaskId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()); + logger.info("Success force start taskGroupQueue: {}", taskGroupQueue.getId()); return true; } if (taskGroupQueue.getInQueue() == Flag.YES.getCode()) { + logger.info("Begin to wake up taskGroupQueue: {}", taskGroupQueue.getId()); boolean acquireTaskGroup = processService.robTaskGroupResource(taskGroupQueue); if (acquireTaskGroup) { TaskInstance taskInstance = this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId()); ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskInstance.getTaskCode()); taskProcessor.action(TaskAction.DISPATCH); + logger.info("Success wake up taskGroupQueue: {}", taskGroupQueue.getId()); return true; } + logger.warn("Failed to wake up taskGroupQueue, taskGroupQueueId: {}", taskGroupQueue.getId()); + return false; + } else { + logger.info( + "Failed to wake up the taskGroupQueue: {}, since the taskGroupQueue is not in queue, will no need to wake up.", + taskGroupQueue); + return true; } - return false; } public void processTimeout() { @@ -425,7 +436,6 @@ public class WorkflowExecuteRunnable implements Callable { * */ public void releaseTaskGroup(TaskInstance taskInstance) { - logger.info("Release task group"); if (taskInstance.getTaskGroupId() > 0) { TaskInstance nextTaskInstance = this.processService.releaseTaskGroup(taskInstance); if (nextTaskInstance != null) { @@ -1765,19 +1775,24 @@ public class WorkflowExecuteRunnable implements Callable { if (taskInstanceId == null || taskInstanceId.equals(0)) { continue; } - TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); - if (taskInstance == null || taskInstance.getState().isFinished()) { - continue; - } - taskProcessor.action(TaskAction.STOP); - if (taskProcessor.taskInstance().getState().isFinished()) { - TaskStateEvent taskStateEvent = TaskStateEvent.builder() - .processInstanceId(processInstance.getId()) - .taskInstanceId(taskInstance.getId()) - .status(taskProcessor.taskInstance().getState()) - .type(StateEventType.TASK_STATE_CHANGE) - .build(); - this.addStateEvent(taskStateEvent); + LogUtils.setWorkflowAndTaskInstanceIDMDC(processInstance.getId(), taskInstanceId); + try { + TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceId); + if (taskInstance == null || taskInstance.getState().isFinished()) { + continue; + } + taskProcessor.action(TaskAction.STOP); + if (taskProcessor.taskInstance().getState().isFinished()) { + TaskStateEvent taskStateEvent = TaskStateEvent.builder() + .processInstanceId(processInstance.getId()) + .taskInstanceId(taskInstance.getId()) + .status(taskProcessor.taskInstance().getState()) + .type(StateEventType.TASK_STATE_CHANGE) + .build(); + this.addStateEvent(taskStateEvent); + } + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 73326720f6..5b2bf630e2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -138,11 +138,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor { public boolean killTask() { try { - taskInstance = processService.findTaskInstanceById(taskInstance.getId()); + logger.info("Begin to kill task: {}", taskInstance.getName()); if (taskInstance == null) { + logger.warn("Kill task failed, the task instance is not exist"); return true; } if (taskInstance.getState().isFinished()) { + logger.warn("Kill task failed, the task instance is already finished"); return true; } // we don't wait the kill response @@ -153,12 +155,12 @@ public class CommonTaskProcessor extends BaseTaskProcessor { killRemoteTask(); } } catch (Exception e) { - logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e); + logger.error("Master kill task: {} error, taskInstance id: {}", taskInstance.getName(), + taskInstance.getId(), e); return false; } - logger.info("master success kill taskInstance name: {} taskInstance id: {}", - taskInstance.getName(), taskInstance.getId()); + logger.info("Master success kill task: {}, taskInstanceId: {}", taskInstance.getName(), taskInstance.getId()); return true; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index e580f1f74d..8e63b3128f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -3040,6 +3040,10 @@ public class ProcessServiceImpl implements ProcessService { logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); return null; } + if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.WAIT_QUEUE) { + logger.info("The taskGroupQueue's status is in waiting, will not need to release task group"); + break; + } } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize(), @@ -3066,7 +3070,8 @@ public class ProcessServiceImpl implements ProcessService { } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1); - logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId()); + logger.info("Finished to release task group queue: taskGroupId: {}, taskGroupQueueId: {}", + taskInstance.getTaskGroupId(), taskGroupQueue.getId()); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); } @@ -3080,6 +3085,7 @@ public class ProcessServiceImpl implements ProcessService { @Override public void changeTaskGroupQueueStatus(int taskId, TaskGroupQueueStatus status) { TaskGroupQueue taskGroupQueue = taskGroupQueueMapper.queryByTaskId(taskId); + taskGroupQueue.setInQueue(Flag.NO.getCode()); taskGroupQueue.setStatus(status); taskGroupQueue.setUpdateTime(new Date(System.currentTimeMillis())); taskGroupQueueMapper.updateById(taskGroupQueue); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index c3833c071a..c05c96dfcb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -17,10 +17,10 @@ package org.apache.dolphinscheduler.plugin.task.api.utils; +import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; @@ -37,6 +37,7 @@ import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; +import org.slf4j.MDC; @Slf4j @UtilityClass @@ -73,4 +74,44 @@ public class LogUtils { return Collections.emptyList(); } } + + public static String readWholeFileContent(String filePath) { + String line; + StringBuilder sb = new StringBuilder(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) { + while ((line = br.readLine()) != null) { + sb.append(line + "\r\n"); + } + return sb.toString(); + } catch (IOException e) { + log.error("read file error", e); + } + return ""; + } + + public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) { + setWorkflowInstanceIdMDC(workflowInstanceId); + setTaskInstanceIdMDC(taskInstanceId); + } + + public static void setWorkflowInstanceIdMDC(Integer workflowInstanceId) { + MDC.put(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); + } + + public static void setTaskInstanceIdMDC(Integer taskInstanceId) { + MDC.put(Constants.TASK_INSTANCE_ID_MDC_KEY, String.valueOf(taskInstanceId)); + } + + public static void removeWorkflowAndTaskInstanceIdMDC() { + removeWorkflowInstanceIdMDC(); + removeTaskInstanceIdMDC(); + } + + public static void removeWorkflowInstanceIdMDC() { + MDC.remove(Constants.WORKFLOW_INSTANCE_ID_MDC_KEY); + } + + public static void removeTaskInstanceIdMDC() { + MDC.remove(Constants.TASK_INSTANCE_ID_MDC_KEY); + } }