From 45ebd528d6f70df8f23d930a075cc0d7a451946d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=89=91?= <35831367+fengjian1129@users.noreply.github.com> Date: Tue, 6 Sep 2022 10:12:40 +0800 Subject: [PATCH] [Bug-11650][worker] #11650 fix SQL type task, stop task cause NPE (#11668) --- .../worker/processor/TaskKillProcessor.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 3aadb5a5cc..d73cc5b3db 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.server.worker.processor; import com.google.common.base.Preconditions; import com.google.common.base.Strings; + +import io.micrometer.core.lang.NonNull; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; -import lombok.NonNull; -import org.apache.commons.collections4.CollectionUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; @@ -43,6 +43,11 @@ import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable; import org.apache.dolphinscheduler.service.log.LogClientService; + +import org.apache.commons.collections.CollectionUtils; + + + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -78,7 +83,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), - String.format("invalid command type : %s", command.getType())); + String.format("invalid command type : %s", command.getType())); TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); if (killCommand == null) { logger.error("task kill request command is null"); @@ -88,7 +93,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { int taskInstanceId = killCommand.getTaskInstanceId(); TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); + TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); if (taskExecutionContext == null) { logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); return; @@ -110,7 +115,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { Pair> result = doKill(taskExecutionContext); taskExecutionContext.setCurrentExecutionStatus( - result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); + result.getLeft() ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); sendTaskKillResponseCommand(channel, taskExecutionContext); @@ -123,7 +128,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) { TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus()); - taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); + if (taskExecutionContext.getAppIds() != null) { + taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA))); + } taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); @@ -138,14 +145,20 @@ public class TaskKillProcessor implements NettyRequestProcessor { }); } + /** + * do kill + * + * @return kill result + */ private Pair> doKill(TaskExecutionContext taskExecutionContext) { // kill system process boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId()); + // find log and kill yarn job Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), - taskExecutionContext.getLogPath(), - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTenantCode()); + taskExecutionContext.getLogPath(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTenantCode()); return Pair.of(processFlag && yarnResult.getLeft(), yarnResult.getRight()); } @@ -200,8 +213,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * kill yarn job * - * @param host host - * @param logPath logPath + * @param host host + * @param logPath logPath * @param executePath executePath * @param tenantCode tenantCode * @return Pair> yarn kill result @@ -212,7 +225,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { String tenantCode) { if (logPath == null || executePath == null || tenantCode == null) { logger.error("Kill yarn job error, the input params is illegal, host: {}, logPath: {}, executePath: {}, tenantCode: {}", - host, logPath, executePath, tenantCode); + host, logPath, executePath, tenantCode); return Pair.of(false, Collections.emptyList()); } try (LogClientService logClient = new LogClientService()) { @@ -233,4 +246,4 @@ public class TaskKillProcessor implements NettyRequestProcessor { return Pair.of(false, Collections.emptyList()); } -} +} \ No newline at end of file