diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 93aac17c66..25a55fd94e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.builder; import static org.apache.dolphinscheduler.common.Constants.SEC_2_MINUTES_TIME_UNIT; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -27,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; /** @@ -60,6 +61,7 @@ public class TaskExecutionContextBuilder { taskExecutionContext.setDelayTime(taskInstance.getDelayTime()); taskExecutionContext.setVarPool(taskInstance.getVarPool()); taskExecutionContext.setDryRun(taskInstance.getDryRun()); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUBMITTED_SUCCESS); return this; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 29da668b9d..a376587aad 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -85,17 +85,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { @Autowired private WorkerManagerThread workerManager; - /** - * Pre-cache task to avoid extreme situations when kill task. There is no such task in the cache - * - * @param taskExecutionContext task - */ - private void setTaskCache(TaskExecutionContext taskExecutionContext) { - TaskExecutionContext preTaskCache = new TaskExecutionContext(); - preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - TaskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache); - } - @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), @@ -118,7 +107,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { return; } - setTaskCache(taskExecutionContext); + // set cache, it will be used when kill task + TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + // todo custom logger taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort())); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 1c8fe55de6..9873f1eaf6 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -86,16 +86,26 @@ public class TaskKillProcessor implements NettyRequestProcessor { } logger.info("task kill command : {}", killCommand); - Pair> result = doKill(killCommand); - - taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); - - TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + int taskInstanceId = killCommand.getTaskInstanceId(); + TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); if (taskExecutionContext == null) { logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId()); return; } + + Integer processId = taskExecutionContext.getProcessId(); + if (processId.equals(0)) { + workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); + return; + } + + Pair> result = doKill(taskExecutionContext); + + taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE); taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight())); @@ -110,21 +120,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { * * @return kill result */ - private Pair> doKill(TaskKillRequestCommand killCommand) { + private Pair> doKill(TaskExecutionContext taskExecutionContext) { boolean processFlag = true; List appIds = Collections.emptyList(); - int taskInstanceId = killCommand.getTaskInstanceId(); - TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); try { - Integer processId = taskExecutionContext.getProcessId(); - if (processId.equals(0)) { - workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); - logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); - return Pair.of(true, appIds); - } - String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()); if (!StringUtils.isEmpty(pidsStr)) { String cmd = String.format("kill -9 %s", pidsStr); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 0dd28e5f04..49ac9bc6d1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -17,26 +17,24 @@ package org.apache.dolphinscheduler.server.worker.runner; -import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; -import org.apache.dolphinscheduler.server.worker.cache.ResponseCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + /** * Manage tasks */ @@ -105,9 +103,7 @@ public class WorkerManagerThread implements Runnable { if (taskExecutionContext == null) { return; } - TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId()); - responseCommand.setStatus(ExecutionStatus.KILL.getCode()); - ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT); + taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.KILL); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); }