diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml
index 27f0923017..708fb94a5b 100644
--- a/dolphinscheduler-remote/pom.xml
+++ b/dolphinscheduler-remote/pom.xml
@@ -69,6 +69,16 @@
junit
test
+
+ org.reflections
+ reflections
+ 0.9.11
+
+
+
+ com.google.guava
+ guava
+
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
index 53d56f5c51..4115184f22 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java
@@ -22,15 +22,18 @@ public class MainTest {
Host host = new Host("127.0.0.1", 12636);
IRpcClient rpcClient = new RpcClient();
- UserService userService = rpcClient.create(UserService.class, host);
+ IUserService userService = rpcClient.create(IUserService.class, host);
boolean result = userService.say("calvin");
System.out.println("异步回掉成功" + result);
System.out.println(userService.hi(10));
System.out.println(userService.hi(188888888));
- UserService user = rpcClient.create(UserService.class, host);
+ IUserService user = rpcClient.create(IUserService.class, host);
System.out.println(user.hi(99999));
System.out.println(user.hi(998888888));
+
+ System.out.println(IUserService.class.getSimpleName());
+ System.out.println(UserService.class.getSimpleName());
}
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
index a8f3c35adc..65734074fe 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/UserService.java
@@ -1,18 +1,22 @@
package org.apache.dolphinscheduler.remote.rpc;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
/**
* @author jiangli
* @date 2021-01-11 21:05
*/
-public class UserService {
+@RpcService("IUserService")
+public class UserService implements IUserService{
@Rpc(async = true, serviceCallback = UserCallback.class, retries = 9999)
+ @Override
public Boolean say(String s) {
return true;
}
+ @Override
public String hi(int num) {
return "this world has " + num + "sun";
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java
new file mode 100644
index 0000000000..4f2407413b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/base/RpcService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.base;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RpcService {
+ String value() default "";
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java
index 757bb0778b..897eb688cc 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerConfig.java
@@ -25,7 +25,9 @@ import org.apache.dolphinscheduler.remote.rpc.common.ConsumerConfigConstants;
*/
public class ConsumerConfig {
- private Class extends AbstractRpcCallBack> callBackClass;
+ private Class extends AbstractRpcCallBack> serviceCallBackClass;
+
+ private Class extends AbstractRpcCallBack> ackCallBackClass;
private String serviceName;
@@ -33,13 +35,20 @@ public class ConsumerConfig {
private Integer retries = ConsumerConfigConstants.DEFAULT_RETRIES;
- public Class extends AbstractRpcCallBack> getCallBackClass() {
- return callBackClass;
+ public Class extends AbstractRpcCallBack> getServiceCallBackClass() {
+ return serviceCallBackClass;
+ }
+
+ public void setServiceCallBackClass(Class extends AbstractRpcCallBack> serviceCallBackClass) {
+ this.serviceCallBackClass = serviceCallBackClass;
+ }
+
+ public Class extends AbstractRpcCallBack> getAckCallBackClass() {
+ return ackCallBackClass;
}
- //set call back class
- void setCallBackClass(Class extends AbstractRpcCallBack> callBackClass) {
- this.callBackClass = callBackClass;
+ public void setAckCallBackClass(Class extends AbstractRpcCallBack> ackCallBackClass) {
+ this.ackCallBackClass = ackCallBackClass;
}
public String getServiceName() {
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
index 79bf01718a..2b90c429b3 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java
@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.remote.rpc.client;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.rpc.Invoker;
import org.apache.dolphinscheduler.remote.rpc.base.Rpc;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
@@ -37,9 +36,6 @@ import net.bytebuddy.implementation.bind.annotation.RuntimeType;
*/
public class ConsumerInterceptor {
- private Invoker invoker;
-
-
private Host host;
private NettyClient nettyClient = NettyClient.getInstance();
@@ -52,7 +48,7 @@ public class ConsumerInterceptor {
public Object intercept(@AllArguments Object[] args, @Origin Method method) throws RemotingException {
RpcRequest request = buildReq(args, method);
- String serviceName = method.getDeclaringClass().getName() + method.getName();
+ String serviceName = method.getDeclaringClass().getSimpleName() + method.getName();
ConsumerConfig consumerConfig = ConsumerConfigCache.getConfigByServersName(serviceName);
if (null == consumerConfig) {
consumerConfig = cacheServiceConfig(method, serviceName);
@@ -76,7 +72,7 @@ public class ConsumerInterceptor {
private RpcRequest buildReq(Object[] args, Method method) {
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
- request.setClassName(method.getDeclaringClass().getName());
+ request.setClassName(method.getDeclaringClass().getSimpleName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
@@ -94,7 +90,8 @@ public class ConsumerInterceptor {
if (annotationPresent) {
Rpc rpc = method.getAnnotation(Rpc.class);
consumerConfig.setAsync(rpc.async());
- consumerConfig.setCallBackClass(rpc.callback());
+ consumerConfig.setServiceCallBackClass(rpc.serviceCallback());
+ consumerConfig.setAckCallBackClass(rpc.ackCallback());
consumerConfig.setRetries(rpc.retries());
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java
new file mode 100644
index 0000000000..c2f8a8f1dd
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RequestEventType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.common;
+
+public enum RequestEventType {
+
+ HEARTBEAT((byte)1,"heartbeat"),
+ BUSINESS((byte)2,"business request");
+
+
+ private Byte type;
+
+ private String description;
+
+ RequestEventType(Byte type, String description) {
+ this.type = type;
+ this.description = description;
+ }
+
+ public Byte getType() {
+ return type;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java
new file mode 100644
index 0000000000..4c0d72181c
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/ResponseEventType.java
@@ -0,0 +1,41 @@
+package org.apache.dolphinscheduler.remote.rpc.common;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public enum ResponseEventType {
+
+ ACK((byte) 1, "ack"),
+ BUSINESS_RSP((byte) 2, "business response");
+
+ private Byte type;
+
+ private String description;
+
+ ResponseEventType(Byte type, String description) {
+ this.type = type;
+ this.description = description;
+ }
+
+
+ public Byte getType() {
+ return type;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
index 7d2480f642..3c8732c61f 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcRequest.java
@@ -27,9 +27,13 @@ public class RpcRequest {
private String methodName;
private Class>[] parameterTypes;
private Object[] parameters;
- // 0 hear beat,1 businness msg
+ /**
+ * @see RequestEventType
+ */
private Byte eventType = 1;
+ private Boolean ack;
+
public Byte getEventType() {
return eventType;
}
@@ -77,4 +81,12 @@ public class RpcRequest {
public void setParameters(Object[] parameters) {
this.parameters = parameters;
}
+
+ public Boolean getAck() {
+ return ack;
+ }
+
+ public void setAck(Boolean ack) {
+ this.ack = ack;
+ }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java
index 0a8c5ef799..9e6db6ec11 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/common/RpcResponse.java
@@ -26,7 +26,11 @@ public class RpcResponse {
private String msg;
private Object result;
private Byte status;
- private Integer responseType;
+
+ /**
+ * @see ResponseEventType
+ */
+ private Byte responseType;
public String getRequestId() {
return requestId;
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java
new file mode 100644
index 0000000000..a0fe93c5be
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/config/ServiceBean.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.rpc.config;
+
+import org.apache.dolphinscheduler.remote.rpc.IUserService;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ServiceBean {
+
+ private static final Logger logger = LoggerFactory.getLogger(ServiceBean.class);
+
+ private static Map serviceMap = new HashMap<>();
+
+ private static AtomicBoolean initialized = new AtomicBoolean(false);
+
+ private static synchronized void init() {
+ Reflections f = new Reflections("org/apache/dolphinscheduler/remote/rpc");
+
+
+ List> list = new ArrayList<>(f.getTypesAnnotatedWith(RpcService.class));
+ list.forEach(rpcClass -> {
+ RpcService rpcService = rpcClass.getAnnotation(RpcService.class);
+ serviceMap.put(rpcService.value(), rpcClass);
+ });
+ }
+
+ public static void main(String[] args) {
+ init();
+ }
+
+ public static Class getServiceClass(String className) {
+ if (initialized.get()) {
+ return (Class) serviceMap.get(className);
+ } else {
+ init();
+ }
+ return (Class) serviceMap.get(className);
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
index da1b3c47f6..97d4bb0444 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java
@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
+import java.lang.reflect.InvocationTargetException;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
@@ -75,7 +76,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
new FastThreadLocalThread(() -> {
try {
if (rsp.getStatus() == 0) {
- consumerConfig.getCallBackClass().newInstance().run(rsp.getResult());
+ try {
+ consumerConfig.getServiceCallBackClass().getDeclaredConstructor().newInstance().run(rsp.getResult());
+ } catch (InvocationTargetException | NoSuchMethodException e) {
+ logger.error("rpc call back error",e);
+ }
+
} else {
logger.error("xxxx fail");
}
@@ -108,4 +114,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
logger.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close();
}
+
+ private void executeAsyncHandler(){
+
+ }
}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
index e429af3ce3..f5ab779928 100644
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyServerHandler.java
@@ -17,10 +17,16 @@
package org.apache.dolphinscheduler.remote.rpc.remote;
+import org.apache.dolphinscheduler.remote.rpc.IUserService;
+import org.apache.dolphinscheduler.remote.rpc.base.RpcService;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
+import org.apache.dolphinscheduler.remote.rpc.config.ServiceBean;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,11 +68,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcResponse response = new RpcResponse();
if (req.getEventType() == 0) {
- logger.info("接受心跳消息!...");
+ logger.info("accept heartbeat msg");
return;
}
response.setRequestId(req.getRequestId());
+
response.setStatus((byte) 0);
@@ -79,8 +86,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
Object[] arguments = req.getParameters();
Object result = null;
try {
-
- Class serviceClass = Class.forName(classname);
+ Class serviceClass = ServiceBean.getServiceClass(classname);
Object object = serviceClass.newInstance();
diff --git a/pom.xml b/pom.xml
index 63776bdb03..af4210dce4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1101,5 +1101,6 @@
dolphinscheduler-service
dolphinscheduler-spi
dolphinscheduler-microbench
+ dolphinscheduler-remote-connfig