Browse Source

rpc

pull/3/MERGE
CalvinKirs 4 years ago
parent
commit
fd051530fa
  1. 10
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java
  2. 21
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
  3. 12
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
  4. 4
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java
  5. 1
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java
  6. 19
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java
  7. 59
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java
  8. 87
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java
  9. 56
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java
  10. 141
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java

10
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java

@ -0,0 +1,10 @@
package org.apache.dolphinscheduler.remote.rpc;
/**
* @author jiangli
* @date 2021-01-11 21:05
*/
public interface IUserService {
String say();
}

21
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java

@ -0,0 +1,21 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient;
import org.apache.dolphinscheduler.remote.rpc.client.RpcClient;
/**
* @author jiangli
* @date 2021-01-11 21:06
*/
public class MainTest {
public static void main(String[] args) throws Exception {
RpcClient rpcClient = new RpcClient();
IUserService userService = rpcClient.create(IUserService.class);
for (int i = 0; i < 100; i++) {
userService.say();
}
}
}

12
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java

@ -0,0 +1,12 @@
package org.apache.dolphinscheduler.remote.rpc;
/**
* @author jiangli
* @date 2021-01-11 21:05
*/
public class UserService implements IUserService{
@Override
public String say() {
return null;
}
}

4
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java

@ -4,6 +4,10 @@ import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import com.amazonaws.Response;
public interface Filter { public interface Filter {

1
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java

@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.utils.Host;
/** /**
* @author jiangli * @author jiangli

19
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java

@ -3,6 +3,13 @@ package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.filter.directory.Directory;
import org.apache.dolphinscheduler.remote.rpc.selector.RandomSelector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,6 +41,18 @@ public class SelectorFilter implements Filter {
@Override @Override
public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable { public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable {
Directory.getInstance().addServer("default","127.0.0.1:8080");
Directory.getInstance().addServer("default","127.0.0.2:8080");
Directory.getInstance().addServer("default","127.0.0.3:8080");
List<String> hosts = Directory.getInstance().getDirectory("default");
List<Host> candidateHosts = new ArrayList<>(hosts.size());
hosts.forEach(node -> {
Host nodeHost = Host.of(node);
nodeHost.setWorkGroup("default");
candidateHosts.add(nodeHost);
});
RandomSelector randomSelector = new RandomSelector();
System.out.println(randomSelector.doSelect(candidateHosts));
RpcResponse rsp = new RpcResponse(); RpcResponse rsp = new RpcResponse();
rsp.setMsg("ms"); rsp.setMsg("ms");
return rsp; return rsp;

59
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java

@ -0,0 +1,59 @@
package org.apache.dolphinscheduler.remote.rpc.filter.directory;
import org.apache.dolphinscheduler.remote.rpc.filter.SelectorFilter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Directory
*/
public class Directory {
private static final Logger logger = LoggerFactory.getLogger(Directory.class);
private SelectorFilter selectorFilter = SelectorFilter.getInstance();
public static Directory getInstance() {
return Directory.DirectoryInner.INSTANCE;
}
private static class DirectoryInner {
private static final Directory INSTANCE = new Directory();
}
private Directory() {
}
private ConcurrentHashMap<String, List<String>> directoryMap = new ConcurrentHashMap<>();
public List<String> getDirectory(String serviceName) {
return directoryMap.get(serviceName);
}
public boolean addServer(String serviceName, String servicePath) {
synchronized (this) {
if (directoryMap.containsKey(serviceName)) {
directoryMap.get(serviceName).add(servicePath);
return true;
}
}
directoryMap.putIfAbsent(serviceName, new ArrayList<>(Collections.singletonList(servicePath)));
return true;
}
public boolean removeServer(String serviceName, String servicePath) {
return true;
}
}

87
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* host weight
*/
public class HostWeight {
private final int CPU_FACTOR = 10;
private final int MEMORY_FACTOR = 20;
private final int LOAD_AVERAGE_FACTOR = 70;
private final Host host;
private final double weight;
private double currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = getWeight(cpu, memory, loadAverage, host);
this.host = host;
this.currentWeight = weight;
}
public double getCurrentWeight() {
return currentWeight;
}
public double getWeight() {
return weight;
}
public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight;
}
public Host getHost() {
return host;
}
@Override
public String toString() {
return "HostWeight{"
+ "host=" + host
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ '}';
}
private double getWeight(double cpu, double memory, double loadAverage, Host host) {
double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
return getWarmUpWeight(host, calculateWeight);
}
/**
* If the warm-up is not over, add the weight
*/
private double getWarmUpWeight(Host host, double weight) {
long startTime = host.getStartTime();
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return weight * Constants.WARM_UP_TIME / uptime;
}
return weight;
}
}

56
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector;
import java.util.Collection;
/**
* lower weight round robin
*/
public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
/**
* select
*
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
}

141
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Smooth Weight Round Robin
*/
public class RoundRobinSelector extends AbstractSelector<Host> {
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> workGroupWeightMap = new ConcurrentHashMap<>();
private static final int RECYCLE_PERIOD = 100000;
private AtomicBoolean updateLock = new AtomicBoolean();
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
int getWeight() {
return weight;
}
void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
long increaseCurrent() {
return current.addAndGet(weight);
}
void sel(int total) {
current.addAndGet(-1L * total);
}
long getLastUpdate() {
return lastUpdate;
}
void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
@Override
public Host doSelect(Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = workGroupWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Host selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (Host host : hosts) {
String workGroupHost = host.getWorkGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getWeight();
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// set weight
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(workGroupHost, weightedRoundRobin);
weightedRoundRobin = map.get(workGroupHost);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedHost = host;
selectWeightRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
workGroupWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedHost != null) {
selectWeightRoundRobin.sel(totalWeight);
return selectedHost;
}
return hosts.get(0);
}
}
Loading…
Cancel
Save