From ff67386ce3ab12e6c56ac3fa8b30543a320c604b Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Fri, 22 Jan 2021 10:05:01 +0800 Subject: [PATCH] update --- .../remote/rpc/UserService.java | 2 +- .../dolphinscheduler/remote/rpc/base/Rpc.java | 32 +++- .../remote/rpc/client/ConsumerConfig.java | 29 +++- .../rpc/client/ConsumerConfigCache.java | 19 ++- .../rpc/client/ConsumerInterceptor.java | 28 ++-- .../remote/rpc/client/ConsumerInvoker.java | 17 --- .../remote/rpc/client/IRpcClient.java | 17 +++ .../remote/rpc/client/RpcClient.java | 37 +++-- .../remote/rpc/client/RpcRequestCache.java | 17 +++ .../remote/rpc/client/RpcRequestTable.java | 17 +++ .../rpc/common/AbstractRpcCallBack.java | 22 +++ .../rpc/common/ConsumerConfigConstants.java | 18 ++- .../remote/rpc/common/RpcRequest.java | 19 ++- .../remote/rpc/common/RpcResponse.java | 18 +++ .../remote/rpc/directory/Directory.java | 59 -------- .../remote/rpc/filter/Filter.java | 15 -- .../remote/rpc/filter/FilterChain.java | 38 ----- .../remote/rpc/filter/FilterWrapper.java | 33 ---- .../remote/rpc/filter/LoaderFilters.java | 26 ---- .../remote/rpc/filter/SelectorFilter.java | 60 -------- .../rpc/filter/selector/HostWeight.java | 87 ----------- .../selector/LowerWeightRoundRobin.java | 56 ------- .../filter/selector/RoundRobinSelector.java | 141 ------------------ .../remote/rpc/future/RpcFuture.java | 17 +++ .../remote/rpc/remote/NettyClient.java | 17 +++ .../remote/rpc/remote/NettyClientHandler.java | 17 +++ .../remote/rpc/remote/NettyServer.java | 17 +++ .../remote/rpc/remote/NettyServerHandler.java | 17 +++ .../remote/rpc/selector/AbstractSelector.java | 29 ---- .../remote/rpc/selector/RandomSelector.java | 44 ------ .../remote/rpc/selector/Selector.java | 16 -- .../remote/serialize/ProtoStuffUtils.java | 17 +++ 32 files changed, 347 insertions(+), 651 deletions(-) delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java delete mode 100644 dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java index 57382c4368..a8f3c35adc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java @@ -8,7 +8,7 @@ import org.apache.dolphinscheduler.remote.rpc.base.Rpc; */ public class UserService { - @Rpc(async = true, callback = UserCallback.class, retries = 9999) + @Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999) public Boolean say(String s) { return true; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java index b580aef3dd..7ccee632b9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/Rpc.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.base; import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; @@ -14,10 +31,23 @@ import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) public @interface Rpc { + /** + * number of retries + */ int retries() default 3; boolean async() default false; - Class callback() default AbstractRpcCallBack.class; + boolean ack() default false; + + /** + * When it is asynchronous transmission, callback must be set + */ + Class serviceCallback() default AbstractRpcCallBack.class; + + Class ackCallback() default AbstractRpcCallBack.class; + + + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java index 639e022188..757bb0778b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java @@ -1,10 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import org.apache.dolphinscheduler.remote.rpc.common.AbstractRpcCallBack; import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants; /** - * ConsumerConfig + * We will cache the consumer configuration, when the rpc call is generated, the consumer configuration will be first obtained from here */ public class ConsumerConfig { @@ -16,12 +33,12 @@ public class ConsumerConfig { private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES; - public Class getCallBackClass() { return callBackClass; } - public void setCallBackClass(Class callBackClass) { + //set call back class + void setCallBackClass(Class callBackClass) { this.callBackClass = callBackClass; } @@ -37,15 +54,15 @@ public class ConsumerConfig { return async; } - public void setAsync(Boolean async) { + void setAsync(Boolean async) { this.async = async; } - public Integer getRetries() { + Integer getRetries() { return retries; } - public void setRetries(Integer retries) { + void setRetries(Integer retries) { this.retries = retries; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java index 4cd99a153b..f8d7d570b0 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfigCache.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import java.util.concurrent.ConcurrentHashMap; @@ -13,7 +30,7 @@ public class ConsumerConfigCache { return consumerMap.get(serviceName); } - public static void putConfig(String serviceName, ConsumerConfig consumerConfig) { + static void putConfig(String serviceName, ConsumerConfig consumerConfig) { consumerMap.putIfAbsent(serviceName, consumerConfig); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java index cde981b09f..79bf01718a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; @@ -5,7 +22,6 @@ import org.apache.dolphinscheduler.remote.rpc.Invoker; import org.apache.dolphinscheduler.remote.rpc.base.Rpc; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.filter.FilterChain; import org.apache.dolphinscheduler.remote.rpc.remote.NettyClient; import org.apache.dolphinscheduler.remote.utils.Host; @@ -24,24 +40,18 @@ public class ConsumerInterceptor { private Invoker invoker; - private FilterChain filterChain; - private Host host; private NettyClient nettyClient = NettyClient.getInstance(); - public ConsumerInterceptor(Invoker invoker, Host host) { - this.filterChain = new FilterChain(invoker); - this.invoker = this.filterChain.buildFilterChain(); + public ConsumerInterceptor(Host host) { this.host = host; } - @RuntimeType public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException { RpcRequest request = buildReq(args, method); - String serviceName = method.getDeclaringClass().getName() + method.getName(); ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName); if (null == consumerConfig) { @@ -52,7 +62,7 @@ public class ConsumerInterceptor { int retries = consumerConfig.getRetries(); while (retries-- > 0) { - RpcResponse rsp = (RpcResponse) nettyClient.sendMsg(host, request, async); + RpcResponse rsp = nettyClient.sendMsg(host, request, async); //success if (null != rsp && rsp.getStatus() == 0) { return rsp.getResult(); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java deleted file mode 100644 index cc0e86a2a9..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInvoker.java +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.client; - -import org.apache.dolphinscheduler.remote.rpc.Invoker; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; - -/** - * ConsumerInvoker - */ -public class ConsumerInvoker implements Invoker { - @Override - public RpcResponse invoke(RpcRequest req) throws Throwable { - - System.out.println(req.getRequestId()+"kris"); - return null; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java index abdc5d7289..ac7574ed1a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/IRpcClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import org.apache.dolphinscheduler.remote.utils.Host; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java index 1009750cb9..b8d1202433 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; @@ -12,24 +29,24 @@ import net.bytebuddy.implementation.MethodDelegation; /** * RpcClient */ -public class RpcClient implements IRpcClient{ +public class RpcClient implements IRpcClient { - private ConcurrentHashMap classMap=new ConcurrentHashMap<>(); + private ConcurrentHashMap classMap = new ConcurrentHashMap<>(); @Override - public T create(Class clazz,Host host) throws Exception { - // if(!classMap.containsKey(clazz.getName())){ - T proxy = new ByteBuddy() + public T create(Class clazz, Host host) throws Exception { + // if(!classMap.containsKey(clazz.getName())){ + T proxy = new ByteBuddy() .subclass(clazz) - .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(new ConsumerInvoker(),host))) + .method(isDeclaredBy(clazz)).intercept(MethodDelegation.to(new ConsumerInterceptor(host))) .make() .load(getClass().getClassLoader()) .getLoaded() .getDeclaredConstructor().newInstance(); - // classMap.putIfAbsent(clazz.getName(),proxy); - return proxy; - // } - // return (T) classMap.get(clazz.getName()); + // classMap.putIfAbsent(clazz.getName(),proxy); + return proxy; + // } + // return (T) classMap.get(clazz.getName()); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java index 5e75322ac7..153ceba293 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestCache.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java index 344a6e4a9b..ae31252d9a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/RpcRequestTable.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.client; import java.util.concurrent.ConcurrentHashMap; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java index 3e632c87f6..758b8474bb 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/AbstractRpcCallBack.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.common; /** @@ -5,6 +22,11 @@ package org.apache.dolphinscheduler.remote.rpc.common; */ public abstract class AbstractRpcCallBack { + /** + * When sending an asynchronous message, this method will be called after the response is successfully sent. + * + * @param object response + */ public abstract void run(Object object); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java index 3a0a5542b0..d23307aedf 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ConsumerConfigConstants.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.common; /** @@ -5,7 +22,6 @@ package org.apache.dolphinscheduler.remote.rpc.common; */ public class ConsumerConfigConstants { - public static final Boolean DEFAULT_SYNC = false; public static final Integer DEFAULT_RETRIES = 3; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java index 2cb4b23b7f..7d2480f642 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.common; /** @@ -11,7 +28,7 @@ public class RpcRequest { private Class[] parameterTypes; private Object[] parameters; // 0 hear beat,1 businness msg - private Byte eventType=1; + private Byte eventType = 1; public Byte getEventType() { return eventType; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java index f39889bdaf..0a8c5ef799 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.common; /** @@ -9,6 +26,7 @@ public class RpcResponse { private String msg; private Object result; private Byte status; + private Integer responseType; public String getRequestId() { return requestId; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java deleted file mode 100644 index 7c4907d6dc..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/directory/Directory.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.directory; - -import org.apache.dolphinscheduler.remote.rpc.filter.SelectorFilter; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Directory - */ -public class Directory { - - - private static final Logger logger = LoggerFactory.getLogger(Directory.class); - - - private SelectorFilter selectorFilter = SelectorFilter.getInstance(); - - public static Directory getInstance() { - return Directory.DirectoryInner.INSTANCE; - } - - private static class DirectoryInner { - - private static final Directory INSTANCE = new Directory(); - } - - private Directory() { - } - - - private ConcurrentHashMap> directoryMap = new ConcurrentHashMap<>(); - - public List getDirectory(String serviceName) { - return directoryMap.get(serviceName); - } - - public boolean addServer(String serviceName, String servicePath) { - synchronized (this) { - if (directoryMap.containsKey(serviceName)) { - directoryMap.get(serviceName).add(servicePath); - return true; - } - } - directoryMap.putIfAbsent(serviceName, new ArrayList<>(Collections.singletonList(servicePath))); - return true; - } - - public boolean removeServer(String serviceName, String servicePath) { - - return true; - } - -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java deleted file mode 100644 index 30f1ff8196..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/Filter.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.filter; - -import org.apache.dolphinscheduler.remote.rpc.Invoker; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; - - -import com.amazonaws.Response; - - -public interface Filter { - - - RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable; -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java deleted file mode 100644 index abb0f3a5bf..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterChain.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.filter; - -import org.apache.dolphinscheduler.remote.rpc.Invoker; - -import java.util.List; - -/** - * FilterChain - */ -public class FilterChain { - - - private List filters; - - private Invoker invoker; - - - public FilterChain(List filters, Invoker invoker) { - this.filters = filters; - this.invoker = invoker; - } - - public FilterChain(Invoker invoker) { - this(LoaderFilters.create().getFilters(), invoker); - } - - public Invoker buildFilterChain() { - // 最后一个 - Invoker last = invoker; - - for (int i = filters.size() - 1; i >= 0; i--) { - last = new FilterWrapper(filters.get(i), last); - } - // 第一个 - return last; - - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java deleted file mode 100644 index e390f1a735..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/FilterWrapper.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.filter; - -import org.apache.dolphinscheduler.remote.rpc.Invoker; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.utils.Host; - -/** - * @author jiangli - * @date 2021-01-11 11:48 - */ -public class FilterWrapper implements Invoker { - - - private Filter next; - - private Invoker invoker; - - - public FilterWrapper(Filter next, Invoker invoker) { - this.next = next; - this.invoker = invoker; - } - - @Override - public RpcResponse invoke(RpcRequest args) throws Throwable { - if (next != null) { - return next.filter(invoker, args); - } else { - return invoker.invoke(args); - } - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java deleted file mode 100644 index be80385b04..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/LoaderFilters.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.filter; - -import java.util.ArrayList; -import java.util.List; - -/** - * LoaderFilters - */ -public class LoaderFilters { - - - private List filterList = new ArrayList<>(); - - private LoaderFilters() { - } - - public static LoaderFilters create() { - - return new LoaderFilters(); - } - - public List getFilters() { - filterList.add(SelectorFilter.getInstance()); - return filterList; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java deleted file mode 100644 index 8e8214a59f..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/SelectorFilter.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.filter; - -import org.apache.dolphinscheduler.remote.rpc.Invoker; -import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; -import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; -import org.apache.dolphinscheduler.remote.rpc.directory.Directory; -import org.apache.dolphinscheduler.remote.rpc.selector.RandomSelector; -import org.apache.dolphinscheduler.remote.utils.Host; - - -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * SelectorFilter - */ -public class SelectorFilter implements Filter { - - - private static final Logger logger = LoggerFactory.getLogger(SelectorFilter.class); - - - private SelectorFilter selectorFilter = SelectorFilter.getInstance(); - - public static SelectorFilter getInstance() { - return SelectorFilterInner.INSTANCE; - } - - - private static class SelectorFilterInner { - - private static final SelectorFilter INSTANCE = new SelectorFilter(); - } - - private SelectorFilter() { - } - - @Override - public RpcResponse filter(Invoker invoker, RpcRequest req) throws Throwable { - Directory.getInstance().addServer("default","127.0.0.1:8080"); - Directory.getInstance().addServer("default","127.0.0.2:8080"); - Directory.getInstance().addServer("default","127.0.0.3:8080"); - List hosts = Directory.getInstance().getDirectory("default"); - List candidateHosts = new ArrayList<>(hosts.size()); - hosts.forEach(node -> { - Host nodeHost = Host.of(node); - nodeHost.setWorkGroup("default"); - candidateHosts.add(nodeHost); - }); - RandomSelector randomSelector = new RandomSelector(); - System.out.println(randomSelector.doSelect(candidateHosts)); - RpcResponse rsp = new RpcResponse(); - rsp.setMsg("ms"); - return rsp; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java deleted file mode 100644 index e6352aa8d5..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/HostWeight.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.rpc.filter.selector; - -import org.apache.dolphinscheduler.remote.utils.Constants; -import org.apache.dolphinscheduler.remote.utils.Host; - -/** - * host weight - */ -public class HostWeight { - - private final int CPU_FACTOR = 10; - - private final int MEMORY_FACTOR = 20; - - private final int LOAD_AVERAGE_FACTOR = 70; - - private final Host host; - - private final double weight; - - private double currentWeight; - - public HostWeight(Host host, double cpu, double memory, double loadAverage) { - this.weight = getWeight(cpu, memory, loadAverage, host); - this.host = host; - this.currentWeight = weight; - } - - public double getCurrentWeight() { - return currentWeight; - } - - public double getWeight() { - return weight; - } - - public void setCurrentWeight(double currentWeight) { - this.currentWeight = currentWeight; - } - - public Host getHost() { - return host; - } - - @Override - public String toString() { - return "HostWeight{" - + "host=" + host - + ", weight=" + weight - + ", currentWeight=" + currentWeight - + '}'; - } - - private double getWeight(double cpu, double memory, double loadAverage, Host host) { - double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; - return getWarmUpWeight(host, calculateWeight); - } - - /** - * If the warm-up is not over, add the weight - */ - private double getWarmUpWeight(Host host, double weight) { - long startTime = host.getStartTime(); - long uptime = System.currentTimeMillis() - startTime; - if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { - return weight * Constants.WARM_UP_TIME / uptime; - } - return weight; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java deleted file mode 100644 index 115ca311d0..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/LowerWeightRoundRobin.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.rpc.filter.selector; - - -import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector; - -import java.util.Collection; - -/** - * lower weight round robin - */ -public class LowerWeightRoundRobin extends AbstractSelector { - - /** - * select - * - * @param sources sources - * @return HostWeight - */ - @Override - public HostWeight doSelect(Collection sources) { - double totalWeight = 0; - double lowWeight = 0; - HostWeight lowerNode = null; - for (HostWeight hostWeight : sources) { - totalWeight += hostWeight.getWeight(); - hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); - if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) { - lowerNode = hostWeight; - lowWeight = hostWeight.getCurrentWeight(); - } - } - lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight); - return lowerNode; - - } -} - - - diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java deleted file mode 100644 index 5859b4c96a..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/filter/selector/RoundRobinSelector.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.rpc.filter.selector; - -import org.apache.dolphinscheduler.remote.rpc.selector.AbstractSelector; -import org.apache.dolphinscheduler.remote.utils.Host; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - - -/** - * Smooth Weight Round Robin - */ -public class RoundRobinSelector extends AbstractSelector { - - private ConcurrentMap> workGroupWeightMap = new ConcurrentHashMap<>(); - - private static final int RECYCLE_PERIOD = 100000; - - private AtomicBoolean updateLock = new AtomicBoolean(); - - protected static class WeightedRoundRobin { - private int weight; - private AtomicLong current = new AtomicLong(0); - private long lastUpdate; - - int getWeight() { - return weight; - } - - void setWeight(int weight) { - this.weight = weight; - current.set(0); - } - - long increaseCurrent() { - return current.addAndGet(weight); - } - - void sel(int total) { - current.addAndGet(-1L * total); - } - - long getLastUpdate() { - return lastUpdate; - } - - void setLastUpdate(long lastUpdate) { - this.lastUpdate = lastUpdate; - } - - } - - - @Override - public Host doSelect(Collection source) { - - List hosts = new ArrayList<>(source); - String key = hosts.get(0).getWorkGroup(); - ConcurrentMap map = workGroupWeightMap.get(key); - if (map == null) { - workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>()); - map = workGroupWeightMap.get(key); - } - - int totalWeight = 0; - long maxCurrent = Long.MIN_VALUE; - long now = System.currentTimeMillis(); - Host selectedHost = null; - WeightedRoundRobin selectWeightRoundRobin = null; - - for (Host host : hosts) { - String workGroupHost = host.getWorkGroup() + host.getAddress(); - WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost); - int weight = host.getWeight(); - if (weight < 0) { - weight = 0; - } - - if (weightedRoundRobin == null) { - weightedRoundRobin = new WeightedRoundRobin(); - // set weight - weightedRoundRobin.setWeight(weight); - map.putIfAbsent(workGroupHost, weightedRoundRobin); - weightedRoundRobin = map.get(workGroupHost); - } - if (weight != weightedRoundRobin.getWeight()) { - weightedRoundRobin.setWeight(weight); - } - - long cur = weightedRoundRobin.increaseCurrent(); - weightedRoundRobin.setLastUpdate(now); - if (cur > maxCurrent) { - maxCurrent = cur; - selectedHost = host; - selectWeightRoundRobin = weightedRoundRobin; - } - - totalWeight += weight; - } - - - if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) { - try { - ConcurrentMap newMap = new ConcurrentHashMap<>(map); - newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); - workGroupWeightMap.put(key, newMap); - } finally { - updateLock.set(false); - } - } - - if (selectedHost != null) { - selectWeightRoundRobin.sel(totalWeight); - return selectedHost; - } - - return hosts.get(0); - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java index 2e2b1a734e..60f25294bb 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/future/RpcFuture.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.future; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java index a5f13a8343..0da43ce0dc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.remote; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index d6c9387b5b..da1b3c47f6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.remote; import io.netty.channel.ChannelHandler; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java index 091a697ff3..eca355a483 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.remote; import io.netty.bootstrap.ServerBootstrap; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java index 0b75d37b66..e429af3ce3 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.rpc.remote; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java deleted file mode 100644 index 8e8d68ae39..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/AbstractSelector.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.selector; - -import org.apache.dolphinscheduler.common.utils.CollectionUtils; - -import java.util.Collection; - -/** - * AbstractSelector - */ -public abstract class AbstractSelector implements Selector{ - @Override - public T select(Collection source) { - - if (CollectionUtils.isEmpty(source)) { - throw new IllegalArgumentException("Empty source."); - } - - /** - * if only one , return directly - */ - if (source.size() == 1) { - return (T)source.toArray()[0]; - } - return doSelect(source); - } - - protected abstract T doSelect(Collection source); - -} \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java deleted file mode 100644 index 0a8c1b3666..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/RandomSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.selector; - -import org.apache.dolphinscheduler.remote.utils.Host; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -/** - * RandomSelector - */ -public class RandomSelector extends AbstractSelector { - - @Override - public Host doSelect(final Collection source) { - - List hosts = new ArrayList<>(source); - int size = hosts.size(); - int[] weights = new int[size]; - int totalWeight = 0; - int index = 0; - - for (Host host : hosts) { - totalWeight += host.getWeight(); - weights[index] = host.getWeight(); - index++; - } - - if (totalWeight > 0) { - int offset = ThreadLocalRandom.current().nextInt(totalWeight); - - for (int i = 0; i < size; i++) { - offset -= weights[i]; - if (offset < 0) { - return hosts.get(i); - } - } - } - return hosts.get(ThreadLocalRandom.current().nextInt(size)); - } - -} - diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java deleted file mode 100644 index d511294496..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/selector/Selector.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.dolphinscheduler.remote.rpc.selector; - -import java.util.Collection; - -/** - * Selector - */ -public interface Selector { - - /** - * select - * @param source source - * @return T - */ - T select(Collection source); -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java index 0277273ef5..96ed34d619 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/serialize/ProtoStuffUtils.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.remote.serialize; import io.protostuff.LinkedBuffer;