Browse Source

update

pull/3/MERGE
CalvinKirs 3 years ago
parent
commit
ff67386ce3
  1. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
  2. 32
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java
  3. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java
  4. 19
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java
  5. 28
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
  6. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java
  7. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java
  8. 37
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java
  9. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java
  10. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java
  11. 22
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java
  12. 18
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java
  13. 19
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
  14. 18
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java
  15. 59
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java
  16. 15
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java
  17. 38
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java
  18. 33
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java
  19. 26
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java
  20. 60
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java
  21. 87
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java
  22. 56
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java
  23. 141
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java
  24. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java
  25. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java
  26. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
  27. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java
  28. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
  29. 29
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java
  30. 44
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java
  31. 16
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java
  32. 17
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java

@ -8,7 +8,7 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
*/
public class UserService {
@Rpc(async = true, callback = UserCallback.class, retries = 9999)
@Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999)
public Boolean say(String s) {
return true;
}

32
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.base;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
@ -14,10 +31,23 @@ import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
public @interface Rpc {
/**
* number of retries
*/
int retries() default 3;
boolean async() default false;
Class<? extends AbstractRpcCallBack> callback() default AbstractRpcCallBack.class;
boolean ack() default false;
/**
* When it is asynchronous transmission, callback must be set
*/
Class<? extends AbstractRpcCallBack> serviceCallback() default AbstractRpcCallBack.class;
Class<? extends AbstractRpcCallBack> ackCallback() default AbstractRpcCallBack.class;
}

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java

