Browse Source

[Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668) (#11958)

Co-authored-by: 冯剑 <35831367+fengjian1129@users.noreply.github.com>
3.1.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
f034a09d25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

39
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.micrometer.core.lang.NonNull;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@ -43,6 +43,11 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -78,7 +83,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
@ -88,7 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
@ -110,7 +115,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
@ -123,7 +128,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
if (taskExecutionContext.getAppIds() != null) {
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
}
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@ -138,14 +145,20 @@ public class TaskKillProcessor implements NettyRequestProcessor {
});
}
/**
* do kill
*
* @return kill result
*/
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
// kill system process
boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
// find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTenantCode());
return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight());
}
@ -200,8 +213,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* kill yarn job
*
* @param host host
* @param logPath logPath
* @param host host
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
* @return Pair<Boolean, List < String>> yarn kill result
@ -212,7 +225,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
String tenantCode) {
if (logPath == null || executePath == null || tenantCode == null) {
logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}",
host, logPath, executePath, tenantCode);
host, logPath, executePath, tenantCode);
return Pair.of(false, Collections.emptyList());
}
try (LogClientService logClient = new LogClientService()) {
@ -233,4 +246,4 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return Pair.of(false, Collections.emptyList());
}
}
}
Loading…
Cancel
Save