Browse Source

[Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668)

3.2.0-release
冯剑 2 years ago committed by GitHub
parent
commit
45ebd528d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

39
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@ -43,6 +43,11 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -78,7 +83,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType())); String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) { if (killCommand == null) {
logger.error("task kill request command is null"); logger.error("task kill request command is null");
@ -88,7 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
int taskInstanceId = killCommand.getTaskInstanceId(); int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext = TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) { if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return; return;
@ -110,7 +115,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Pair<Boolean, List<String>> result = doKill(taskExecutionContext); Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus( taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext); sendTaskKillResponseCommand(channel, taskExecutionContext);
@ -123,7 +128,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus()); taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); if (taskExecutionContext.getAppIds() != null) {
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
}
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@ -138,14 +145,20 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}); });
} }
/**
* do kill
*
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) { private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
// kill system process // kill system process
boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId()); boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
// find log and kill yarn job // find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(), taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(), taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode()); taskExecutionContext.getTenantCode());
return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight()); return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
} }
@ -200,8 +213,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/** /**
* kill yarn job * kill yarn job
* *
* @param host host * @param host host
* @param logPath logPath * @param logPath logPath
* @param executePath executePath * @param executePath executePath
* @param tenantCode tenantCode * @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result * @return Pair<Boolean, List < String>> yarn kill result
@ -212,7 +225,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
String tenantCode) { String tenantCode) {
if (logPath == null || executePath == null || tenantCode == null) { if (logPath == null || executePath == null || tenantCode == null) {
logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}", logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
host, logPath, executePath, tenantCode); host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList()); return Pair.of(false, Collections.emptyList());
} }
try (LogClientService logClient = new LogClientService()) { try (LogClientService logClient = new LogClientService()) {
@ -233,4 +246,4 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return Pair.of(false, Collections.emptyList()); return Pair.of(false, Collections.emptyList());
} }
} }
Loading…
Cancel
Save