@ -21,13 +21,18 @@ package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel ;
import io.netty.channel.Channel ;
import io.netty.channel.ChannelFuture ;
import io.netty.channel.ChannelFuture ;
import io.netty.channel.ChannelFutureListener ;
import io.netty.channel.ChannelFutureListener ;
import org.apache.dolphinscheduler.common.utils.CollectionUtils ;
import org.apache.dolphinscheduler.remote.NettyRemotingClient ;
import org.apache.dolphinscheduler.remote.NettyRemotingClient ;
import org.apache.dolphinscheduler.remote.command.Command ;
import org.apache.dolphinscheduler.remote.command.Command ;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig ;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig ;
import org.apache.dolphinscheduler.remote.utils.Host ;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter ;
import org.slf4j.Logger ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.slf4j.LoggerFactory ;
import org.springframework.beans.factory.annotation.Autowired ;
import org.springframework.stereotype.Service ;
import org.springframework.stereotype.Service ;
import java.util.Set ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentHashMap ;
/ * *
/ * *
@ -43,6 +48,12 @@ public class TaskCallbackService {
* /
* /
private static final ConcurrentHashMap < Integer , NettyRemoteChannel > REMOTE_CHANNELS = new ConcurrentHashMap < > ( ) ;
private static final ConcurrentHashMap < Integer , NettyRemoteChannel > REMOTE_CHANNELS = new ConcurrentHashMap < > ( ) ;
/ * *
* zookeeper register center
* /
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter ;
/ * *
/ * *
* netty remoting client
* netty remoting client
* /
* /
@ -75,11 +86,26 @@ public class TaskCallbackService {
}
}
Channel newChannel = nettyRemotingClient . getChannel ( nettyRemoteChannel . getHost ( ) ) ;
Channel newChannel = nettyRemotingClient . getChannel ( nettyRemoteChannel . getHost ( ) ) ;
if ( newChannel ! = null ) {
if ( newChannel ! = null ) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel ( newChannel , nettyRemoteChannel . getOpaque ( ) ) ;
return getRemoteChannel ( newChannel , nettyRemoteChannel . getOpaque ( ) , taskInstanceId ) ;
addRemoteChannel ( taskInstanceId , remoteChannel ) ;
}
return remoteChannel ;
logger . warn ( "original master : {} is not reachable, random select master" , nettyRemoteChannel . getHost ( ) ) ;
Set < String > masterNodes = zookeeperRegistryCenter . getMasterNodesDirectly ( ) ;
if ( CollectionUtils . isEmpty ( masterNodes ) ) {
throw new IllegalStateException ( "no available master node exception" ) ;
}
for ( String masterNode : masterNodes ) {
newChannel = nettyRemotingClient . getChannel ( Host . of ( masterNode ) ) ;
if ( newChannel ! = null ) {
return getRemoteChannel ( newChannel , nettyRemoteChannel . getOpaque ( ) , taskInstanceId ) ;
}
}
}
return null ;
throw new IllegalStateException ( String . format ( "all available master nodes : %s are not reachable" , masterNodes ) ) ;
}
private NettyRemoteChannel getRemoteChannel ( Channel newChannel , long opaque , int taskInstanceId ) {
NettyRemoteChannel remoteChannel = new NettyRemoteChannel ( newChannel , opaque ) ;
addRemoteChannel ( taskInstanceId , remoteChannel ) ;
return remoteChannel ;
}
}
/ * *
/ * *
@ -97,11 +123,7 @@ public class TaskCallbackService {
* /
* /
public void sendAck ( int taskInstanceId , Command command ) {
public void sendAck ( int taskInstanceId , Command command ) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel ( taskInstanceId ) ;
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel ( taskInstanceId ) ;
if ( nettyRemoteChannel = = null ) {
nettyRemoteChannel . writeAndFlush ( command ) ;
//TODO
} else {
nettyRemoteChannel . writeAndFlush ( command ) ;
}
}
}
/ * *
/ * *
@ -112,19 +134,15 @@ public class TaskCallbackService {
* /
* /
public void sendResult ( int taskInstanceId , Command command ) {
public void sendResult ( int taskInstanceId , Command command ) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel ( taskInstanceId ) ;
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel ( taskInstanceId ) ;
if ( nettyRemoteChannel = = null ) {
nettyRemoteChannel . writeAndFlush ( command ) . addListener ( new ChannelFutureListener ( ) {
//TODO
} else {
@Override
nettyRemoteChannel . writeAndFlush ( command ) . addListener ( new ChannelFutureListener ( ) {
public void operationComplete ( ChannelFuture future ) throws Exception {
if ( future . isSuccess ( ) ) {
@Override
remove ( taskInstanceId ) ;
public void operationComplete ( ChannelFuture future ) throws Exception {
return ;
if ( future . isSuccess ( ) ) {
remove ( taskInstanceId ) ;
return ;
}
}
}
} ) ;
}
}
} ) ;
}
}
}
}