Browse Source

cherry-pick [Bug-8110][WorkerServer] kill all running task before worker stop (#8508)

2.0.7-release
caishunfeng 3 years ago committed by GitHub
parent
commit
6d0e367f69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  2. 5
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -39,6 +39,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
import javax.annotation.PostConstruct;
@ -141,7 +147,7 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager));
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK, new TaskKillAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor());
@ -198,6 +204,11 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.close();
this.workerRegistryClient.unRegistry();
this.alertClientService.close();
// kill running tasks
this.killAllRunningTasks();
// close the application context
this.springApplicationContext.close();
logger.info("springApplicationContext close");
try {
@ -217,4 +228,21 @@ public class WorkerServer implements IStoppable {
public void stop(String cause) {
    // Stopping is implemented purely by closing the server; the cause is forwarded unchanged.
    this.close(cause);
}
/**
 * Kills every task whose request is still held in {@link TaskExecutionContextCacheManager}.
 *
 * <p>Logs the number of cached task requests and then asks the task-plugin
 * {@code ProcessUtils} to kill each of them. An unchecked failure while killing one task
 * is logged and does not prevent the remaining tasks from being killed.
 */
public void killAllRunningTasks() {
    Collection<TaskRequest> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
    // Guard before dereferencing: the null/empty check must precede any size() call,
    // otherwise a null result would fail in the log statement instead of returning quietly.
    if (CollectionUtils.isEmpty(taskRequests)) {
        logger.info("ready to kill all cache job, job size:{}", 0);
        return;
    }
    logger.info("ready to kill all cache job, job size:{}", taskRequests.size());

    for (TaskRequest taskRequest : taskRequests) {
        try {
            // kill task when it's not finished yet
            // NOTE(review): fully qualified presumably to avoid a clash with another
            // ProcessUtils imported by this file — confirm before shortening.
            org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
        } catch (RuntimeException e) {
            // Best effort during shutdown: keep going so one bad task cannot leave the others running.
            logger.error("kill running task failed", e);
        }
    }
}
}

5
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
return taskRequestContextCache.containsKey(request.getTaskInstanceId());
}
/**
 * Returns all task requests currently held in the task request cache.
 *
 * <p>The result is a read-only view over the cache's values: callers can iterate it,
 * but cannot remove cached entries through it.
 *
 * @return an unmodifiable collection of the cached task requests
 */
public static Collection<TaskRequest> getAllTaskRequestList() {
    // Wrap the values() view so the internal cache cannot be mutated from outside this class.
    return java.util.Collections.unmodifiableCollection(taskRequestContextCache.values());
}
}

Loading…
Cancel
Save