From 0dab5cb04263bb558531a699f1c03a4d88ca811f Mon Sep 17 00:00:00 2001 From: Tboy Date: Thu, 19 Mar 2020 21:31:09 +0800 Subject: [PATCH] Refactor worker (#2241) * let quartz use the same datasource * move master/worker config from dao.properties to each config add master/worker registry test * move mybatis config from application.properties to SpringConnectionFactory * move mybatis-plus config from application.properties to SpringConnectionFactory * refactor TaskCallbackService --- .../worker/processor/TaskCallbackService.java | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 02d889ba4d..a508177010 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -21,13 +21,18 @@ package org.apache.dolphinscheduler.server.worker.processor; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -43,6 +48,12 @@ public class TaskCallbackService { */ private static final ConcurrentHashMap REMOTE_CHANNELS = new ConcurrentHashMap<>(); + /** + * zookeeper register center + */ + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; + /** * netty remoting client */ @@ -75,11 +86,26 @@ public class TaskCallbackService { } Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); if(newChannel != null){ - NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque()); - addRemoteChannel(taskInstanceId, remoteChannel); - return remoteChannel; + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } + logger.warn("original master : {} is not reachable, random select master", nettyRemoteChannel.getHost()); + Set masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); + if(CollectionUtils.isEmpty(masterNodes)){ + throw new IllegalStateException("no available master node exception"); + } + for(String masterNode : masterNodes){ + newChannel = nettyRemotingClient.getChannel(Host.of(masterNode)); + if(newChannel != null){ + return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId); + } } - return null; + throw new IllegalStateException(String.format("all available master nodes : %s are not reachable", masterNodes)); + } + + private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){ + NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque); + addRemoteChannel(taskInstanceId, remoteChannel); + return remoteChannel; } /** @@ -97,11 +123,7 @@ public class TaskCallbackService { */ public void sendAck(int taskInstanceId, Command command){ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); - if(nettyRemoteChannel == null){ - //TODO - } else{ - nettyRemoteChannel.writeAndFlush(command); - } + nettyRemoteChannel.writeAndFlush(command); } /** @@ -112,19 +134,15 @@ public class TaskCallbackService { */ public void sendResult(int taskInstanceId, Command command){ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); - if(nettyRemoteChannel == null){ - //TODO - } else{ - nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){ - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ - remove(taskInstanceId); - return; - } + nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){ + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()){ + remove(taskInstanceId); + return; } - }); - } + } + }); } }