/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SLASH; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import java.util.Date; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.google.common.collect.Sets; /** * worker registry */ @Service public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); /** * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; /** * worker config */ @Autowired private WorkerConfig workerConfig; /** * heartbeat executor */ private ScheduledExecutorService heartBeatExecutor; /** * worker start time */ private String startTime; private Set workerGroups; @PostConstruct public void init() { this.workerGroups = workerConfig.getWorkerGroups(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** * registry */ public void registry() { String address = NetUtils.getHost(); Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); for (String workerZKPath : workerZkPaths) { zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if (newState == ConnectionState.LOST) { logger.error("worker : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("worker : {} reconnected to zookeeper", address); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("worker : {} connection SUSPENDED ", address); } } }); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, this.workerConfig.getWorkerReservedMemory(), this.workerConfig.getWorkerMaxCpuloadAvg(), workerZkPaths, this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); } /** * remove registry info */ public void unRegistry() { String address = getLocalAddress(); Set workerZkPaths = getWorkerZkPaths(); for (String workerZkPath : workerZkPaths) { zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); } this.heartBeatExecutor.shutdownNow(); } /** * get worker path */ private Set getWorkerZkPaths() { Set workerZkPaths = Sets.newHashSet(); String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String weight = getWorkerWeight(); String workerStartTime = COLON + System.currentTimeMillis(); for (String workGroup : this.workerGroups) { StringBuilder workerZkPathBuilder = new StringBuilder(100); workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH); if (StringUtils.isEmpty(workGroup)) { workGroup = DEFAULT_WORKER_GROUP; } // trim and lower case is need workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(address); workerZkPathBuilder.append(weight); workerZkPathBuilder.append(workerStartTime); workerZkPaths.add(workerZkPathBuilder.toString()); } return workerZkPaths; } /** * get local address */ private String getLocalAddress() { return NetUtils.getHost() + COLON + workerConfig.getListenPort(); } /** * get Worker Weight */ private String getWorkerWeight() { return COLON + workerConfig.getWeight(); } }