Browse Source

Fix kill task failed will cause the taskGroup cannot release and add taskGroup log (#11469)

3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
4d13a5104b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  2. 91
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -31,12 +33,8 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import com.google.auto.service.AutoService;
/**
* common task processor
*/
@ -148,24 +146,29 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskInstance.setState(TaskExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext =
new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
} catch (ExecuteException e) {
logger.error("kill task error:", e);
if (StringUtils.isNotEmpty(taskInstance.getHost())) {
killRemoteTask();
}
} catch (Exception e) {
logger.error("master kill task error, taskInstance id: {}", taskInstance.getId(), e);
return false;
}
logger.info("master kill taskInstance name :{} taskInstance id:{}",
logger.info("master success kill taskInstance name: {} taskInstance id: {}",
taskInstance.getName(), taskInstance.getId());
return true;
}
private void killRemoteTask() throws ExecuteException {
TaskKillRequestCommand killCommand = new TaskKillRequestCommand();
killCommand.setTaskInstanceId(taskInstance.getId());
ExecutionContext executionContext =
new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER, taskInstance);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
}
}

91
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -17,24 +17,25 @@
package org.apache.dolphinscheduler.service.process;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@ -116,8 +117,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SubProcessParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.TaskTimeoutParameter;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.TaskEventChangeCommand;
import org.apache.dolphinscheduler.remote.command.WorkflowStateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.cron.CronUtils;
@ -128,8 +129,11 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Arrays;
@ -144,20 +148,20 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.micrometer.core.annotation.Counted;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR;
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
/**
* process relative dao that some mappers in this.
@ -2981,8 +2985,10 @@ public class ProcessServiceImpl implements ProcessService {
// try to get taskGroup
int count = taskGroupMapper.selectAvailableCountById(groupId);
if (count == 1 && robTaskGroupResource(taskGroupQueue)) {
logger.info("Success acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
return true;
}
logger.info("Failed to acquire taskGroup, taskInstanceId: {}, taskGroupId: {}", taskId, groupId);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return false;
}
@ -2997,11 +3003,13 @@ public class ProcessServiceImpl implements ProcessService {
taskGroupQueue.getId(),
TaskGroupQueueStatus.WAIT_QUEUE.getCode());
if (affectedCount > 0) {
logger.info("Success rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
taskGroupQueue.setStatus(TaskGroupQueueStatus.ACQUIRE_SUCCESS);
this.taskGroupQueueMapper.updateById(taskGroupQueue);
this.taskGroupQueueMapper.updateInQueue(Flag.NO.getCode(), taskGroupQueue.getId());
return true;
}
logger.info("Failed to rob taskGroup, taskInstanceId: {}, taskGroupId: {}", taskGroupQueue.getTaskId(), taskGroupQueue.getId());
return false;
}
@ -3024,26 +3032,31 @@ public class ProcessServiceImpl implements ProcessService {
TaskGroup taskGroup;
TaskGroupQueue thisTaskGroupQueue;
logger.info("Begin to release task group: {}", taskInstance.getTaskGroupId());
try {
do {
taskGroup = taskGroupMapper.selectById(taskInstance.getTaskGroupId());
if (taskGroup == null) {
logger.error("The taskGroup is null, taskGroupId: {}", taskInstance.getTaskGroupId());
return null;
}
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
logger.info("The taskGroupQueue's status is release, taskInstanceId: {}", taskInstance.getId());
return null;
}
} while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode()
&& taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
taskGroup.getUseSize(),
thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1);
} catch (Exception e) {
logger.error("release the task group error", e);
return null;
}
logger.info("updateTask:{}", taskInstance.getName());
logger.info("Finished to release task group, taskGroupId: {}", taskInstance.getTaskGroupId());
logger.info("Begin to release task group queue, taskGroupId: {}", taskInstance.getTaskGroupId());
changeTaskGroupQueueStatus(taskInstance.getId(), TaskGroupQueueStatus.RELEASE);
TaskGroupQueue taskGroupQueue;
do {
@ -3052,11 +3065,13 @@ public class ProcessServiceImpl implements ProcessService {
Flag.NO.getCode(),
Flag.NO.getCode());
if (taskGroupQueue == null) {
logger.info("The taskGroupQueue is null, taskGroup: {}", taskGroup.getId());
return null;
}
} while (this.taskGroupQueueMapper.updateInQueueCAS(Flag.NO.getCode(),
Flag.YES.getCode(),
taskGroupQueue.getId()) != 1);
logger.info("Finished to release task group queue: taskGroupId: {}", taskInstance.getTaskGroupId());
return this.taskInstanceMapper.selectById(taskGroupQueue.getTaskId());
}

Loading…
Cancel
Save