分布式调度框架。
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

105 lines
3.7 KiB

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
4 years ago
/**
* worker registry
*/
public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
4 years ago
/**
* zookeeper registry center
*/
private final ZookeeperRegistryCenter zookeeperRegistryCenter;
4 years ago
/**
* port
*/
private final int port;
4 years ago
/**
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.port = port;
}
4 years ago
/**
* registry
*/
public void registry() {
String address = Constants.LOCAL_ADDRESS;
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if(newState == ConnectionState.LOST){
logger.error("worker : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("worker : {} connection SUSPENDED ", address);
}
}
});
logger.info("worker node : {} registry to ZK successfully.", address);
}
4 years ago
/**
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
logger.info("worker node : {} unRegistry to ZK.", address);
}
4 years ago
/**
* get worker path
* @return
*/
private String getWorkerPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address;
return localNodePath;
}
4 years ago
/**
* get local address
* @return
*/
private String getLocalAddress(){
return Constants.LOCAL_ADDRESS + ":" + port;
}
}