@ -1,10 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack;
import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants;
/**
* ConsumerConfig
* We will cache the consumer configuration, when the rpc call is generated, the consumer configuration will be first obtained from here
*/
public class ConsumerConfig {
@ -16,12 +33,12 @@ public class ConsumerConfig {
private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES;
public Class<? extends AbstractRpcCallBack> getCallBackClass() {
return callBackClass;
}
public void setCallBackClass(Class<? extends AbstractRpcCallBack> callBackClass) {
//set call back class
void setCallBackClass(Class<? extends AbstractRpcCallBack> callBackClass) {
this.callBackClass = callBackClass;
}
@ -37,15 +54,15 @@ public class ConsumerConfig {
return async;
}
public void setAsync(Boolean async) {
void setAsync(Boolean async) {
this.async = async;
}
public Integer getRetries() {
Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
void setRetries(Integer retries) {
this.retries = retries;
}
}

19
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import java.util.concurrent.ConcurrentHashMap;
@ -13,7 +30,7 @@ public class ConsumerConfigCache {
return consumerMap.get(serviceName);
}
public static void putConfig(String serviceName, ConsumerConfig consumerConfig) {
static void putConfig(String serviceName, ConsumerConfig consumerConfig) {
consumerMap.putIfAbsent(serviceName, consumerConfig);
}
}

28
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
@ -5,7 +22,6 @@ import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain;
import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -24,24 +40,18 @@ public class ConsumerInterceptor {
private Invoker invoker;
private FilterChain filterChain;
private Host host;
private NettyClient nettyClient = NettyClient.getInstance();
public ConsumerInterceptor(Invoker invoker, Host host) {
this.filterChain = new FilterChain(invoker);
this.invoker = this.filterChain.buildFilterChain();
public ConsumerInterceptor(Host host) {
this.host = host;
}
@RuntimeType
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException {
RpcRequest request = buildReq(args, method);
String serviceName = method.getDeclaringClass().getName() + method.getName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (null == consumerConfig) {
@ -52,7 +62,7 @@ public class ConsumerInterceptor {
int retries = consumerConfig.getRetries();
while (retries-- > 0) {
RpcResponse rsp = (RpcResponse) nettyClient.sendMsg(host, request, async);
RpcResponse rsp = nettyClient.sendMsg(host, request, async);
//success
if (null != rsp && rsp.getStatus() == 0) {
return rsp.getResult();

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java

@ -1,17 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
/**
* ConsumerInvoker
*/
public class ConsumerInvoker implements Invoker {
@Override
public RpcResponse invoke(RpcRequest req) throws Throwable {
System.out.println(req.getRequestId()+"kris");
return null;
}
}

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.utils.Host;

37
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
@ -12,24 +29,24 @@ import net.bytebuddy.implementation.MethodDelegation;
/**
* RpcClient
*/
public class RpcClient implements IRpcClient{
public class RpcClient implements IRpcClient {
private ConcurrentHashMap<String,Object> classMap=new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<>();
@Override
public <T> T create(Class<T> clazz,Host host) throws Exception {
// if(!classMap.containsKey(clazz.getName())){
T proxy = new ByteBuddy()
public <T> T create(Class<T> clazz, Host host) throws Exception {
// if(!classMap.containsKey(clazz.getName())){
T proxy = new ByteBuddy()
.subclass(clazz)
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker(),host)))
.method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(host)))
.make()
.load(getClass().getClassLoader())
.getLoaded()
.getDeclaredConstructor().newInstance();
// classMap.putIfAbsent(clazz.getName(),proxy);
return proxy;
// }
// return (T) classMap.get(clazz.getName());
// classMap.putIfAbsent(clazz.getName(),proxy);
return proxy;
// }
// return (T) classMap.get(clazz.getName());
}
}

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.client;
import java.util.concurrent.ConcurrentHashMap;

22
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.common;
/**
@ -5,6 +22,11 @@ package org.apache.dolphinscheduler.remote.rpc.common;
*/
public abstract class AbstractRpcCallBack {
/**
* When sending an asynchronous message, this method will be called after the response is successfully sent.
*
* @param object response
*/
public abstract void run(Object object);
}

18
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.common;
/**
@ -5,7 +22,6 @@ package org.apache.dolphinscheduler.remote.rpc.common;
*/
public class ConsumerConfigConstants {
public static final Boolean DEFAULT_SYNC = false;
public static final Integer DEFAULT_RETRIES = 3;

19
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.common;
/**
@ -11,7 +28,7 @@ public class RpcRequest {
private Class<?>[] parameterTypes;
private Object[] parameters;
// 0 hear beat,1 businness msg
private Byte eventType=1;
private Byte eventType = 1;
public Byte getEventType() {
return eventType;

18
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.common;
/**
@ -9,6 +26,7 @@ public class RpcResponse {
private String msg;
private Object result;
private Byte status;
private Integer responseType;
public String getRequestId() {
return requestId;

59
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java

@ -1,59 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.directory;
import org.apache.dolphinscheduler.remote.rpc.filter.SelectorFilter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Directory
*/
public class Directory {
private static final Logger logger = LoggerFactory.getLogger(Directory.class);
private SelectorFilter selectorFilter = SelectorFilter.getInstance();
public static Directory getInstance() {
return Directory.DirectoryInner.INSTANCE;
}
private static class DirectoryInner {
private static final Directory INSTANCE = new Directory();
}
private Directory() {
}
private ConcurrentHashMap<String, List<String>> directoryMap = new ConcurrentHashMap<>();
public List<String> getDirectory(String serviceName) {
return directoryMap.get(serviceName);
}
public boolean addServer(String serviceName, String servicePath) {
synchronized (this) {
if (directoryMap.containsKey(serviceName)) {
directoryMap.get(serviceName).add(servicePath);
return true;
}
}
directoryMap.putIfAbsent(serviceName, new ArrayList<>(Collections.singletonList(servicePath)));
return true;
}
public boolean removeServer(String serviceName, String servicePath) {
return true;
}
}

15
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java

@ -1,15 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import com.amazonaws.Response;
public interface Filter {
RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable;
}

38
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java

@ -1,38 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import java.util.List;
/**
* FilterChain
*/
public class FilterChain {
private List<Filter> filters;
private Invoker invoker;
public FilterChain(List<Filter> filters, Invoker invoker) {
this.filters = filters;
this.invoker = invoker;
}
public FilterChain(Invoker invoker) {
this(LoaderFilters.create().getFilters(), invoker);
}
public Invoker buildFilterChain() {
// 最后一个
Invoker last = invoker;
for (int i = filters.size() - 1; i >= 0; i--) {
last = new FilterWrapper(filters.get(i), last);
}
// 第一个
return last;
}
}

33
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java

@ -1,33 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* @author jiangli
* @date 2021-01-11 11:48
*/
public class FilterWrapper implements Invoker {
private Filter next;
private Invoker invoker;
public FilterWrapper(Filter next, Invoker invoker) {
this.next = next;
this.invoker = invoker;
}
@Override
public RpcResponse invoke(RpcRequest args) throws Throwable {
if (next != null) {
return next.filter(invoker, args);
} else {
return invoker.invoke(args);
}
}
}

26
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java

@ -1,26 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.filter;
import java.util.ArrayList;
import java.util.List;
/**
* LoaderFilters
*/
public class LoaderFilters {
private List<Filter> filterList = new ArrayList<>();
private LoaderFilters() {
}
public static LoaderFilters create() {
return new LoaderFilters();
}
public List<Filter> getFilters() {
filterList.add(SelectorFilter.getInstance());
return filterList;
}
}

60
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java

@ -1,60 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.filter;
import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.directory.Directory;
import org.apache.dolphinscheduler.remote.rpc.selector.RandomSelector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SelectorFilter
*/
public class SelectorFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(SelectorFilter.class);
private SelectorFilter selectorFilter = SelectorFilter.getInstance();
public static SelectorFilter getInstance() {
return SelectorFilterInner.INSTANCE;
}
private static class SelectorFilterInner {
private static final SelectorFilter INSTANCE = new SelectorFilter();
}
private SelectorFilter() {
}
@Override
public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable {
Directory.getInstance().addServer("default","127.0.0.1:8080");
Directory.getInstance().addServer("default","127.0.0.2:8080");
Directory.getInstance().addServer("default","127.0.0.3:8080");
List<String> hosts = Directory.getInstance().getDirectory("default");
List<Host> candidateHosts = new ArrayList<>(hosts.size());
hosts.forEach(node -> {
Host nodeHost = Host.of(node);
nodeHost.setWorkGroup("default");
candidateHosts.add(nodeHost);
});
RandomSelector randomSelector = new RandomSelector();
System.out.println(randomSelector.doSelect(candidateHosts));
RpcResponse rsp = new RpcResponse();
rsp.setMsg("ms");
return rsp;
}
}

87
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* host weight
*/
public class HostWeight {
private final int CPU_FACTOR = 10;
private final int MEMORY_FACTOR = 20;
private final int LOAD_AVERAGE_FACTOR = 70;
private final Host host;
private final double weight;
private double currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = getWeight(cpu, memory, loadAverage, host);
this.host = host;
this.currentWeight = weight;
}
public double getCurrentWeight() {
return currentWeight;
}
public double getWeight() {
return weight;
}
public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight;
}
public Host getHost() {
return host;
}
@Override
public String toString() {
return "HostWeight{"
+ "host=" + host
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ '}';
}
private double getWeight(double cpu, double memory, double loadAverage, Host host) {
double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
return getWarmUpWeight(host, calculateWeight);
}
/**
* If the warm-up is not over, add the weight
*/
private double getWarmUpWeight(Host host, double weight) {
long startTime = host.getStartTime();
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return weight * Constants.WARM_UP_TIME / uptime;
}
return weight;
}
}

56
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector;
import java.util.Collection;
/**
* lower weight round robin
*/
public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
/**
* select
*
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
}

141
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java

@ -1,141 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.filter.selector;
import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Smooth Weight Round Robin
*/
public class RoundRobinSelector extends AbstractSelector<Host> {
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> workGroupWeightMap = new ConcurrentHashMap<>();
private static final int RECYCLE_PERIOD = 100000;
private AtomicBoolean updateLock = new AtomicBoolean();
protected static class WeightedRoundRobin {
private int weight;
private AtomicLong current = new AtomicLong(0);
private long lastUpdate;
int getWeight() {
return weight;
}
void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
long increaseCurrent() {
return current.addAndGet(weight);
}
void sel(int total) {
current.addAndGet(-1L * total);
}
long getLastUpdate() {
return lastUpdate;
}
void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
@Override
public Host doSelect(Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
map = workGroupWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Host selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (Host host : hosts) {
String workGroupHost = host.getWorkGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getWeight();
if (weight < 0) {
weight = 0;
}
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// set weight
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(workGroupHost, weightedRoundRobin);
weightedRoundRobin = map.get(workGroupHost);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
if (cur > maxCurrent) {
maxCurrent = cur;
selectedHost = host;
selectWeightRoundRobin = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
workGroupWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedHost != null) {
selectWeightRoundRobin.sel(totalWeight);
return selectedHost;
}
return hosts.get(0);
}
}

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.future;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.remote;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.channel.ChannelHandler;

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.remote;
import io.netty.bootstrap.ServerBootstrap;

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.rpc.remote;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;

29
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java

@ -1,29 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.selector;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import java.util.Collection;
/**
* AbstractSelector
*/
public abstract class AbstractSelector<T> implements Selector<T>{
@Override
public T select(Collection<T> source) {
if (CollectionUtils.isEmpty(source)) {
throw new IllegalArgumentException("Empty source.");
}
/**
* if only one , return directly
*/
if (source.size() == 1) {
return (T)source.toArray()[0];
}
return doSelect(source);
}
protected abstract T doSelect(Collection<T> source);
}

44
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java

@ -1,44 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.selector;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* RandomSelector
*/
public class RandomSelector extends AbstractSelector<Host> {
@Override
public Host doSelect(final Collection<Host> source) {
List<Host> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (Host host : hosts) {
totalWeight += host.getWeight();
weights[index] = host.getWeight();
index++;
}
if (totalWeight > 0) {
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < size; i++) {
offset -= weights[i];
if (offset < 0) {
return hosts.get(i);
}
}
}
return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
}

16
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java

@ -1,16 +0,0 @@
package org.apache.dolphinscheduler.remote.rpc.selector;
import java.util.Collection;
/**
* Selector
*/
public interface Selector<T> {
/**
* select
* @param source source
* @return T
*/
T select(Collection<T> source);
}

17
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.serialize;
import io.protostuff.LinkedBuffer;

Loading…
Cancel
Save