Browse Source

Refactor worker (#2103)

* refactor kill logic

* refactor ExecutionContext

* refactor worker
pull/2/head
Tboy 4 years ago committed by GitHub
parent
commit
4eebde8355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  3. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  4. 103
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
  5. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService;
* master server
*/
@ComponentScan("org.apache.dolphinscheduler")
public class MasterServer implements IStoppable {
public class MasterServer {
/**
* logger of MasterServer
@ -142,8 +142,6 @@ public class MasterServer implements IStoppable {
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
zkMasterClient.setStoppable(this);
// master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient,
@ -180,7 +178,7 @@ public class MasterServer implements IStoppable {
zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server");
}
stop("shutdownhook");
close("shutdownhook");
}
}));
}
@ -190,8 +188,7 @@ public class MasterServer implements IStoppable {
* gracefully stop
* @param cause why stopping
*/
@Override
public synchronized void stop(String cause) {
public void close(String cause) {
try {
//execute only once
@ -225,10 +222,10 @@ public class MasterServer implements IStoppable {
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
logger.warn("threadPool service stopped exception:{}",e.getMessage());
}
logger.info("threadpool service stopped");
logger.info("threadPool service stopped");
try {
masterSchedulerService.shutdownNow();

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -47,20 +44,13 @@ import java.util.concurrent.ExecutorService;
* worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
public class WorkerServer implements IStoppable {
public class WorkerServer {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* zk worker client
*/
@Autowired
private ZKWorkerClient zkWorkerClient = null;
/**
* fetch task executor service
@ -130,21 +120,16 @@ public class WorkerServer implements IStoppable {
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
this.workerRegistry.registry();
this.zkWorkerClient.init();
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
zkWorkerClient.setStoppable(this);
/**
* register hooks, which are called before the process exits
*/
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
stop("shutdownHook");
close("shutdownHook");
}
}));
@ -156,8 +141,7 @@ public class WorkerServer implements IStoppable {
}
}
@Override
public synchronized void stop(String cause) {
public void close(String cause) {
try {
//execute only once
@ -195,11 +179,6 @@ public class WorkerServer implements IStoppable {
}
logger.info("worker fetch task service stopped");
try{
zkWorkerClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
latch.countDown();
logger.info("zookeeper service stopped");

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -245,10 +245,6 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master node added : {}", path);
break;
case NODE_REMOVED:
String serverHost = getHostByEventDataPath(path);
if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
return;
}
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
default:

103
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* zookeeper worker client
* single instance
*/
@Component
public class ZKWorkerClient extends AbstractZKClient {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class);
/**
* worker znode
*/
private String workerZNode = null;
/**
* init
*/
public void init(){
logger.info("initialize worker client...");
// init system znode
this.initSystemZNode();
}
/**
* handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
handleWorkerEvent(event,path);
}
}
/**
* monitor worker
* @param event event
* @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
}
/**
* get worker znode
* @return worker zookeeper node
*/
public String getWorkerZNode() {
return workerZNode;
}
}

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -369,23 +369,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
}
/**
* server self dead, stop all threads
* @param serverHost server host
* @param zkNodeType zookeeper node type
* @return true if server dead and stop all threads
*/
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
if (serverHost.equals(OSUtils.getHost())) {
logger.error("{} server({}) of myself dead , stopping...",
zkNodeType.toString(), serverHost);
stoppable.stop(String.format(" %s server %s of myself dead , stopping...",
zkNodeType.toString(), serverHost));
return true;
}
return false;
}
/**
* get host ip, string format: masterParentPath/ip
* @param path path

Loading…
Cancel
Save