diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index 3ac7b914a5..e40b7e1274 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -47,6 +47,10 @@ org.slf4j slf4j-api + + net.bytebuddy + byte-buddy + junit junit diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java new file mode 100644 index 0000000000..e93dfa4e66 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/Invoker.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.remote.rpc; + +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; + +/** + * Invoker + */ +public interface Invoker { + + RpcResponse invoke(RpcRequest req) throws Throwable; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java new file mode 100644 index 0000000000..e34db2dd9c --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -0,0 +1,49 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import net.bytebuddy.implementation.bind.annotation.AllArguments; +import net.bytebuddy.implementation.bind.annotation.Origin; +import net.bytebuddy.implementation.bind.annotation.RuntimeType; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain; + +import java.lang.reflect.Method; +import java.util.UUID; + +/** + * ConsumerInterceptor + */ +public class ConsumerInterceptor { + + private Invoker invoker; + + + private FilterChain filterChain; + + public ConsumerInterceptor(Invoker invoker) { + this.filterChain = new FilterChain(invoker); + this.invoker = this.filterChain.buildFilterChain(); + } + + + @RuntimeType + public Object intercept(@AllArguments Object[] args, @Origin Method method) throws Throwable { + RpcRequest request = buildReq(args, method); + //todo + System.out.println(invoker.invoke(request)); + return null; + + } + + private RpcRequest buildReq(Object[] args, Method method) { + RpcRequest request = new RpcRequest(); + request.setRequestId(UUID.randomUUID().toString()); + request.setClassName(method.getDeclaringClass().getName()); + request.setMethodName(method.getName()); + request.setParameterTypes(method.getParameterTypes()); + request.setParameters(args); + return request; + } + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java new file mode 100644 index 0000000000..5ef800ec5d --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java @@ -0,0 +1,18 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +/** + * @author jiangli + * @date 2021-01-09 15:27 + */ +public class ConsumerInvoker implements Invoker { + @Override + public RpcResponse invoke(RpcRequest req) throws Throwable { + + System.out.println(req.getRequestId()+"kris"); + return null; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java new file mode 100644 index 0000000000..0b384cbe62 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java @@ -0,0 +1,12 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +/** + * @author jiangli + * @date 2021-01-09 10:58 + */ +public interface IRpcClient { + + + T create(Class clazz) throws Exception; + +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java new file mode 100644 index 0000000000..83b64fcc31 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java @@ -0,0 +1,33 @@ +package org.apache.dolphinscheduler.remote.rpc.client; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.implementation.MethodDelegation; +import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author jiangli + * @date 2021-01-09 10:59 + */ +public class RpcClient implements IRpcClient{ + + private ConcurrentHashMap classMap=new ConcurrentHashMap<>(); + + + @Override + public T create(Class clazz) throws Exception { + if(!classMap.containsKey(clazz.getName())){ + T proxy = new ByteBuddy() + .subclass(clazz) + .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker()))) + .make() + .load(getClass().getClassLoader()) + .getLoaded() + .getDeclaredConstructor().newInstance(); + + classMap.putIfAbsent(clazz.getName(),proxy); + } + return (T) classMap.get(clazz.getName()); + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java new file mode 100644 index 0000000000..1e0eb2caf2 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java @@ -0,0 +1,54 @@ +package org.apache.dolphinscheduler.remote.rpc.common; + +/** + * @author jiangli + * @date 2021-01-09 13:21 + */ +public class RpcRequest { + + private String requestId; + private String className; + private String methodName; + private Class[] parameterTypes; + private Object[] parameters; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public String getMethodName() { + return methodName; + } + + public void setMethodName(String methodName) { + this.methodName = methodName; + } + + public Class[] getParameterTypes() { + return parameterTypes; + } + + public void setParameterTypes(Class[] parameterTypes) { + this.parameterTypes = parameterTypes; + } + + public Object[] getParameters() { + return parameters; + } + + public void setParameters(Object[] parameters) { + this.parameters = parameters; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java new file mode 100644 index 0000000000..f39889bdaf --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java @@ -0,0 +1,44 @@ +package org.apache.dolphinscheduler.remote.rpc.common; + +/** + * RpcResponse + */ +public class RpcResponse { + + private String requestId; + private String msg; + private Object result; + private Byte status; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public Object getResult() { + return result; + } + + public void setResult(Object result) { + this.result = result; + } + + public Byte getStatus() { + return status; + } + + public void setStatus(Byte status) { + this.status = status; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java new file mode 100644 index 0000000000..11786c0dde --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java @@ -0,0 +1,11 @@ +package org.apache.dolphinscheduler.remote.rpc.filter; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +public interface Filter { + + + RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable; +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java new file mode 100644 index 0000000000..abb0f3a5bf --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java @@ -0,0 +1,38 @@ +package org.apache.dolphinscheduler.remote.rpc.filter; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; + +import java.util.List; + +/** + * FilterChain + */ +public class FilterChain { + + + private List filters; + + private Invoker invoker; + + + public FilterChain(List filters, Invoker invoker) { + this.filters = filters; + this.invoker = invoker; + } + + public FilterChain(Invoker invoker) { + this(LoaderFilters.create().getFilters(), invoker); + } + + public Invoker buildFilterChain() { + // 最后一个 + Invoker last = invoker; + + for (int i = filters.size() - 1; i >= 0; i--) { + last = new FilterWrapper(filters.get(i), last); + } + // 第一个 + return last; + + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java new file mode 100644 index 0000000000..ade79ef2d0 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java @@ -0,0 +1,32 @@ +package org.apache.dolphinscheduler.remote.rpc.filter; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +/** + * @author jiangli + * @date 2021-01-11 11:48 + */ +public class FilterWrapper implements Invoker { + + + private Filter next; + + private Invoker invoker; + + + public FilterWrapper(Filter next, Invoker invoker) { + this.next = next; + this.invoker = invoker; + } + + @Override + public RpcResponse invoke(RpcRequest args) throws Throwable { + if (next != null) { + return next.filter(invoker, args); + } else { + return invoker.invoke(args); + } + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java new file mode 100644 index 0000000000..be80385b04 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java @@ -0,0 +1,26 @@ +package org.apache.dolphinscheduler.remote.rpc.filter; + +import java.util.ArrayList; +import java.util.List; + +/** + * LoaderFilters + */ +public class LoaderFilters { + + + private List filterList = new ArrayList<>(); + + private LoaderFilters() { + } + + public static LoaderFilters create() { + + return new LoaderFilters(); + } + + public List getFilters() { + filterList.add(SelectorFilter.getInstance()); + return filterList; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java new file mode 100644 index 0000000000..5cccedbc40 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java @@ -0,0 +1,41 @@ +package org.apache.dolphinscheduler.remote.rpc.filter; + +import org.apache.dolphinscheduler.remote.rpc.Invoker; +import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; +import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * SelectorFilter + */ +public class SelectorFilter implements Filter { + + + private static final Logger logger = LoggerFactory.getLogger(SelectorFilter.class); + + + private SelectorFilter selectorFilter = SelectorFilter.getInstance(); + + public static SelectorFilter getInstance() { + return SelectorFilterInner.INSTANCE; + } + + + private static class SelectorFilterInner { + + private static final SelectorFilter INSTANCE = new SelectorFilter(); + } + + private SelectorFilter() { + } + + @Override + public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable { + RpcResponse rsp = new RpcResponse(); + rsp.setMsg("ms"); + return rsp; + } +} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java new file mode 100644 index 0000000000..8e8d68ae39 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java @@ -0,0 +1,29 @@ +package org.apache.dolphinscheduler.remote.rpc.selector; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; + +import java.util.Collection; + +/** + * AbstractSelector + */ +public abstract class AbstractSelector implements Selector{ + @Override + public T select(Collection source) { + + if (CollectionUtils.isEmpty(source)) { + throw new IllegalArgumentException("Empty source."); + } + + /** + * if only one , return directly + */ + if (source.size() == 1) { + return (T)source.toArray()[0]; + } + return doSelect(source); + } + + protected abstract T doSelect(Collection source); + +} \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java new file mode 100644 index 0000000000..b48017ebf2 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java @@ -0,0 +1,45 @@ +package org.apache.dolphinscheduler.remote.rpc.selector; + +import org.apache.dolphinscheduler.remote.utils.Host; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * @author jiangli + * @date 2021-01-11 12:00 + */ +public class RandomSelector extends AbstractSelector { + + @Override + public Host doSelect(final Collection source) { + + List hosts = new ArrayList<>(source); + int size = hosts.size(); + int[] weights = new int[size]; + int totalWeight = 0; + int index = 0; + + for (Host host : hosts) { + totalWeight += host.getWeight(); + weights[index] = host.getWeight(); + index++; + } + + if (totalWeight > 0) { + int offset = ThreadLocalRandom.current().nextInt(totalWeight); + + for (int i = 0; i < size; i++) { + offset -= weights[i]; + if (offset < 0) { + return hosts.get(i); + } + } + } + return hosts.get(ThreadLocalRandom.current().nextInt(size)); + } + +} + diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java new file mode 100644 index 0000000000..d511294496 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java @@ -0,0 +1,16 @@ +package org.apache.dolphinscheduler.remote.rpc.selector; + +import java.util.Collection; + +/** + * Selector + */ +public interface Selector { + + /** + * select + * @param source source + * @return T + */ + T select(Collection source); +}