Browse Source

[Fix-9222] [master] Support for deploying multiple masters on one node (#9240)

This closes #9222

Co-authored-by: guoshupei <guoshupei@lixiang.com>
3.0.0/version-upgrade
guoshupei 2 years ago committed by GitHub
parent
commit
e2c1cc0579
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  2. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type; import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
@ -124,6 +125,12 @@ public class ServerNodeManager implements InitializingBean {
@Autowired @Autowired
private AlertDao alertDao; private AlertDao alertDao;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
private static volatile int MASTER_SLOT = 0; private static volatile int MASTER_SLOT = 0;
private static volatile int MASTER_SIZE = 0; private static volatile int MASTER_SIZE = 0;
@ -338,18 +345,18 @@ public class ServerNodeManager implements InitializingBean {
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) { private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock(); masterLock.lock();
try { try {
String host = NetUtils.getHost(); String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
this.masterNodes.addAll(nodes); this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear(); this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes); this.masterPriorityQueue.putList(masterNodes);
int index = masterPriorityQueue.getIndex(host); int index = masterPriorityQueue.getIndex(addr);
if (index >= 0) { if (index >= 0) {
MASTER_SIZE = nodes.size(); MASTER_SIZE = nodes.size();
MASTER_SLOT = index; MASTER_SLOT = index;
} else { } else {
logger.warn("current host:{} is not in active master list", host); logger.warn("current addr:{} is not in active master list", addr);
} }
logger.info("update master nodes, master size: {}, slot: {}", MASTER_SIZE, MASTER_SLOT); logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
} finally { } finally {
masterLock.unlock(); masterLock.unlock();
} }

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.queue; package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -83,17 +84,18 @@ public class MasterPriorityQueue implements TaskPriorityQueue<Server> {
int index = 0; int index = 0;
while (iterator.hasNext()) { while (iterator.hasNext()) {
Server server = iterator.next(); Server server = iterator.next();
hostIndexMap.put(server.getHost(), index); String addr = NetUtils.getAddr(server.getHost(), server.getPort());
hostIndexMap.put(addr, index);
index += 1; index += 1;
} }
} }
public int getIndex(String host) { public int getIndex(String addr) {
if (!hostIndexMap.containsKey(host)) { if (!hostIndexMap.containsKey(addr)) {
return -1; return -1;
} }
return hostIndexMap.get(host); return hostIndexMap.get(addr);
} }
/** /**

Loading…
Cancel
Save