From d223d654ccfbf5621c4faeed1b7fe45715bf5a6b Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 27 Aug 2023 20:59:55 +0800 Subject: [PATCH] Add rpc benchmark test (#14797) --- .github/CODEOWNERS | 2 +- docs/docs/en/about/glossary.md | 2 +- docs/docs/zh/about/glossary.md | 2 +- .../api/aspect/CacheEvictAspect.java | 2 +- .../pause/pause/PauseExecuteFunction.java | 2 +- .../instance/stop/StopExecuteFunction.java | 4 +- .../api/service/impl/ExecutorServiceImpl.java | 8 +- .../api/service/impl/LoggerServiceImpl.java | 8 +- .../impl/MetricsCleanUpServiceImpl.java | 2 +- .../service/impl/TaskInstanceServiceImpl.java | 8 +- .../JdkDynamicRpcClientProxyFactory.java | 36 +++++-- ...gletonJdkDynamicRpcClientProxyFactory.java | 14 +-- .../base/server/JdkDynamicServerHandler.java | 2 +- ...onJdkDynamicRpcClientProxyFactoryTest.java | 94 +++++++++++++++++++ .../master/event/TaskDelayEventHandler.java | 2 +- .../master/event/TaskResultEventHandler.java | 2 +- .../master/event/TaskRunningEventHandler.java | 2 +- .../event/TaskUpdatePidEventHandler.java | 2 +- .../runner/StreamTaskExecuteRunnable.java | 2 +- .../runner/WorkflowExecuteRunnable.java | 10 +- .../runner/WorkflowExecuteThreadPool.java | 2 +- .../dispatcher/MasterTaskDispatcher.java | 2 +- .../dispatcher/WorkerTaskDispatcher.java | 2 +- ...TaskInstanceExecuteRunningEventSender.java | 2 +- ...askInstanceExecutionFinishEventSender.java | 2 +- .../LogicTaskExecuteRunnableKillOperator.java | 2 +- ...LogicTaskExecuteRunnablePauseOperator.java | 2 +- ...gicTaskExecuteRunnableTimeoutOperator.java | 2 +- .../TaskExecuteRunnableKillOperator.java | 2 +- .../TaskExecuteRunnablePauseOperator.java | 2 +- .../TaskExecuteRunnableTimeoutOperator.java | 2 +- .../runner/task/dynamic/DynamicLogicTask.java | 2 +- .../subworkflow/SubWorkflowLogicTask.java | 2 +- .../master/service/WorkerFailoverService.java | 2 +- dolphinscheduler-microbench/pom.xml | 5 + .../microbench/rpc/IService.java | 29 ++++++ .../microbench/rpc/IServiceImpl.java | 27 ++++++ .../microbench/rpc/RpcBenchMarkTest.java | 76 +++++++++++++++ .../service/process/ProcessServiceImpl.java | 8 +- ...askInstanceExecutionFinishEventSender.java | 2 +- ...nstanceExecutionInfoUpdateEventSender.java | 2 +- ...skInstanceExecutionRunningEventSender.java | 2 +- .../runner/WorkerTaskExecuteRunnable.java | 2 +- 43 files changed, 317 insertions(+), 70 deletions(-) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java create mode 100644 dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java create mode 100644 dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java create mode 100644 dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3462cff0fa..b82253d7aa 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -33,7 +33,7 @@ /dolphinscheduler-master/ @caishunfeng @SbloodyS @ruanwenjun /dolphinscheduler-worker/ @caishunfeng @SbloodyS @ruanwenjun /dolphinscheduler-service/ @caishunfeng -/dolphinscheduler-remote/ @caishunfeng +/dolphinscheduler-extract/ @caishunfeng @ruanwenjun /dolphinscheduler-spi/ @caishunfeng /dolphinscheduler-task-plugin/ @caishunfeng @SbloodyS @zhuangchong /dolphinscheduler-tools/ @caishunfeng @SbloodyS @zhongjiajie @EricGao888 diff --git a/docs/docs/en/about/glossary.md b/docs/docs/en/about/glossary.md index 29f0da5a5a..e3cee76f14 100644 --- a/docs/docs/en/about/glossary.md +++ b/docs/docs/en/about/glossary.md @@ -61,7 +61,7 @@ process fails and ends - dolphinscheduler-dao provides operations such as database access. -- dolphinscheduler-remote client and server based on netty +- dolphinscheduler-extract dolphinscheduler extract module, providing master/worker/alert sdk. - dolphinscheduler-service service module, including Quartz, Zookeeper, log client access service, easy to call server module and api module diff --git a/docs/docs/zh/about/glossary.md b/docs/docs/zh/about/glossary.md index 2358b2c5d1..2b5876669e 100644 --- a/docs/docs/zh/about/glossary.md +++ b/docs/docs/zh/about/glossary.md @@ -45,7 +45,7 @@ - dolphinscheduler-dao 提供数据库访问等操作。 -- dolphinscheduler-remote 基于 netty 的客户端、服务端 +- dolphinscheduler-extract extract模块,包含master/worker/alert的sdk - dolphinscheduler-service service模块,包含Quartz、Zookeeper、日志客户端访问服务,便于server模块和api模块调用 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java index cf6df2823a..53dff28aff 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/CacheEvictAspect.java @@ -147,7 +147,7 @@ public class CacheEvictAspect { return; } for (Server server : serverList) { - IMasterCacheService masterCacheService = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + IMasterCacheService masterCacheService = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(server.getHost() + ":" + server.getPort(), IMasterCacheService.class); masterCacheService.cacheExpire(new CacheExpireRequest(cacheType, cacheKey)); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java index 5df321628a..e8d6de590d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/instance/pause/pause/PauseExecuteFunction.java @@ -58,7 +58,7 @@ public class PauseExecuteFunction implements ExecuteFunction> proxyClientCache = new ConcurrentHashMap<>(); + private static final LoadingCache> proxyClientCache = CacheBuilder.newBuilder() + // expire here to remove dead host + .expireAfterAccess(Duration.ofHours(1)) + .build(new CacheLoader>() { + + @Override + public Map load(String key) { + return new ConcurrentHashMap<>(); + } + }); public JdkDynamicRpcClientProxyFactory(NettyRemotingClient nettyRemotingClient) { this.nettyRemotingClient = nettyRemotingClient; } + @SneakyThrows @SuppressWarnings("unchecked") @Override public T getProxyClient(String serverHost, Class clientInterface) { - return (T) proxyClientCache - .computeIfAbsent(serverHost, key -> new ConcurrentHashMap<>()) - .computeIfAbsent(clientInterface.getName(), - key -> Proxy.newProxyInstance( - clientInterface.getClassLoader(), new Class[]{clientInterface}, - new ClientInvocationHandler(Host.of(serverHost), nettyRemotingClient))); + return (T) proxyClientCache.get(serverHost) + .computeIfAbsent(clientInterface.getName(), key -> newProxyClient(serverHost, clientInterface)); + } + + @SuppressWarnings("unchecked") + private T newProxyClient(String serverHost, Class clientInterface) { + return (T) Proxy.newProxyInstance( + clientInterface.getClassLoader(), + new Class[]{clientInterface}, + new ClientInvocationHandler(Host.of(serverHost), nettyRemotingClient)); } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java index 44e0420272..28d82532be 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactory.java @@ -20,17 +20,13 @@ package org.apache.dolphinscheduler.extract.base.client; import org.apache.dolphinscheduler.extract.base.NettyRemotingClientFactory; import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig; -public class SingletonJdkDynamicRpcClientProxyFactory extends JdkDynamicRpcClientProxyFactory { +public class SingletonJdkDynamicRpcClientProxyFactory { - private static final SingletonJdkDynamicRpcClientProxyFactory INSTANCE = - new SingletonJdkDynamicRpcClientProxyFactory(); + private static final JdkDynamicRpcClientProxyFactory INSTANCE = new JdkDynamicRpcClientProxyFactory( + NettyRemotingClientFactory.buildNettyRemotingClient(new NettyClientConfig())); - private SingletonJdkDynamicRpcClientProxyFactory() { - super(NettyRemotingClientFactory.buildNettyRemotingClient(new NettyClientConfig())); - } - - public static SingletonJdkDynamicRpcClientProxyFactory getInstance() { - return INSTANCE; + public static T getProxyClient(String serverAddress, Class clazz) { + return INSTANCE.getProxyClient(serverAddress, clazz); } } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java index 7e5feab05f..a98362209d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java @@ -96,7 +96,7 @@ public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter { StandardRpcRequest standardRpcRequest = JsonSerializer.deserialize(transporter.getBody(), StandardRpcRequest.class); Object[] args; - if (standardRpcRequest.getArgs().length == 0) { + if (standardRpcRequest.getArgs() == null || standardRpcRequest.getArgs().length == 0) { args = null; } else { args = new Object[standardRpcRequest.getArgs().length]; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java new file mode 100644 index 0000000000..3cf9ff1c89 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/client/SingletonJdkDynamicRpcClientProxyFactoryTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.base.client; + +import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; +import org.apache.dolphinscheduler.extract.base.RpcMethod; +import org.apache.dolphinscheduler.extract.base.RpcService; +import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; +import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class SingletonJdkDynamicRpcClientProxyFactoryTest { + + private NettyRemotingServer nettyRemotingServer; + + @BeforeEach + public void setUp() { + nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(12345)); + nettyRemotingServer.start(); + + new SpringServerMethodInvokerDiscovery(nettyRemotingServer) + .postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); + } + + @Test + public void getProxyClient() { + IService proxyClient = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); + Assertions.assertNotNull(proxyClient); + } + + @Test + public void testPing() { + IService proxyClient = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); + String ping = proxyClient.ping("ping"); + Assertions.assertEquals("pong", ping); + } + + @Test + public void testVoid() { + IService proxyClient = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); + Assertions.assertDoesNotThrow(proxyClient::voidMethod); + } + + @AfterEach + public void tearDown() { + nettyRemotingServer.close(); + } + + @RpcService + public interface IService { + + @RpcMethod + String ping(String ping); + + @RpcMethod + void voidMethod(); + } + + public static class IServiceImpl implements IService { + + @Override + public String ping(String ping) { + return "pong"; + } + + @Override + public void voidMethod() { + System.out.println("void method"); + } + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java index 25ed3283ed..f749ac46cd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskDelayEventHandler.java @@ -114,7 +114,7 @@ public class TaskDelayEventHandler implements TaskEventHandler { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck( TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index f331dfd360..854812264f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -116,7 +116,7 @@ public class TaskResultEventHandler implements TaskEventHandler { public void sendAckToWorker(TaskEvent taskEvent) { ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); instanceExecutionEventAckListener.handleTaskInstanceExecutionFinishEventAck( TaskInstanceExecutionFinishEventAck.success(taskEvent.getTaskInstanceId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java index 03300f2ddb..14cb8571d9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRunningEventHandler.java @@ -110,7 +110,7 @@ public class TaskRunningEventHandler implements TaskEventHandler { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); instanceExecutionEventAckListener.handleTaskInstanceExecutionRunningEventAck( TaskInstanceExecutionRunningEventAck.success(taskEvent.getTaskInstanceId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java index f684322875..5be19ddfdb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskUpdatePidEventHandler.java @@ -94,7 +94,7 @@ public class TaskUpdatePidEventHandler implements TaskEventHandler { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); instanceExecutionEventAckListener.handleTaskInstanceExecutionInfoEventAck( TaskInstanceExecutionInfoEventAck.success(taskEvent.getTaskInstanceId())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java index a4b4f083d9..24e524d171 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java @@ -473,7 +473,7 @@ public class StreamTaskExecuteRunnable implements Runnable { private void sendAckToWorker(TaskEvent taskEvent) { // If event handle success, send ack to worker to otherwise the worker will retry this event ITaskInstanceExecutionEventAckListener instanceExecutionEventAckListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskEvent.getWorkerAddress(), ITaskInstanceExecutionEventAckListener.class); if (taskEvent.getEvent() == TaskEventType.RUNNING) { log.error("taskEvent.getChannel() is null, taskEvent:{}", taskEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index b3fc7c4960..9c5e65160f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -60,6 +60,7 @@ import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils; import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; import org.apache.dolphinscheduler.extract.master.transportor.TaskInstanceWakeupRequest; +import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; @@ -466,7 +467,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } else { ProcessInstance processInstance = processService.findProcessInstanceById(nextTaskInstance.getProcessInstanceId()); - ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(processInstance.getHost(), ILogicTaskInstanceOperator.class); taskInstanceOperator.wakeupTaskInstance( new TaskInstanceWakeupRequest(processInstance.getId(), nextTaskInstance.getId())); @@ -1385,10 +1386,9 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return false; } try { - org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator iTaskInstanceOperator = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() - .getProxyClient(taskInstance.getHost(), - org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator.class); + ITaskInstanceOperator iTaskInstanceOperator = + SingletonJdkDynamicRpcClientProxyFactory + .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class); UpdateWorkflowHostResponse updateWorkflowHostResponse = iTaskInstanceOperator.updateWorkflowInstanceHost( new UpdateWorkflowHostRequest(taskInstance.getId(), masterConfig.getMasterAddress())); if (!updateWorkflowHostResponse.isSuccess()) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java index 8fe1a9a683..c0743b5a2c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThreadPool.java @@ -226,7 +226,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor { return; } ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(processInstanceHost, ITaskInstanceExecutionEventListener.class); WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent( diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java index 4a4ca42c58..d30a1e554d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java @@ -51,7 +51,7 @@ public class MasterTaskDispatcher extends BaseTaskDispatcher { protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException { TaskExecutionContext taskExecutionContext = taskExecuteRunnable.getTaskExecutionContext(); try { - ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskExecutionContext.getHost(), ILogicTaskInstanceOperator.class); LogicTaskDispatchResponse logicTaskDispatchResponse = taskInstanceOperator .dispatchLogicTask(new LogicTaskDispatchRequest(taskExecuteRunnable.getTaskExecutionContext())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java index 6760633b9b..36739a1163 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java @@ -56,7 +56,7 @@ public class WorkerTaskDispatcher extends BaseTaskDispatcher { protected void doDispatch(TaskExecuteRunnable taskExecuteRunnable) throws TaskDispatchException { TaskExecutionContext taskExecutionContext = taskExecuteRunnable.getTaskExecutionContext(); try { - ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskExecutionContext.getHost(), ITaskInstanceOperator.class); TaskInstanceDispatchResponse taskInstanceDispatchResponse = taskInstanceOperator .dispatchTask(new TaskInstanceDispatchRequest(taskExecuteRunnable.getTaskExecutionContext())); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java index 1dcca72878..6ea085b31b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecuteRunningEventSender.java @@ -34,7 +34,7 @@ public class LogicTaskInstanceExecuteRunningEventSender @Override public void sendMessage(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstanceExecutionRunningEvent.getHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java index ad5ea37ec6..1949145e86 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/message/LogicTaskInstanceExecutionFinishEventSender.java @@ -32,7 +32,7 @@ public class LogicTaskInstanceExecutionFinishEventSender @Override public void sendMessage(TaskInstanceExecutionFinishEvent message) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(message.getHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(message); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java index 29ef03906d..e7c7b31da1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableKillOperator.java @@ -45,7 +45,7 @@ public class LogicTaskExecuteRunnableKillOperator extends BaseTaskExecuteRunnabl taskInstance.getName()); return; } - final ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + final ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ILogicTaskInstanceOperator.class); final LogicTaskKillRequest logicTaskKillRequest = new LogicTaskKillRequest(taskInstance.getId()); final LogicTaskKillResponse logicTaskKillResponse = taskInstanceOperator.killLogicTask(logicTaskKillRequest); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java index af865f05f6..4c1c77f24a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnablePauseOperator.java @@ -40,7 +40,7 @@ public class LogicTaskExecuteRunnablePauseOperator extends BaseTaskExecuteRunnab taskInstance.getName()); return; } - final ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + final ILogicTaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ILogicTaskInstanceOperator.class); final LogicTaskPauseRequest logicTaskPauseRequest = new LogicTaskPauseRequest(taskInstance.getId()); final LogicTaskPauseResponse logicTaskPauseResponse = diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java index 4cf16bb2c7..949bc940f5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableTimeoutOperator.java @@ -47,7 +47,7 @@ public class LogicTaskExecuteRunnableTimeoutOperator extends BaseTaskExecuteRunn } final ILogicTaskInstanceOperator iLogicTaskInstanceOperator = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ILogicTaskInstanceOperator.class); final LogicTaskKillRequest taskInstanceKillRequest = new LogicTaskKillRequest(taskInstance.getId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java index b80cb79561..dc9915f901 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableKillOperator.java @@ -43,7 +43,7 @@ public class TaskExecuteRunnableKillOperator extends BaseTaskExecuteRunnableKill log.info("TaskInstance {} host is empty, no need to killRemoteTask", taskInstance.getName()); return; } - ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class); TaskInstanceKillRequest taskInstanceKillRequest = new TaskInstanceKillRequest(taskInstance.getId()); TaskInstanceKillResponse taskInstanceKillResponse = taskInstanceOperator.killTask(taskInstanceKillRequest); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java index a0636b8504..39896bb0cb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java @@ -48,7 +48,7 @@ public class TaskExecuteRunnablePauseOperator implements TaskExecuteRunnableOper log.info("The TaskInstance: {} host is null, no need to pauseRemoteTaskInstance", taskInstance.getName()); return; } - final ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + final ITaskInstanceOperator taskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class); final TaskInstancePauseRequest taskInstancePauseRequest = new TaskInstancePauseRequest(taskInstance.getId()); final TaskInstancePauseResponse taskInstancePauseResponse = diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java index 7154f5fa62..9356b24370 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableTimeoutOperator.java @@ -45,7 +45,7 @@ public class TaskExecuteRunnableTimeoutOperator extends BaseTaskExecuteRunnableT return; } - final ITaskInstanceOperator iTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + final ITaskInstanceOperator iTaskInstanceOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), ITaskInstanceOperator.class); final TaskInstanceKillRequest taskInstanceKillRequest = new TaskInstanceKillRequest(taskInstance.getId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java index 25b53c8d6d..3baa10b343 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/dynamic/DynamicLogicTask.java @@ -286,7 +286,7 @@ public class DynamicLogicTask extends BaseAsyncLogicTask { private void sendToSubProcess(TaskExecutionContext taskExecutionContext, ProcessInstance subProcessInstance) { final ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(subProcessInstance.getHost(), ITaskInstanceExecutionEventListener.class); final WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent( taskExecutionContext.getProcessInstanceId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index 8e55e6fc82..1883a27d8b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -136,7 +136,7 @@ public class SubWorkflowLogicTask extends BaseAsyncLogicTask + + org.apache.dolphinscheduler + dolphinscheduler-extract-base + + org.openjdk.jmh jmh-core diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java new file mode 100644 index 0000000000..0a1122aa18 --- /dev/null +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.microbench.rpc; + +import org.apache.dolphinscheduler.extract.base.RpcMethod; +import org.apache.dolphinscheduler.extract.base.RpcService; + +@RpcService +public interface IService { + + @RpcMethod + String ping(String pingRequest); + +} diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java new file mode 100644 index 0000000000..0bd83ecc29 --- /dev/null +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/IServiceImpl.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.microbench.rpc; + +public class IServiceImpl implements IService { + + @Override + public String ping(String pingRequest) { + return "I get " + pingRequest + ", I am Pong!"; + } + +} diff --git a/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java new file mode 100644 index 0000000000..fd423bcda0 --- /dev/null +++ b/dolphinscheduler-microbench/src/main/java/org/apache/dolphinscheduler/microbench/rpc/RpcBenchMarkTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.microbench.rpc; + +import org.apache.dolphinscheduler.extract.base.NettyRemotingServer; +import org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcClientProxyFactory; +import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig; +import org.apache.dolphinscheduler.extract.base.server.SpringServerMethodInvokerDiscovery; +import org.apache.dolphinscheduler.microbench.base.AbstractBaseBenchmark; + +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@Slf4j +@Warmup(iterations = 5, time = 1) +@Measurement(iterations = 10, time = 1) +@State(Scope.Benchmark) +@BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime}) +public class RpcBenchMarkTest extends AbstractBaseBenchmark { + + private NettyRemotingServer nettyRemotingServer; + + private IService iService; + + @Setup + public void before() { + nettyRemotingServer = new NettyRemotingServer(new NettyServerConfig(12345)); + nettyRemotingServer.start(); + SpringServerMethodInvokerDiscovery springServerMethodInvokerDiscovery = + new SpringServerMethodInvokerDiscovery(nettyRemotingServer); + springServerMethodInvokerDiscovery.postProcessAfterInitialization(new IServiceImpl(), "iServiceImpl"); + iService = + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient("localhost:12345", IService.class); + } + + @Benchmark + @BenchmarkMode({Mode.Throughput, Mode.AverageTime, Mode.SampleTime}) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void sendTest(Blackhole bh) { + String pong = iService.ping("ping"); + bh.consume(pong); + } + + @TearDown + public void after() { + nettyRemotingServer.close(); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 82bbef30de..a47fdac8a5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -389,8 +389,8 @@ public class ProcessServiceImpl implements ProcessService { if (update) { try { final ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() - .getProxyClient(info.getHost(), ITaskInstanceExecutionEventListener.class); + SingletonJdkDynamicRpcClientProxyFactory.getProxyClient(info.getHost(), + ITaskInstanceExecutionEventListener.class); final WorkflowInstanceStateChangeEvent workflowInstanceStateChangeEvent = new WorkflowInstanceStateChangeEvent(info.getId(), 0, info.getState(), info.getId(), 0); iTaskInstanceExecutionEventListener @@ -516,11 +516,11 @@ public class ProcessServiceImpl implements ProcessService { continue; } if (TaskUtils.isLogicTask(taskInstance.getTaskType())) { - IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + IMasterLogService masterLogService = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), IMasterLogService.class); masterLogService.removeLogicTaskInstanceLog(taskLogPath); } else { - IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + IWorkerLogService iWorkerLogService = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstance.getHost(), IWorkerLogService.class); iWorkerLogService.removeTaskInstanceLog(taskLogPath); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java index 27a9de73ae..9469ba1a69 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionFinishEventSender.java @@ -33,7 +33,7 @@ public class TaskInstanceExecutionFinishEventSender @Override public void sendEvent(TaskInstanceExecutionFinishEvent taskInstanceExecutionFinishEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstanceExecutionFinishEvent.getHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionFinish(taskInstanceExecutionFinishEvent); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java index e5350a7c0c..4b9e7e76b0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionInfoUpdateEventSender.java @@ -35,7 +35,7 @@ public class TaskInstanceExecutionInfoUpdateEventSender @Override public void sendEvent(TaskInstanceExecutionInfoEvent taskInstanceExecutionInfoEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstanceExecutionInfoEvent.getHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionInfoUpdate(taskInstanceExecutionInfoEvent); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java index 4f40940030..4f64a94002 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskInstanceExecutionRunningEventSender.java @@ -35,7 +35,7 @@ public class TaskInstanceExecutionRunningEventSender @Override public void sendEvent(TaskInstanceExecutionRunningEvent taskInstanceExecutionRunningEvent) { ITaskInstanceExecutionEventListener iTaskInstanceExecutionEventListener = - SingletonJdkDynamicRpcClientProxyFactory.getInstance() + SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(taskInstanceExecutionRunningEvent.getHost(), ITaskInstanceExecutionEventListener.class); iTaskInstanceExecutionEventListener.onTaskInstanceExecutionRunning(taskInstanceExecutionRunningEvent); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index 5fadf27ef0..15141ed747 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -257,7 +257,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { task.getExitStatus() == TaskExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode()); try { - IAlertOperator alertOperator = SingletonJdkDynamicRpcClientProxyFactory.getInstance() + IAlertOperator alertOperator = SingletonJdkDynamicRpcClientProxyFactory .getProxyClient(alertServerAddress.getAddress(), IAlertOperator.class); AlertSendResponse alertSendResponse = alertOperator.sendAlert(alertSendRequest); log.info("Send alert to: {} successfully, response: {}", alertServerAddress, alertSendResponse);