From 4d13a5104b9a4ab16e16253ad2936107031e59e4 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Tue, 23 Aug 2022 10:05:03 +0800 Subject: [PATCH] Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469) --- .../runner/task/CommonTaskProcessor.java | 39 ++++---- .../service/process/ProcessServiceImpl.java | 91 +++++++++++-------- 2 files changed, 74 insertions(+), 56 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 24b7fd0a7c..e5413c1490 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.runner.task; +import com.google.auto.service.AutoService; +import org.apache.commons.lang3.StringUtils; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; @@ -31,12 +33,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import org.apache.commons.lang3.StringUtils; - import java.util.Date; -import com.google.auto.service.AutoService; - /** * common task processor */ @@ -148,24 +146,29 @@ public class CommonTaskProcessor extends BaseTaskProcessor { taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setEndTime(new Date()); processService.updateTaskInstance(taskInstance); - - TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); - killCommand.setTaskInstanceId(taskInstance.getId()); - - ExecutionContext executionContext = - new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance); - - Host host = Host.of(taskInstance.getHost()); - executionContext.setHost(host); - - nettyExecutorManager.executeDirectly(executionContext); - } catch (ExecuteException e) { - logger.error("kill task error:", e); + if (StringUtils.isNotEmpty(taskInstance.getHost())) { + killRemoteTask(); + } + } catch (Exception e) { + logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e); return false; } - logger.info("master kill taskInstance name :{} taskInstance id:{}", + logger.info("master success kill taskInstance name: {} taskInstance id: {}", taskInstance.getName(), taskInstance.getId()); return true; } + + private void killRemoteTask() throws ExecuteException { + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); + killCommand.setTaskInstanceId(taskInstance.getId()); + + ExecutionContext executionContext = + new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance); + + Host host = Host.of(taskInstance.getHost()); + executionContext.setHost(host); + + nettyExecutorManager.executeDirectly(executionContext); + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index b4faf51168..130e9f1ae9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -17,24 +17,25 @@ package org.apache.dolphinscheduler.service.process; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; -import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; -import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; -import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; -import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; -import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; - -import static java.util.stream.Collectors.toSet; - +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import io.micrometer.core.annotation.Counted; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.ReleaseState; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; +import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.graph.DAG; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNodeRelation; @@ -116,8 +117,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter; -import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand; +import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.cron.CronUtils; @@ -128,8 +129,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.Arrays; @@ -144,20 +148,20 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Transactional; - -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; - -import io.micrometer.core.annotation.Counted; +import static java.util.stream.Collectors.toSet; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST; +import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE; +import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID; +import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS; +import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR; +import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN; +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID; /** * process relative dao that some mappers in this. @@ -2981,8 +2985,10 @@ public class ProcessServiceImpl implements ProcessService { // try to get taskGroup int count = taskGroupMapper.selectAvailableCountById(groupId); if (count == 1 && robTaskGroupResource(taskGroupQueue)) { + logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); return true; } + logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return false; } @@ -2997,11 +3003,13 @@ public class ProcessServiceImpl implements ProcessService { taskGroupQueue.getId(), TaskGroupQueueStatus.WAIT_QUEUE.getCode()); if (affectedCount > 0) { + logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId()); taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS); this.taskGroupQueueMapper.updateById(taskGroupQueue); this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId()); return true; } + logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId()); return false; } @@ -3024,26 +3032,31 @@ public class ProcessServiceImpl implements ProcessService { TaskGroup taskGroup; TaskGroupQueue thisTaskGroupQueue; + logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId()); try { do { taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId()); if (taskGroup == null) { + logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId()); return null; } thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId()); if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) { + logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId()); return null; } } while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), - taskGroup.getUseSize(), - thisTaskGroupQueue.getId(), - TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); + taskGroup.getUseSize(), + thisTaskGroupQueue.getId(), + TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1); } catch (Exception e) { logger.error("release the task group error", e); return null; } - logger.info("updateTask:{}", taskInstance.getName()); + logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId()); + + logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId()); changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE); TaskGroupQueue taskGroupQueue; do { @@ -3052,11 +3065,13 @@ public class ProcessServiceImpl implements ProcessService { Flag.NO.getCode(), Flag.NO.getCode()); if (taskGroupQueue == null) { + logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId()); return null; } } while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(), Flag.YES.getCode(), taskGroupQueue.getId()) != 1); + logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId()); return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId()); }