Browse Source

[Bug-8110][WorkerServer] kill all running tasks before worker stops (#8490)

* kill all running tasks before worker stops

* kill local process

* remove killing of yarn jobs

* adjust the order of close operations

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
caishunfeng 3 years ago committed by GitHub
parent
commit
f2b9796bc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java
  2. 27
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

5
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager {
taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request); taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request);
return taskRequestContextCache.containsKey(request.getTaskInstanceId()); return taskRequestContextCache.containsKey(request.getTaskInstanceId());
} }
/**
 * Returns the task requests currently held in the cache.
 *
 * <p>The result is a read-only wrapper around the cache's value view: callers can
 * inspect and iterate the cached requests, but cannot remove entries from the
 * cache through the returned collection.
 *
 * @return an unmodifiable collection of the cached task requests
 */
public static Collection<TaskRequest> getAllTaskRequestList() {
    // Fully-qualified to avoid requiring an additional import in this file.
    return java.util.Collections.unmodifiableCollection(taskRequestContextCache.values());
}
} }

27
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -37,7 +37,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
import java.util.Set; import java.util.Set;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -201,6 +206,11 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.workerRegistryClient.unRegistry(); this.workerRegistryClient.unRegistry();
this.alertClientService.close(); this.alertClientService.close();
// kill running tasks
this.killAllRunningTasks();
// close the application context
this.springApplicationContext.close(); this.springApplicationContext.close();
} catch (Exception e) { } catch (Exception e) {
logger.error("worker server stop exception ", e); logger.error("worker server stop exception ", e);
@ -211,4 +221,21 @@ public class WorkerServer implements IStoppable {
public void stop(String cause) { public void stop(String cause) {
close(cause); close(cause);
} }
/**
 * Kills every task whose request is currently held in
 * {@link TaskExecutionContextCacheManager}.
 *
 * <p>Each kill is attempted independently: a failure while killing one task is
 * logged and does not prevent the remaining tasks from being killed, nor does it
 * propagate to the caller.
 */
public void killAllRunningTasks() {
    Collection<TaskRequest> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
    logger.info("ready to kill all cache job, job size:{}", taskRequests.size());
    if (CollectionUtils.isEmpty(taskRequests)) {
        return;
    }
    for (TaskRequest taskRequest : taskRequests) {
        try {
            // Every cached request is killed; no per-task state check is done here.
            // NOTE(review): presumably only unfinished tasks remain in the cache - confirm.
            org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest);
        } catch (Exception e) {
            // Isolate failures so one bad task cannot abort the rest of the shutdown.
            logger.error("kill task failed, taskInstanceId:{}", taskRequest.getTaskInstanceId(), e);
        }
    }
}
} }

Loading…
Cancel
Save