From fd051530fa563917c10d177d04d5766e811ab3dc Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 12 Jan 2021 09:49:56 +0800 Subject: [PATCH] rpc --- .../remote/rpc/IUserService.java | 10 ++ .../dolphinscheduler/remote/rpc/MainTest.java | 21 +++ .../remote/rpc/UserService.java | 12 ++ .../remote/rpc/filter/Filter.java | 4 + .../remote/rpc/filter/FilterWrapper.java | 1 + .../remote/rpc/filter/SelectorFilter.java | 19 +++ .../rpc/filter/directory/Directory.java | 59 ++++++++ .../rpc/filter/selector/HostWeight.java | 87 +++++++++++ .../selector/LowerWeightRoundRobin.java | 56 +++++++ .../filter/selector/RoundRobinSelector.java | 141 ++++++++++++++++++ 10 files changed, 410 insertions(+) create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java create mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java new file mode 100644 index 0000000000..e834995028 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/IUserService.java @@ -0,0 +1,10 @@ +package org.apache.dolphinscheduler.remote.rpc; + +/** + * @author jiangli + * @date 2021-01-11 21:05 + */ +public interface IUserService { + + String say(); +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java new file mode 100644 index 0000000000..3071b4e0b4 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -0,0 +1,21 @@ +package org.apache.dolphinscheduler.remote.rpc; + +import org.apache.dolphinscheduler.remote.rpc.client.IRpcClient; +import org.apache.dolphinscheduler.remote.rpc.client.RpcClient; + +/** + * @author jiangli + * @date 2021-01-11 21:06 + */ +public class MainTest { + + public static void main(String[] args) throws Exception { + + RpcClient rpcClient = new RpcClient(); + IUserService userService = rpcClient.create(IUserService.class); + for (int i = 0; i < 100; i++) { + userService.say(); + } + + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java new file mode 100644 index 0000000000..bf1e6623e3 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.remote.rpc; + +/** + * @author jiangli + * @date 2021-01-11 21:05 + */ +public class UserService implements IUserService{ + @Override + public String say() { + return null; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java index 11786c0dde..30f1ff8196 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java @@ -4,6 +4,10 @@ import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +import com.amazonaws.Response; + + public interface Filter { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java index ade79ef2d0..e390f1a735 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java @@ -3,6 +3,7 @@ package org.apache.dolphinscheduler.remote.rpc.filter; import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.utils.Host; /** * @author jiangli diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java index 5cccedbc40..dea5a0f652 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java @@ -3,6 +3,13 @@ package org.apache.dolphinscheduler.remote.rpc.filter; import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.rpc.filter.directory.Directory; +import org.apache.dolphinscheduler.remote.rpc.selector.RandomSelector; +import org.apache.dolphinscheduler.remote.utils.Host; + +import java.nio.channels.Selector; +import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +41,18 @@ public class SelectorFilter implements Filter { @Override public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable { + Directory.getInstance().addServer("default","127.0.0.1:8080"); + Directory.getInstance().addServer("default","127.0.0.2:8080"); + Directory.getInstance().addServer("default","127.0.0.3:8080"); + List hosts = Directory.getInstance().getDirectory("default"); + List candidateHosts = new ArrayList<>(hosts.size()); + hosts.forEach(node -> { + Host nodeHost = Host.of(node); + nodeHost.setWorkGroup("default"); + candidateHosts.add(nodeHost); + }); + RandomSelector randomSelector = new RandomSelector(); + System.out.println(randomSelector.doSelect(candidateHosts)); RpcResponse rsp = new RpcResponse(); rsp.setMsg("ms"); return rsp; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java new file mode 100644 index 0000000000..c090e0ebb3 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/directory/Directory.java @@ -0,0 +1,59 @@ +package org.apache.dolphinscheduler.remote.rpc.filter.directory; + +import org.apache.dolphinscheduler.remote.rpc.filter.SelectorFilter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Directory + */ +public class Directory { + + + private static final Logger logger = LoggerFactory.getLogger(Directory.class); + + + private SelectorFilter selectorFilter = SelectorFilter.getInstance(); + + public static Directory getInstance() { + return Directory.DirectoryInner.INSTANCE; + } + + private static class DirectoryInner { + + private static final Directory INSTANCE = new Directory(); + } + + private Directory() { + } + + + private ConcurrentHashMap> directoryMap = new ConcurrentHashMap<>(); + + public List getDirectory(String serviceName) { + return directoryMap.get(serviceName); + } + + public boolean addServer(String serviceName, String servicePath) { + synchronized (this) { + if (directoryMap.containsKey(serviceName)) { + directoryMap.get(serviceName).add(servicePath); + return true; + } + } + directoryMap.putIfAbsent(serviceName, new ArrayList<>(Collections.singletonList(servicePath))); + return true; + } + + public boolean removeServer(String serviceName, String servicePath) { + + return true; + } + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java new file mode 100644 index 0000000000..e6352aa8d5 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.rpc.filter.selector; + +import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.Host; + +/** + * host weight + */ +public class HostWeight { + + private final int CPU_FACTOR = 10; + + private final int MEMORY_FACTOR = 20; + + private final int LOAD_AVERAGE_FACTOR = 70; + + private final Host host; + + private final double weight; + + private double currentWeight; + + public HostWeight(Host host, double cpu, double memory, double loadAverage) { + this.weight = getWeight(cpu, memory, loadAverage, host); + this.host = host; + this.currentWeight = weight; + } + + public double getCurrentWeight() { + return currentWeight; + } + + public double getWeight() { + return weight; + } + + public void setCurrentWeight(double currentWeight) { + this.currentWeight = currentWeight; + } + + public Host getHost() { + return host; + } + + @Override + public String toString() { + return "HostWeight{" + + "host=" + host + + ", weight=" + weight + + ", currentWeight=" + currentWeight + + '}'; + } + + private double getWeight(double cpu, double memory, double loadAverage, Host host) { + double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; + return getWarmUpWeight(host, calculateWeight); + } + + /** + * If the warm-up is not over, add the weight + */ + private double getWarmUpWeight(Host host, double weight) { + long startTime = host.getStartTime(); + long uptime = System.currentTimeMillis() - startTime; + if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { + return weight * Constants.WARM_UP_TIME / uptime; + } + return weight; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java new file mode 100644 index 0000000000..115ca311d0 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.rpc.filter.selector; + + +import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector; + +import java.util.Collection; + +/** + * lower weight round robin + */ +public class LowerWeightRoundRobin extends AbstractSelector { + + /** + * select + * + * @param sources sources + * @return HostWeight + */ + @Override + public HostWeight doSelect(Collection sources) { + double totalWeight = 0; + double lowWeight = 0; + HostWeight lowerNode = null; + for (HostWeight hostWeight : sources) { + totalWeight += hostWeight.getWeight(); + hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); + if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { + lowerNode = hostWeight; + lowWeight = hostWeight.getCurrentWeight(); + } + } + lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight); + return lowerNode; + + } +} + + + diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java new file mode 100644 index 0000000000..5859b4c96a --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.rpc.filter.selector; + +import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector; +import org.apache.dolphinscheduler.remote.utils.Host; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * Smooth Weight Round Robin + */ +public class RoundRobinSelector extends AbstractSelector { + + private ConcurrentMap> workGroupWeightMap = new ConcurrentHashMap<>(); + + private static final int RECYCLE_PERIOD = 100000; + + private AtomicBoolean updateLock = new AtomicBoolean(); + + protected static class WeightedRoundRobin { + private int weight; + private AtomicLong current = new AtomicLong(0); + private long lastUpdate; + + int getWeight() { + return weight; + } + + void setWeight(int weight) { + this.weight = weight; + current.set(0); + } + + long increaseCurrent() { + return current.addAndGet(weight); + } + + void sel(int total) { + current.addAndGet(-1L * total); + } + + long getLastUpdate() { + return lastUpdate; + } + + void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + } + + + @Override + public Host doSelect(Collection source) { + + List hosts = new ArrayList<>(source); + String key = hosts.get(0).getWorkGroup(); + ConcurrentMap map = workGroupWeightMap.get(key); + if (map == null) { + workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>()); + map = workGroupWeightMap.get(key); + } + + int totalWeight = 0; + long maxCurrent = Long.MIN_VALUE; + long now = System.currentTimeMillis(); + Host selectedHost = null; + WeightedRoundRobin selectWeightRoundRobin = null; + + for (Host host : hosts) { + String workGroupHost = host.getWorkGroup() + host.getAddress(); + WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost); + int weight = host.getWeight(); + if (weight < 0) { + weight = 0; + } + + if (weightedRoundRobin == null) { + weightedRoundRobin = new WeightedRoundRobin(); + // set weight + weightedRoundRobin.setWeight(weight); + map.putIfAbsent(workGroupHost, weightedRoundRobin); + weightedRoundRobin = map.get(workGroupHost); + } + if (weight != weightedRoundRobin.getWeight()) { + weightedRoundRobin.setWeight(weight); + } + + long cur = weightedRoundRobin.increaseCurrent(); + weightedRoundRobin.setLastUpdate(now); + if (cur > maxCurrent) { + maxCurrent = cur; + selectedHost = host; + selectWeightRoundRobin = weightedRoundRobin; + } + + totalWeight += weight; + } + + + if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) { + try { + ConcurrentMap newMap = new ConcurrentHashMap<>(map); + newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); + workGroupWeightMap.put(key, newMap); + } finally { + updateLock.set(false); + } + } + + if (selectedHost != null) { + selectWeightRoundRobin.sel(totalWeight); + return selectedHost; + } + + return hosts.get(0); + } +}