Browse Source

refactor worker

pull/2/head
Technoboy- 5 years ago
parent
commit
aeb8b852e7
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  3. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  4. 103
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java
  5. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -54,7 +54,7 @@ import java.util.concurrent.ExecutorService;
* master server * master server
*/ */
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan("org.apache.dolphinscheduler")
public class MasterServer implements IStoppable { public class MasterServer {
/** /**
* logger of MasterServer * logger of MasterServer
@ -142,8 +142,6 @@ public class MasterServer implements IStoppable {
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
zkMasterClient.setStoppable(this);
// master scheduler thread // master scheduler thread
MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread(
zkMasterClient, zkMasterClient,
@ -180,7 +178,7 @@ public class MasterServer implements IStoppable {
zkMasterClient.getAlertDao().sendServerStopedAlert( zkMasterClient.getAlertDao().sendServerStopedAlert(
1, OSUtils.getHost(), "Master-Server"); 1, OSUtils.getHost(), "Master-Server");
} }
stop("shutdownhook"); close("shutdownhook");
} }
})); }));
} }
@ -190,8 +188,7 @@ public class MasterServer implements IStoppable {
* gracefully stop * gracefully stop
* @param cause why stopping * @param cause why stopping
*/ */
@Override public void close(String cause) {
public synchronized void stop(String cause) {
try { try {
//execute only once //execute only once
@ -225,10 +222,10 @@ public class MasterServer implements IStoppable {
try { try {
ThreadPoolExecutors.getInstance().shutdown(); ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){ }catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage()); logger.warn("threadPool service stopped exception:{}",e.getMessage());
} }
logger.info("threadpool service stopped"); logger.info("threadPool service stopped");
try { try {
masterSchedulerService.shutdownNow(); masterSchedulerService.shutdownNow();

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.server.worker; package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -29,9 +28,7 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.ZKWorkerClient;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.TaskQueueFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -47,20 +44,13 @@ import java.util.concurrent.ExecutorService;
* worker server * worker server
*/ */
@ComponentScan("org.apache.dolphinscheduler") @ComponentScan("org.apache.dolphinscheduler")
public class WorkerServer implements IStoppable { public class WorkerServer {
/** /**
* logger * logger
*/ */
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class); private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
* zk worker client
*/
@Autowired
private ZKWorkerClient zkWorkerClient = null;
/** /**
* fetch task executor service * fetch task executor service
@ -130,21 +120,16 @@ public class WorkerServer implements IStoppable {
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
this.workerRegistry.registry(); this.workerRegistry.registry();
this.zkWorkerClient.init();
this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor"); this.fetchTaskExecutorService = ThreadUtils.newDaemonSingleThreadExecutor("Worker-Fetch-Thread-Executor");
zkWorkerClient.setStoppable(this);
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
stop("shutdownHook"); close("shutdownHook");
} }
})); }));
@ -156,8 +141,7 @@ public class WorkerServer implements IStoppable {
} }
} }
@Override public void close(String cause) {
public synchronized void stop(String cause) {
try { try {
//execute only once //execute only once
@ -195,11 +179,6 @@ public class WorkerServer implements IStoppable {
} }
logger.info("worker fetch task service stopped"); logger.info("worker fetch task service stopped");
try{
zkWorkerClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
latch.countDown(); latch.countDown();
logger.info("zookeeper service stopped"); logger.info("zookeeper service stopped");

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -245,10 +245,6 @@ public class ZKMasterClient extends AbstractZKClient {
logger.info("master node added : {}", path); logger.info("master node added : {}", path);
break; break;
case NODE_REMOVED: case NODE_REMOVED:
String serverHost = getHostByEventDataPath(path);
if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
return;
}
removeZKNodePath(path, ZKNodeType.MASTER, true); removeZKNodePath(path, ZKNodeType.MASTER, true);
break; break;
default: default:

103
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

@ -1,103 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* zookeeper worker client
* single instance
*/
@Component
public class ZKWorkerClient extends AbstractZKClient {
/**
* logger
*/
private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class);
/**
* worker znode
*/
private String workerZNode = null;
/**
* init
*/
public void init(){
logger.info("initialize worker client...");
// init system znode
this.initSystemZNode();
}
/**
* handle path events that this class cares about
* @param client zkClient
* @param event path event
* @param path zk path
*/
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
if(path.startsWith(getZNodeParentPath(ZKNodeType.WORKER)+Constants.SINGLE_SLASH)){
handleWorkerEvent(event,path);
}
}
/**
* monitor worker
* @param event event
* @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
case NODE_ADDED:
logger.info("worker node added : {}", path);
break;
case NODE_REMOVED:
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
default:
break;
}
}
/**
* get worker znode
* @return worker zookeeper node
*/
public String getWorkerZNode() {
return workerZNode;
}
}

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -369,23 +369,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
} }
} }
/**
* server self dead, stop all threads
* @param serverHost server host
* @param zkNodeType zookeeper node type
* @return true if server dead and stop all threads
*/
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
if (serverHost.equals(OSUtils.getHost())) {
logger.error("{} server({}) of myself dead , stopping...",
zkNodeType.toString(), serverHost);
stoppable.stop(String.format(" %s server %s of myself dead , stopping...",
zkNodeType.toString(), serverHost));
return true;
}
return false;
}
/** /**
* get host ip, string format: masterParentPath/ip * get host ip, string format: masterParentPath/ip
* @param path path * @param path path

Loading…
Cancel
Save