qiaozhanwei
5 years ago
123 changed files with 3038 additions and 1285 deletions
@ -1,137 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.api.log; |
||||
|
||||
import io.grpc.ManagedChannel; |
||||
import io.grpc.ManagedChannelBuilder; |
||||
import io.grpc.StatusRuntimeException; |
||||
import org.apache.dolphinscheduler.rpc.*; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* log client |
||||
*/ |
||||
public class LogClient { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LogClient.class); |
||||
|
||||
private final ManagedChannel channel; |
||||
private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub; |
||||
|
||||
/** |
||||
* construct client connecting to HelloWorld server at {@code host:port} |
||||
* |
||||
* @param host host |
||||
* @param port port |
||||
*/ |
||||
public LogClient(String host, int port) { |
||||
this(ManagedChannelBuilder.forAddress(host, port) |
||||
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
|
||||
// needing certificates.
|
||||
.usePlaintext(true)); |
||||
} |
||||
|
||||
/** |
||||
* construct client for accessing RouteGuide server using the existing channel |
||||
* |
||||
*/ |
||||
LogClient(ManagedChannelBuilder<?> channelBuilder) { |
||||
/** |
||||
* set max read size |
||||
*/ |
||||
channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE); |
||||
channel = channelBuilder.build(); |
||||
blockingStub = LogViewServiceGrpc.newBlockingStub(channel); |
||||
} |
||||
|
||||
/** |
||||
* shutdown |
||||
* |
||||
* @throws InterruptedException InterruptedException |
||||
*/ |
||||
public void shutdown() throws InterruptedException { |
||||
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); |
||||
} |
||||
|
||||
/** |
||||
* roll view log |
||||
* |
||||
* @param path path |
||||
* @param skipLineNum skip line number |
||||
* @param limit limit |
||||
* @return log content |
||||
*/ |
||||
public String rollViewLog(String path,int skipLineNum,int limit) { |
||||
logger.info("roll view log : path {},skipLineNum {} ,limit {}", path, skipLineNum, limit); |
||||
LogParameter pathParameter = LogParameter |
||||
.newBuilder() |
||||
.setPath(path) |
||||
.setSkipLineNum(skipLineNum) |
||||
.setLimit(limit) |
||||
.build(); |
||||
RetStrInfo retStrInfo; |
||||
try { |
||||
retStrInfo = blockingStub.rollViewLog(pathParameter); |
||||
return retStrInfo.getMsg(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("roll view log error", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* view log |
||||
* |
||||
* @param path path |
||||
* @return log content |
||||
*/ |
||||
public String viewLog(String path) { |
||||
logger.info("view log path {}",path); |
||||
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build(); |
||||
RetStrInfo retStrInfo; |
||||
try { |
||||
retStrInfo = blockingStub.viewLog(pathParameter); |
||||
return retStrInfo.getMsg(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("view log error", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get log size |
||||
* |
||||
* @param path log path |
||||
* @return log content bytes |
||||
*/ |
||||
public byte[] getLogBytes(String path) { |
||||
logger.info("log path {}",path); |
||||
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build(); |
||||
RetByteInfo retByteInfo; |
||||
try { |
||||
retByteInfo = blockingStub.getLogBytes(pathParameter); |
||||
return retByteInfo.getData().toByteArray(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("log size error", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,44 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<parent> |
||||
<artifactId>dolphinscheduler</artifactId> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<version>1.2.1-SNAPSHOT</version> |
||||
</parent> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
|
||||
<artifactId>dolphinscheduler-remote</artifactId> |
||||
|
||||
<name>dolphinscheduler-remote</name> |
||||
<!-- FIXME change it to the project's website --> |
||||
<url>http://www.example.com</url> |
||||
|
||||
<properties> |
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
||||
<maven.compiler.source>1.7</maven.compiler.source> |
||||
<maven.compiler.target>1.7</maven.compiler.target> |
||||
</properties> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>io.netty</groupId> |
||||
<artifactId>netty-all</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.slf4j</groupId> |
||||
<artifactId>slf4j-api</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>com.alibaba</groupId> |
||||
<artifactId>fastjson</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>junit</groupId> |
||||
<artifactId>junit</artifactId> |
||||
<scope>test</scope> |
||||
</dependency> |
||||
|
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,197 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote; |
||||
|
||||
import io.netty.bootstrap.Bootstrap; |
||||
import io.netty.channel.*; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.SocketChannel; |
||||
import io.netty.channel.socket.nio.NioSocketChannel; |
||||
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; |
||||
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; |
||||
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.Address; |
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ThreadFactory; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
/** |
||||
* remoting netty client |
||||
*/ |
||||
public class NettyRemotingClient { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); |
||||
|
||||
private final Bootstrap bootstrap = new Bootstrap(); |
||||
|
||||
private final NettyEncoder encoder = new NettyEncoder(); |
||||
|
||||
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(); |
||||
|
||||
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); |
||||
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false); |
||||
|
||||
private final NioEventLoopGroup workerGroup; |
||||
|
||||
private final NettyClientHandler clientHandler = new NettyClientHandler(this); |
||||
|
||||
private final NettyClientConfig clientConfig; |
||||
|
||||
public NettyRemotingClient(final NettyClientConfig clientConfig){ |
||||
this.clientConfig = clientConfig; |
||||
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
@Override |
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
this.start(); |
||||
} |
||||
|
||||
private void start(){ |
||||
|
||||
this.bootstrap |
||||
.group(this.workerGroup) |
||||
.channel(NioSocketChannel.class) |
||||
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive()) |
||||
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay()) |
||||
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize()) |
||||
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize()) |
||||
.handler(new ChannelInitializer<SocketChannel>() { |
||||
@Override |
||||
public void initChannel(SocketChannel ch) throws Exception { |
||||
ch.pipeline().addLast( |
||||
new NettyDecoder(), |
||||
clientHandler, |
||||
encoder); |
||||
} |
||||
}); |
||||
//
|
||||
isStarted.compareAndSet(false, true); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { |
||||
registerProcessor(commandType, processor, null); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { |
||||
this.clientHandler.registerProcessor(commandType, processor, executor); |
||||
} |
||||
|
||||
public void send(final Address address, final Command command) throws RemotingException { |
||||
final Channel channel = getChannel(address); |
||||
if (channel == null) { |
||||
throw new RemotingException("network error"); |
||||
} |
||||
try { |
||||
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ |
||||
|
||||
@Override |
||||
public void operationComplete(ChannelFuture future) throws Exception { |
||||
if(future.isSuccess()){ |
||||
logger.info("sent command {} to {}", command, address); |
||||
} else{ |
||||
logger.error("send command {} to {} failed, error {}", command, address, future.cause()); |
||||
} |
||||
} |
||||
}); |
||||
} catch (Exception ex) { |
||||
String msg = String.format("send command %s to address %s encounter error", command, address); |
||||
throw new RemotingException(msg, ex); |
||||
} |
||||
} |
||||
|
||||
public Channel getChannel(Address address) { |
||||
Channel channel = channels.get(address); |
||||
if(channel != null && channel.isActive()){ |
||||
return channel; |
||||
} |
||||
return createChannel(address, true); |
||||
} |
||||
|
||||
public Channel createChannel(Address address, boolean isSync) { |
||||
ChannelFuture future; |
||||
try { |
||||
synchronized (bootstrap){ |
||||
future = bootstrap.connect(new InetSocketAddress(address.getHost(), address.getPort())); |
||||
} |
||||
if(isSync){ |
||||
future.sync(); |
||||
} |
||||
if (future.isSuccess()) { |
||||
Channel channel = future.channel(); |
||||
channels.put(address, channel); |
||||
return channel; |
||||
} |
||||
} catch (Exception ex) { |
||||
logger.info("connect to {} error {}", address, ex); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
public ExecutorService getDefaultExecutor() { |
||||
return defaultExecutor; |
||||
} |
||||
|
||||
public void close() { |
||||
if(isStarted.compareAndSet(true, false)){ |
||||
try { |
||||
closeChannels(); |
||||
if(workerGroup != null){ |
||||
this.workerGroup.shutdownGracefully(); |
||||
} |
||||
if(defaultExecutor != null){ |
||||
defaultExecutor.shutdown(); |
||||
} |
||||
} catch (Exception ex) { |
||||
logger.error("netty client close exception", ex); |
||||
} |
||||
logger.info("netty client closed"); |
||||
} |
||||
} |
||||
|
||||
private void closeChannels(){ |
||||
for (Channel channel : this.channels.values()) { |
||||
channel.close(); |
||||
} |
||||
this.channels.clear(); |
||||
} |
||||
|
||||
public void removeChannel(Address address){ |
||||
Channel channel = this.channels.remove(address); |
||||
if(channel != null){ |
||||
channel.close(); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,165 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote; |
||||
|
||||
import io.netty.bootstrap.ServerBootstrap; |
||||
import io.netty.channel.ChannelFuture; |
||||
import io.netty.channel.ChannelInitializer; |
||||
import io.netty.channel.ChannelOption; |
||||
import io.netty.channel.ChannelPipeline; |
||||
import io.netty.channel.nio.NioEventLoopGroup; |
||||
import io.netty.channel.socket.nio.NioServerSocketChannel; |
||||
import io.netty.channel.socket.nio.NioSocketChannel; |
||||
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; |
||||
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; |
||||
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.ThreadFactory; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
/** |
||||
* remoting netty server |
||||
*/ |
||||
public class NettyRemotingServer { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); |
||||
|
||||
private final ServerBootstrap serverBootstrap = new ServerBootstrap(); |
||||
|
||||
private final NettyEncoder encoder = new NettyEncoder(); |
||||
|
||||
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS); |
||||
|
||||
private final NioEventLoopGroup bossGroup; |
||||
|
||||
private final NioEventLoopGroup workGroup; |
||||
|
||||
private final NettyServerConfig serverConfig; |
||||
|
||||
private final NettyServerHandler serverHandler = new NettyServerHandler(this); |
||||
|
||||
private final AtomicBoolean isStarted = new AtomicBoolean(false); |
||||
|
||||
public NettyRemotingServer(final NettyServerConfig serverConfig){ |
||||
this.serverConfig = serverConfig; |
||||
|
||||
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
|
||||
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() { |
||||
private AtomicInteger threadIndex = new AtomicInteger(0); |
||||
|
||||
public Thread newThread(Runnable r) { |
||||
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet())); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
public void start(){ |
||||
|
||||
if(this.isStarted.get()){ |
||||
return; |
||||
} |
||||
|
||||
this.serverBootstrap |
||||
.group(this.bossGroup, this.workGroup) |
||||
.channel(NioServerSocketChannel.class) |
||||
.option(ChannelOption.SO_REUSEADDR, true) |
||||
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog()) |
||||
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive()) |
||||
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay()) |
||||
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize()) |
||||
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize()) |
||||
.childHandler(new ChannelInitializer<NioSocketChannel>() { |
||||
|
||||
@Override |
||||
protected void initChannel(NioSocketChannel ch) throws Exception { |
||||
initNettyChannel(ch); |
||||
} |
||||
}); |
||||
|
||||
ChannelFuture future; |
||||
try { |
||||
future = serverBootstrap.bind(serverConfig.getListenPort()).sync(); |
||||
} catch (Exception e) { |
||||
logger.error("NettyRemotingServer bind fail {}, exit", e); |
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); |
||||
} |
||||
if (future.isSuccess()) { |
||||
logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort()); |
||||
} else if (future.cause() != null) { |
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause()); |
||||
} else { |
||||
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort())); |
||||
} |
||||
//
|
||||
isStarted.compareAndSet(false, true); |
||||
} |
||||
|
||||
private void initNettyChannel(NioSocketChannel ch) throws Exception{ |
||||
ChannelPipeline pipeline = ch.pipeline(); |
||||
pipeline.addLast("encoder", encoder); |
||||
pipeline.addLast("decoder", new NettyDecoder()); |
||||
pipeline.addLast("handler", serverHandler); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { |
||||
this.registerProcessor(commandType, processor, null); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { |
||||
this.serverHandler.registerProcessor(commandType, processor, executor); |
||||
} |
||||
|
||||
public ExecutorService getDefaultExecutor() { |
||||
return defaultExecutor; |
||||
} |
||||
|
||||
public void close() { |
||||
if(isStarted.compareAndSet(true, false)){ |
||||
try { |
||||
if(bossGroup != null){ |
||||
this.bossGroup.shutdownGracefully(); |
||||
} |
||||
if(workGroup != null){ |
||||
this.workGroup.shutdownGracefully(); |
||||
} |
||||
if(defaultExecutor != null){ |
||||
defaultExecutor.shutdown(); |
||||
} |
||||
} catch (Exception ex) { |
||||
logger.error("netty server close exception", ex); |
||||
} |
||||
logger.info("netty server closed"); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,92 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.codec; |
||||
|
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.handler.codec.ReplayingDecoder; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandHeader; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
|
||||
import java.util.List; |
||||
|
||||
/** |
||||
* netty decoder |
||||
*/ |
||||
public class NettyDecoder extends ReplayingDecoder<NettyDecoder.State> { |
||||
|
||||
public NettyDecoder(){ |
||||
super(State.MAGIC); |
||||
} |
||||
|
||||
private final CommandHeader commandHeader = new CommandHeader(); |
||||
|
||||
@Override |
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { |
||||
switch (state()){ |
||||
case MAGIC: |
||||
checkMagic(in.readByte()); |
||||
checkpoint(State.COMMAND); |
||||
case COMMAND: |
||||
commandHeader.setType(in.readByte()); |
||||
checkpoint(State.OPAQUE); |
||||
case OPAQUE: |
||||
commandHeader.setOpaque(in.readLong()); |
||||
checkpoint(State.BODY_LENGTH); |
||||
case BODY_LENGTH: |
||||
commandHeader.setBodyLength(in.readInt()); |
||||
checkpoint(State.BODY); |
||||
case BODY: |
||||
byte[] body = new byte[commandHeader.getBodyLength()]; |
||||
in.readBytes(body); |
||||
//
|
||||
Command packet = new Command(); |
||||
packet.setType(commandType(commandHeader.getType())); |
||||
packet.setOpaque(commandHeader.getOpaque()); |
||||
packet.setBody(body); |
||||
out.add(packet); |
||||
//
|
||||
checkpoint(State.MAGIC); |
||||
} |
||||
} |
||||
|
||||
private CommandType commandType(byte type){ |
||||
for(CommandType ct : CommandType.values()){ |
||||
if(ct.ordinal() == type){ |
||||
return ct; |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
private void checkMagic(byte magic) { |
||||
if (magic != Command.MAGIC) { |
||||
throw new IllegalArgumentException("illegal packet [magic]" + magic); |
||||
} |
||||
} |
||||
|
||||
enum State{ |
||||
MAGIC, |
||||
COMMAND, |
||||
OPAQUE, |
||||
BODY_LENGTH, |
||||
BODY; |
||||
} |
||||
} |
@ -0,0 +1,44 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.codec; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.channel.ChannelHandler.Sharable; |
||||
import io.netty.channel.ChannelHandlerContext; |
||||
import io.netty.handler.codec.MessageToByteEncoder; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
|
||||
/** |
||||
* netty encoder |
||||
*/ |
||||
@Sharable |
||||
public class NettyEncoder extends MessageToByteEncoder<Command> { |
||||
|
||||
@Override |
||||
protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { |
||||
if(msg == null){ |
||||
throw new Exception("encode msg is null"); |
||||
} |
||||
out.writeByte(Command.MAGIC); |
||||
out.writeByte(msg.getType().ordinal()); |
||||
out.writeLong(msg.getOpaque()); |
||||
out.writeInt(msg.getBody().length); |
||||
out.writeBytes(msg.getBody()); |
||||
} |
||||
|
||||
} |
||||
|
@ -0,0 +1,102 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* receive task log request command and content fill |
||||
* for netty data serializable transfer |
||||
*/ |
||||
public class Command implements Serializable { |
||||
|
||||
private static final long serialVersionUID = 1L; |
||||
|
||||
public static final byte MAGIC = (byte) 0xbabe; |
||||
|
||||
public Command(){ |
||||
} |
||||
|
||||
public Command(long opaque){ |
||||
this.opaque = opaque; |
||||
} |
||||
|
||||
/** |
||||
* comman type |
||||
*/ |
||||
private CommandType type; |
||||
|
||||
/** |
||||
* request unique identification |
||||
*/ |
||||
private long opaque; |
||||
|
||||
private byte[] body; |
||||
|
||||
public CommandType getType() { |
||||
return type; |
||||
} |
||||
|
||||
public void setType(CommandType type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public long getOpaque() { |
||||
return opaque; |
||||
} |
||||
|
||||
public void setOpaque(long opaque) { |
||||
this.opaque = opaque; |
||||
} |
||||
|
||||
public byte[] getBody() { |
||||
return body; |
||||
} |
||||
|
||||
public void setBody(byte[] body) { |
||||
this.body = body; |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
final int prime = 31; |
||||
int result = 1; |
||||
result = prime * result + (int) (opaque ^ (opaque >>> 32)); |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object obj) { |
||||
if (this == obj) { |
||||
return true; |
||||
} |
||||
if (obj == null) { |
||||
return false; |
||||
} |
||||
if (getClass() != obj.getClass()) { |
||||
return false; |
||||
} |
||||
Command other = (Command) obj; |
||||
return opaque == other.opaque; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "Command [type=" + type + ", opaque=" + opaque + ", bodyLen=" + (body == null ? 0 : body.length) + "]"; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,55 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* command header |
||||
*/ |
||||
public class CommandHeader implements Serializable { |
||||
|
||||
private byte type; |
||||
|
||||
private long opaque; |
||||
|
||||
private int bodyLength; |
||||
|
||||
public int getBodyLength() { |
||||
return bodyLength; |
||||
} |
||||
|
||||
public void setBodyLength(int bodyLength) { |
||||
this.bodyLength = bodyLength; |
||||
} |
||||
|
||||
public byte getType() { |
||||
return type; |
||||
} |
||||
|
||||
public void setType(byte type) { |
||||
this.type = type; |
||||
} |
||||
|
||||
public long getOpaque() { |
||||
return opaque; |
||||
} |
||||
|
||||
public void setOpaque(long opaque) { |
||||
this.opaque = opaque; |
||||
} |
||||
} |
@ -0,0 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
public enum CommandType {
/**
* roll view log request
*/
ROLL_VIEW_LOG_REQUEST,
/**
* roll view log response
*/
ROLL_VIEW_LOG_RESPONSE,
/**
* view whole log request
*/
VIEW_WHOLE_LOG_REQUEST,
/**
* view whole log response
*/
VIEW_WHOLE_LOG_RESPONSE,
/**
* get log bytes request
*/
GET_LOG_BYTES_REQUEST,
/**
* get log bytes response
*/
GET_LOG_BYTES_RESPONSE,
WORKER_REQUEST,
MASTER_RESPONSE,
/**
* execute task request
*/
EXECUTE_TASK_REQUEST,
/**
* execute task response
*/
EXECUTE_TASK_RESPONSE,
PING,
PONG;
} |
@ -0,0 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskRequestCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private String applicationName;
private String groupName;
private String taskName;
private int connectorPort;
private String description;
private String className;
private String methodName;
private String params;
private List<Integer> shardItems;
public List<Integer> getShardItems() {
return shardItems;
}
public void setShardItems(List<Integer> shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
} |
@ -0,0 +1 @@
|
||||
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
} |
@ -0,0 +1,57 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.buffer.Unpooled; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
|
||||
public class Ping implements Serializable { |
||||
|
||||
private static final AtomicLong ID = new AtomicLong(1); |
||||
|
||||
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; |
||||
|
||||
private static byte[] EMPTY_BODY_ARRAY = new byte[0]; |
||||
|
||||
private static final ByteBuf PING_BUF; |
||||
|
||||
static { |
||||
ByteBuf ping = Unpooled.buffer(); |
||||
ping.writeByte(Command.MAGIC); |
||||
ping.writeByte(CommandType.PING.ordinal()); |
||||
ping.writeLong(0); |
||||
ping.writeInt(0); |
||||
ping.writeBytes(EMPTY_BODY); |
||||
PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); |
||||
} |
||||
|
||||
public static ByteBuf pingContent(){ |
||||
return PING_BUF.duplicate(); |
||||
} |
||||
|
||||
public static Command create(){ |
||||
Command command = new Command(ID.getAndIncrement()); |
||||
command.setType(CommandType.PING); |
||||
command.setBody(EMPTY_BODY_ARRAY); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,54 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command; |
||||
|
||||
import io.netty.buffer.ByteBuf; |
||||
import io.netty.buffer.Unpooled; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
|
||||
public class Pong implements Serializable { |
||||
|
||||
protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; |
||||
|
||||
private static byte[] EMPTY_BODY_ARRAY = new byte[0]; |
||||
|
||||
private static final ByteBuf PONG_BUF; |
||||
|
||||
static { |
||||
ByteBuf ping = Unpooled.buffer(); |
||||
ping.writeByte(Command.MAGIC); |
||||
ping.writeByte(CommandType.PONG.ordinal()); |
||||
ping.writeLong(0); |
||||
ping.writeInt(0); |
||||
ping.writeBytes(EMPTY_BODY); |
||||
PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); |
||||
} |
||||
|
||||
public static ByteBuf pingContent(){ |
||||
return PONG_BUF.duplicate(); |
||||
} |
||||
|
||||
public static Command create(long opaque){ |
||||
Command command = new Command(opaque); |
||||
command.setType(CommandType.PONG); |
||||
command.setBody(EMPTY_BODY_ARRAY); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,62 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
/** |
||||
* get log bytes request command |
||||
*/ |
||||
public class GetLogBytesRequestCommand implements Serializable { |
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1); |
||||
|
||||
private String path; |
||||
|
||||
public GetLogBytesRequestCommand() { |
||||
} |
||||
|
||||
public GetLogBytesRequestCommand(String path) { |
||||
this.path = path; |
||||
} |
||||
|
||||
public String getPath() { |
||||
return path; |
||||
} |
||||
|
||||
public void setPath(String path) { |
||||
this.path = path; |
||||
} |
||||
|
||||
/** |
||||
* |
||||
* @return |
||||
*/ |
||||
public Command convert2Command(){ |
||||
Command command = new Command(REQUEST.getAndIncrement()); |
||||
command.setType(CommandType.GET_LOG_BYTES_REQUEST); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,56 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* get log bytes response command |
||||
*/ |
||||
public class GetLogBytesResponseCommand implements Serializable { |
||||
|
||||
private byte[] data; |
||||
|
||||
public GetLogBytesResponseCommand() { |
||||
} |
||||
|
||||
public GetLogBytesResponseCommand(byte[] data) { |
||||
this.data = data; |
||||
} |
||||
|
||||
public byte[] getData() { |
||||
return data; |
||||
} |
||||
|
||||
public void setData(byte[] data) { |
||||
this.data = data; |
||||
} |
||||
|
||||
public Command convert2Command(long opaque){ |
||||
Command command = new Command(opaque); |
||||
command.setType(CommandType.GET_LOG_BYTES_RESPONSE); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,80 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
/** |
||||
* roll view log request command |
||||
*/ |
||||
public class RollViewLogRequestCommand implements Serializable { |
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1); |
||||
|
||||
private String path; |
||||
|
||||
private int skipLineNum; |
||||
|
||||
private int limit; |
||||
|
||||
public RollViewLogRequestCommand() { |
||||
} |
||||
|
||||
public RollViewLogRequestCommand(String path, int skipLineNum, int limit) { |
||||
this.path = path; |
||||
this.skipLineNum = skipLineNum; |
||||
this.limit = limit; |
||||
} |
||||
|
||||
public String getPath() { |
||||
return path; |
||||
} |
||||
|
||||
public void setPath(String path) { |
||||
this.path = path; |
||||
} |
||||
|
||||
public int getSkipLineNum() { |
||||
return skipLineNum; |
||||
} |
||||
|
||||
public void setSkipLineNum(int skipLineNum) { |
||||
this.skipLineNum = skipLineNum; |
||||
} |
||||
|
||||
public int getLimit() { |
||||
return limit; |
||||
} |
||||
|
||||
public void setLimit(int limit) { |
||||
this.limit = limit; |
||||
} |
||||
|
||||
public Command convert2Command(){ |
||||
Command command = new Command(REQUEST.getAndIncrement()); |
||||
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,55 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* roll view log response command |
||||
*/ |
||||
public class RollViewLogResponseCommand implements Serializable { |
||||
|
||||
private String msg; |
||||
|
||||
public RollViewLogResponseCommand() { |
||||
} |
||||
|
||||
public RollViewLogResponseCommand(String msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
public String getMsg() { |
||||
return msg; |
||||
} |
||||
|
||||
public void setMsg(String msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
public Command convert2Command(long opaque){ |
||||
Command command = new Command(opaque); |
||||
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,58 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
/** |
||||
* view log request command |
||||
*/ |
||||
public class ViewLogRequestCommand implements Serializable { |
||||
|
||||
private static final AtomicLong REQUEST = new AtomicLong(1); |
||||
|
||||
private String path; |
||||
|
||||
public ViewLogRequestCommand() { |
||||
} |
||||
|
||||
public ViewLogRequestCommand(String path) { |
||||
this.path = path; |
||||
} |
||||
|
||||
public String getPath() { |
||||
return path; |
||||
} |
||||
|
||||
public void setPath(String path) { |
||||
this.path = path; |
||||
} |
||||
|
||||
public Command convert2Command(){ |
||||
Command command = new Command(REQUEST.getAndIncrement()); |
||||
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,55 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.command.log; |
||||
|
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* view log response command |
||||
*/ |
||||
public class ViewLogResponseCommand implements Serializable { |
||||
|
||||
private String msg; |
||||
|
||||
public ViewLogResponseCommand() { |
||||
} |
||||
|
||||
public ViewLogResponseCommand(String msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
public String getMsg() { |
||||
return msg; |
||||
} |
||||
|
||||
public void setMsg(String msg) { |
||||
this.msg = msg; |
||||
} |
||||
|
||||
public Command convert2Command(long opaque){ |
||||
Command command = new Command(opaque); |
||||
command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE); |
||||
byte[] body = FastJsonSerializer.serialize(this); |
||||
command.setBody(body); |
||||
return command; |
||||
} |
||||
} |
@ -0,0 +1,76 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.config; |
||||
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
|
||||
/** |
||||
* netty client config |
||||
*/ |
||||
public class NettyClientConfig { |
||||
|
||||
private int workerThreads = Constants.CPUS; |
||||
|
||||
private boolean tcpNoDelay = true; |
||||
|
||||
private boolean soKeepalive = true; |
||||
|
||||
private int sendBufferSize = 65535; |
||||
|
||||
private int receiveBufferSize = 65535; |
||||
|
||||
public int getWorkerThreads() { |
||||
return workerThreads; |
||||
} |
||||
|
||||
public void setWorkerThreads(int workerThreads) { |
||||
this.workerThreads = workerThreads; |
||||
} |
||||
|
||||
public boolean isTcpNoDelay() { |
||||
return tcpNoDelay; |
||||
} |
||||
|
||||
public void setTcpNoDelay(boolean tcpNoDelay) { |
||||
this.tcpNoDelay = tcpNoDelay; |
||||
} |
||||
|
||||
public boolean isSoKeepalive() { |
||||
return soKeepalive; |
||||
} |
||||
|
||||
public void setSoKeepalive(boolean soKeepalive) { |
||||
this.soKeepalive = soKeepalive; |
||||
} |
||||
|
||||
public int getSendBufferSize() { |
||||
return sendBufferSize; |
||||
} |
||||
|
||||
public void setSendBufferSize(int sendBufferSize) { |
||||
this.sendBufferSize = sendBufferSize; |
||||
} |
||||
|
||||
public int getReceiveBufferSize() { |
||||
return receiveBufferSize; |
||||
} |
||||
|
||||
public void setReceiveBufferSize(int receiveBufferSize) { |
||||
this.receiveBufferSize = receiveBufferSize; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,95 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.config; |
||||
|
||||
import org.apache.dolphinscheduler.remote.utils.Constants; |
||||
|
||||
/** |
||||
* netty server config |
||||
*/ |
||||
public class NettyServerConfig { |
||||
|
||||
private int soBacklog = 1024; |
||||
|
||||
private boolean tcpNoDelay = true; |
||||
|
||||
private boolean soKeepalive = true; |
||||
|
||||
private int sendBufferSize = 65535; |
||||
|
||||
private int receiveBufferSize = 65535; |
||||
|
||||
private int workerThread = Constants.CPUS; |
||||
|
||||
private int listenPort = 12346; |
||||
|
||||
public int getListenPort() { |
||||
return listenPort; |
||||
} |
||||
|
||||
public void setListenPort(int listenPort) { |
||||
this.listenPort = listenPort; |
||||
} |
||||
|
||||
public int getSoBacklog() { |
||||
return soBacklog; |
||||
} |
||||
|
||||
public void setSoBacklog(int soBacklog) { |
||||
this.soBacklog = soBacklog; |
||||
} |
||||
|
||||
public boolean isTcpNoDelay() { |
||||
return tcpNoDelay; |
||||
} |
||||
|
||||
public void setTcpNoDelay(boolean tcpNoDelay) { |
||||
this.tcpNoDelay = tcpNoDelay; |
||||
} |
||||
|
||||
public boolean isSoKeepalive() { |
||||
return soKeepalive; |
||||
} |
||||
|
||||
public void setSoKeepalive(boolean soKeepalive) { |
||||
this.soKeepalive = soKeepalive; |
||||
} |
||||
|
||||
public int getSendBufferSize() { |
||||
return sendBufferSize; |
||||
} |
||||
|
||||
public void setSendBufferSize(int sendBufferSize) { |
||||
this.sendBufferSize = sendBufferSize; |
||||
} |
||||
|
||||
public int getReceiveBufferSize() { |
||||
return receiveBufferSize; |
||||
} |
||||
|
||||
public void setReceiveBufferSize(int receiveBufferSize) { |
||||
this.receiveBufferSize = receiveBufferSize; |
||||
} |
||||
|
||||
public int getWorkerThread() { |
||||
return workerThread; |
||||
} |
||||
|
||||
public void setWorkerThread(int workerThread) { |
||||
this.workerThread = workerThread; |
||||
} |
||||
} |
@ -0,0 +1,94 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.exceptions; |
||||
|
||||
/** |
||||
* remote exception |
||||
*/ |
||||
public class RemotingException extends Exception { |
||||
|
||||
public RemotingException() { |
||||
super(); |
||||
} |
||||
|
||||
/** Constructs a new runtime exception with the specified detail message. |
||||
* The cause is not initialized, and may subsequently be initialized by a |
||||
* call to {@link #initCause}. |
||||
* |
||||
* @param message the detail message. The detail message is saved for |
||||
* later retrieval by the {@link #getMessage()} method. |
||||
*/ |
||||
public RemotingException(String message) { |
||||
super(message); |
||||
} |
||||
|
||||
/** |
||||
* Constructs a new runtime exception with the specified detail message and |
||||
* cause. <p>Note that the detail message associated with |
||||
* {@code cause} is <i>not</i> automatically incorporated in |
||||
* this runtime exception's detail message. |
||||
* |
||||
* @param message the detail message (which is saved for later retrieval |
||||
* by the {@link #getMessage()} method). |
||||
* @param cause the cause (which is saved for later retrieval by the |
||||
* {@link #getCause()} method). (A <tt>null</tt> value is |
||||
* permitted, and indicates that the cause is nonexistent or |
||||
* unknown.) |
||||
* @since 1.4 |
||||
*/ |
||||
public RemotingException(String message, Throwable cause) { |
||||
super(message, cause); |
||||
} |
||||
|
||||
/** Constructs a new runtime exception with the specified cause and a |
||||
* detail message of <tt>(cause==null ? null : cause.toString())</tt> |
||||
* (which typically contains the class and detail message of |
||||
* <tt>cause</tt>). This constructor is useful for runtime exceptions |
||||
* that are little more than wrappers for other throwables. |
||||
* |
||||
* @param cause the cause (which is saved for later retrieval by the |
||||
* {@link #getCause()} method). (A <tt>null</tt> value is |
||||
* permitted, and indicates that the cause is nonexistent or |
||||
* unknown.) |
||||
* @since 1.4 |
||||
*/ |
||||
public RemotingException(Throwable cause) { |
||||
super(cause); |
||||
} |
||||
|
||||
/** |
||||
* Constructs a new runtime exception with the specified detail |
||||
* message, cause, suppression enabled or disabled, and writable |
||||
* stack trace enabled or disabled. |
||||
* |
||||
* @param message the detail message. |
||||
* @param cause the cause. (A {@code null} value is permitted, |
||||
* and indicates that the cause is nonexistent or unknown.) |
||||
* @param enableSuppression whether or not suppression is enabled |
||||
* or disabled |
||||
* @param writableStackTrace whether or not the stack trace should |
||||
* be writable |
||||
* |
||||
* @since 1.7 |
||||
*/ |
||||
protected RemotingException(String message, Throwable cause, |
||||
boolean enableSuppression, |
||||
boolean writableStackTrace) { |
||||
super(message, cause, enableSuppression, writableStackTrace); |
||||
} |
||||
} |
@ -0,0 +1,123 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.handler; |
||||
|
||||
import io.netty.channel.*; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; |
||||
import org.apache.dolphinscheduler.remote.utils.Pair; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.RejectedExecutionException; |
||||
|
||||
/** |
||||
* netty client request handler |
||||
*/ |
||||
@ChannelHandler.Sharable |
||||
public class NettyClientHandler extends ChannelInboundHandlerAdapter { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); |
||||
|
||||
private final NettyRemotingClient nettyRemotingClient; |
||||
|
||||
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); |
||||
|
||||
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){ |
||||
this.nettyRemotingClient = nettyRemotingClient; |
||||
} |
||||
|
||||
@Override |
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
||||
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); |
||||
ctx.channel().close(); |
||||
} |
||||
|
||||
@Override |
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
||||
processReceived(ctx.channel(), (Command)msg); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { |
||||
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor()); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { |
||||
ExecutorService executorRef = executor; |
||||
if(executorRef == null){ |
||||
executorRef = nettyRemotingClient.getDefaultExecutor(); |
||||
} |
||||
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef)); |
||||
} |
||||
|
||||
private void processReceived(final Channel channel, final Command msg) { |
||||
final CommandType commandType = msg.getType(); |
||||
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); |
||||
if (pair != null) { |
||||
Runnable r = new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
try { |
||||
pair.getLeft().process(channel, msg); |
||||
} catch (Throwable ex) { |
||||
logger.error("process msg {} error : {}", msg, ex); |
||||
} |
||||
} |
||||
}; |
||||
try { |
||||
pair.getRight().submit(r); |
||||
} catch (RejectedExecutionException e) { |
||||
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel)); |
||||
} |
||||
} else { |
||||
logger.warn("commandType {} not support", commandType); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
||||
logger.error("exceptionCaught : {}", cause); |
||||
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); |
||||
ctx.channel().close(); |
||||
} |
||||
|
||||
@Override |
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { |
||||
Channel ch = ctx.channel(); |
||||
ChannelConfig config = ch.config(); |
||||
|
||||
if (!ch.isWritable()) { |
||||
if (logger.isWarnEnabled()) { |
||||
logger.warn("{} is not writable, over high water level : {}", |
||||
new Object[]{ch, config.getWriteBufferHighWaterMark()}); |
||||
} |
||||
|
||||
config.setAutoRead(false); |
||||
} else { |
||||
if (logger.isWarnEnabled()) { |
||||
logger.warn("{} is writable, to low water : {}", |
||||
new Object[]{ch, config.getWriteBufferLowWaterMark()}); |
||||
} |
||||
config.setAutoRead(true); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,121 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.remote.handler; |
||||
|
||||
import io.netty.channel.*; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; |
||||
import org.apache.dolphinscheduler.remote.utils.Pair; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.RejectedExecutionException; |
||||
|
||||
/** |
||||
* netty server request handler |
||||
*/ |
||||
@ChannelHandler.Sharable |
||||
public class NettyServerHandler extends ChannelInboundHandlerAdapter { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); |
||||
|
||||
private final NettyRemotingServer nettyRemotingServer; |
||||
|
||||
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); |
||||
|
||||
public NettyServerHandler(NettyRemotingServer nettyRemotingServer){ |
||||
this.nettyRemotingServer = nettyRemotingServer; |
||||
} |
||||
|
||||
@Override |
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
||||
ctx.channel().close(); |
||||
} |
||||
|
||||
@Override |
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
||||
processReceived(ctx.channel(), (Command)msg); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { |
||||
this.registerProcessor(commandType, processor, null); |
||||
} |
||||
|
||||
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { |
||||
ExecutorService executorRef = executor; |
||||
if(executorRef == null){ |
||||
executorRef = nettyRemotingServer.getDefaultExecutor(); |
||||
} |
||||
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef)); |
||||
} |
||||
|
||||
private void processReceived(final Channel channel, final Command msg) { |
||||
final CommandType commandType = msg.getType(); |
||||
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); |
||||
if (pair != null) { |
||||
Runnable r = new Runnable() { |
||||
public void run() { |
||||
try { |
||||
pair.getLeft().process(channel, msg); |
||||
} catch (Throwable ex) { |
||||
logger.error("process msg {} error : {}", msg, ex); |
||||
} |
||||
} |
||||
}; |
||||
try { |
||||
pair.getRight().submit(r); |
||||
} catch (RejectedExecutionException e) { |
||||
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel)); |
||||
} |
||||
} else { |
||||
logger.warn("commandType {} not support", commandType); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
||||
logger.error("exceptionCaught : {}", cause); |
||||
ctx.channel().close(); |
||||
} |
||||
|
||||
@Override |
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { |
||||
Channel ch = ctx.channel(); |
||||
ChannelConfig config = ch.config(); |
||||
|
||||
if (!ch.isWritable()) { |
||||
if (logger.isWarnEnabled()) { |
||||
logger.warn("{} is not writable, over high water level : {}", |
||||
new Object[]{ch, config.getWriteBufferHighWaterMark()}); |
||||
} |
||||
|
||||
config.setAutoRead(false); |
||||
} else { |
||||
if (logger.isWarnEnabled()) { |
||||
logger.warn("{} is writable, to low water : {}", |
||||
new Object[]{ch, config.getWriteBufferLowWaterMark()}); |
||||
} |
||||
config.setAutoRead(true); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,90 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.utils; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
/** |
||||
* server address |
||||
*/ |
||||
public class Address implements Serializable { |
||||
|
||||
private String host; |
||||
|
||||
private int port; |
||||
|
||||
public Address(){ |
||||
//NOP
|
||||
} |
||||
|
||||
public Address(String host, int port){ |
||||
this.host = host; |
||||
this.port = port; |
||||
} |
||||
|
||||
public String getHost() { |
||||
return host; |
||||
} |
||||
|
||||
public void setHost(String host) { |
||||
this.host = host; |
||||
} |
||||
|
||||
public int getPort() { |
||||
return port; |
||||
} |
||||
|
||||
public void setPort(int port) { |
||||
this.port = port; |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
final int prime = 31; |
||||
int result = 1; |
||||
result = prime * result + ((host == null) ? 0 : host.hashCode()); |
||||
result = prime * result + port; |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object obj) { |
||||
if (this == obj) { |
||||
return true; |
||||
} |
||||
if (obj == null) { |
||||
return false; |
||||
} |
||||
if (getClass() != obj.getClass()) { |
||||
return false; |
||||
} |
||||
Address other = (Address) obj; |
||||
if (host == null) { |
||||
if (other.host != null) { |
||||
return false; |
||||
} |
||||
} else if (!host.equals(other.host)) { |
||||
return false; |
||||
} |
||||
return port == other.port; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "Address [host=" + host + ", port=" + port + "]"; |
||||
} |
||||
} |
@ -0,0 +1,41 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.utils; |
||||
|
||||
import io.netty.channel.Channel; |
||||
|
||||
import java.net.InetSocketAddress; |
||||
|
||||
/** |
||||
* channel utils |
||||
*/ |
||||
public class ChannelUtils { |
||||
|
||||
public static String getLocalAddress(Channel channel){ |
||||
return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress(); |
||||
} |
||||
|
||||
public static String getRemoteAddress(Channel channel){ |
||||
return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress(); |
||||
} |
||||
|
||||
public static Address toAddress(Channel channel){ |
||||
InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress()); |
||||
return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort()); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,31 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.utils; |
||||
|
||||
import java.nio.charset.Charset; |
||||
|
||||
public class Constants { |
||||
|
||||
public static final String COMMA = ","; |
||||
|
||||
public static final String SLASH = "/"; |
||||
|
||||
public static final Charset UTF8 = Charset.forName("UTF-8"); |
||||
|
||||
public static final int CPUS = Runtime.getRuntime().availableProcessors(); |
||||
|
||||
} |
@ -0,0 +1,39 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.remote.utils; |
||||
|
||||
import com.alibaba.fastjson.JSON; |
||||
|
||||
/** |
||||
* json serialize or deserialize |
||||
*/ |
||||
public class FastJsonSerializer { |
||||
|
||||
public static <T> byte[] serialize(T obj) { |
||||
String json = JSON.toJSONString(obj); |
||||
return json.getBytes(Constants.UTF8); |
||||
} |
||||
|
||||
public static <T> String serializeToString(T obj) { |
||||
return JSON.toJSONString(obj); |
||||
} |
||||
|
||||
public static <T> T deserialize(byte[] src, Class<T> clazz) { |
||||
return JSON.parseObject(new String(src, Constants.UTF8), clazz); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,71 @@
|
||||
package org.apache.dolphinscheduler.remote;/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
import io.netty.channel.Channel; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.Ping; |
||||
import org.apache.dolphinscheduler.remote.command.Pong; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.Address; |
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
|
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
|
||||
public class NettyRemotingClientTest { |
||||
|
||||
|
||||
@Test |
||||
public void testSend(){ |
||||
NettyServerConfig serverConfig = new NettyServerConfig(); |
||||
|
||||
NettyRemotingServer server = new NettyRemotingServer(serverConfig); |
||||
server.registerProcessor(CommandType.PING, new NettyRequestProcessor() { |
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
channel.writeAndFlush(Pong.create(command.getOpaque())); |
||||
} |
||||
}); |
||||
server.start(); |
||||
//
|
||||
CountDownLatch latch = new CountDownLatch(1); |
||||
AtomicLong opaque = new AtomicLong(1); |
||||
final NettyClientConfig clientConfig = new NettyClientConfig(); |
||||
NettyRemotingClient client = new NettyRemotingClient(clientConfig); |
||||
client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() { |
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
opaque.set(command.getOpaque()); |
||||
latch.countDown(); |
||||
} |
||||
}); |
||||
Command commandPing = Ping.create(); |
||||
try { |
||||
client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing); |
||||
latch.await(); |
||||
} catch (Exception e) { |
||||
e.printStackTrace(); |
||||
} |
||||
Assert.assertEquals(opaque.get(), commandPing.getOpaque()); |
||||
} |
||||
} |
@ -1,113 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler</artifactId> |
||||
<version>1.2.1-SNAPSHOT</version> |
||||
</parent> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
|
||||
<artifactId>dolphinscheduler-rpc</artifactId> |
||||
|
||||
<name>dolphinscheduler-rpc</name> |
||||
<url>https://github.com/apache/incubator-dolphinscheduler</url> |
||||
|
||||
<properties> |
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
||||
<maven.compiler.source>1.8</maven.compiler.source> |
||||
<maven.compiler.target>1.8</maven.compiler.target> |
||||
|
||||
<protobuf.version>3.5.1</protobuf.version> |
||||
<grpc.version>1.9.0</grpc.version> |
||||
</properties> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>com.google.protobuf</groupId> |
||||
<artifactId>protobuf-java</artifactId> |
||||
<version>${protobuf.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>io.grpc</groupId> |
||||
<artifactId>grpc-netty</artifactId> |
||||
<version>${grpc.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>io.grpc</groupId> |
||||
<artifactId>grpc-protobuf</artifactId> |
||||
<version>${grpc.version}</version> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>io.grpc</groupId> |
||||
<artifactId>grpc-stub</artifactId> |
||||
<version>${grpc.version}</version> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>com.google.guava</groupId> |
||||
<artifactId>guava</artifactId> |
||||
</dependency> |
||||
</dependencies> |
||||
|
||||
<build> |
||||
<extensions> |
||||
<extension> |
||||
<groupId>kr.motd.maven</groupId> |
||||
<artifactId>os-maven-plugin</artifactId> |
||||
<version>1.5.0.Final</version> |
||||
</extension> |
||||
</extensions> |
||||
<plugins> |
||||
<plugin> |
||||
<groupId>org.xolstice.maven.plugins</groupId> |
||||
<artifactId>protobuf-maven-plugin</artifactId> |
||||
<version>0.5.0</version> |
||||
<configuration> |
||||
<protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact> |
||||
<pluginId>grpc-java</pluginId> |
||||
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> |
||||
</configuration> |
||||
<executions> |
||||
<execution> |
||||
<id>compile</id> |
||||
<goals> |
||||
<goal>compile</goal> |
||||
</goals> |
||||
</execution> |
||||
<execution> |
||||
<id>compile-custom</id> |
||||
<goals> |
||||
<goal>compile-custom</goal> |
||||
</goals> |
||||
</execution> |
||||
</executions> |
||||
</plugin> |
||||
<plugin> |
||||
<groupId>org.apache.maven.plugins</groupId> |
||||
<artifactId>maven-compiler-plugin</artifactId> |
||||
<configuration> |
||||
<source>${java.version}</source> |
||||
<target>${java.version}</target> |
||||
<encoding>${project.build.sourceEncoding}</encoding> |
||||
</configuration> |
||||
</plugin> |
||||
</plugins> |
||||
</build> |
||||
</project> |
@ -1,101 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0 |
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
* |
||||
*/ |
||||
|
||||
syntax = "proto3"; |
||||
|
||||
package schduler; |
||||
|
||||
option java_multiple_files = true; |
||||
option java_package = "org.apache.dolphinscheduler.rpc"; |
||||
option java_outer_classname = "SchdulerProto"; |
||||
|
||||
|
||||
/** |
||||
* return str info |
||||
*/ |
||||
message RetStrInfo { |
||||
/** |
||||
* str msg info |
||||
*/ |
||||
string msg = 1 ; |
||||
} |
||||
|
||||
/** |
||||
* return byte info |
||||
*/ |
||||
message RetByteInfo { |
||||
/** |
||||
* byte data info |
||||
*/ |
||||
bytes data = 1; |
||||
} |
||||
|
||||
/** |
||||
* log parameter |
||||
*/ |
||||
message LogParameter { |
||||
|
||||
/** |
||||
* path |
||||
*/ |
||||
string path = 1 ; |
||||
|
||||
/** |
||||
* skip line num |
||||
*/ |
||||
int32 skipLineNum = 2 ; |
||||
|
||||
/** |
||||
* display limt num |
||||
*/ |
||||
int32 limit = 3 ; |
||||
} |
||||
|
||||
|
||||
/** |
||||
* path parameter |
||||
*/ |
||||
message PathParameter { |
||||
|
||||
/** |
||||
* path |
||||
*/ |
||||
string path = 1 ; |
||||
} |
||||
|
||||
/** |
||||
* log view service |
||||
*/ |
||||
service LogViewService { |
||||
|
||||
/** |
||||
* roll view log |
||||
*/ |
||||
rpc rollViewLog(LogParameter) returns (RetStrInfo) {}; |
||||
|
||||
/** |
||||
* view all log |
||||
*/ |
||||
rpc viewLog(PathParameter) returns (RetStrInfo) {}; |
||||
|
||||
/** |
||||
* get log bytes |
||||
*/ |
||||
rpc getLogBytes(PathParameter) returns (RetByteInfo) {}; |
||||
} |
||||
|
@ -0,0 +1,179 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.log; |
||||
|
||||
import io.netty.channel.Channel; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.log.*; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.io.*; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Paths; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.LinkedBlockingQueue; |
||||
import java.util.concurrent.ThreadPoolExecutor; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
/** |
||||
* logger request process logic |
||||
*/ |
||||
public class LoggerRequestProcessor implements NettyRequestProcessor { |
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class); |
||||
|
||||
private final ThreadPoolExecutor executor; |
||||
|
||||
public LoggerRequestProcessor(){ |
||||
this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
logger.info("received command : {}", command); |
||||
|
||||
/** |
||||
* reuqest task log command type |
||||
*/ |
||||
final CommandType commandType = command.getType(); |
||||
switch (commandType){ |
||||
case GET_LOG_BYTES_REQUEST: |
||||
GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize( |
||||
command.getBody(), GetLogBytesRequestCommand.class); |
||||
byte[] bytes = getFileContentBytes(getLogRequest.getPath()); |
||||
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); |
||||
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); |
||||
break; |
||||
case VIEW_WHOLE_LOG_REQUEST: |
||||
ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize( |
||||
command.getBody(), ViewLogRequestCommand.class); |
||||
String msg = readWholeFileContent(viewLogRequest.getPath()); |
||||
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); |
||||
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); |
||||
break; |
||||
case ROLL_VIEW_LOG_REQUEST: |
||||
RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize( |
||||
command.getBody(), RollViewLogRequestCommand.class); |
||||
List<String> lines = readPartFileContent(rollViewLogRequest.getPath(), |
||||
rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit()); |
||||
StringBuilder builder = new StringBuilder(); |
||||
for (String line : lines){ |
||||
builder.append(line + "\r\n"); |
||||
} |
||||
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString()); |
||||
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque())); |
||||
break; |
||||
default: |
||||
throw new IllegalArgumentException("unknown commandType"); |
||||
} |
||||
} |
||||
|
||||
public ExecutorService getExecutor(){ |
||||
return this.executor; |
||||
} |
||||
|
||||
/** |
||||
* get files content bytes,for down load file |
||||
* |
||||
* @param filePath file path |
||||
* @return byte array of file |
||||
* @throws Exception exception |
||||
*/ |
||||
private byte[] getFileContentBytes(String filePath){ |
||||
InputStream in = null; |
||||
ByteArrayOutputStream bos = null; |
||||
try { |
||||
in = new FileInputStream(filePath); |
||||
bos = new ByteArrayOutputStream(); |
||||
byte[] buf = new byte[1024]; |
||||
int len; |
||||
while ((len = in.read(buf)) != -1) { |
||||
bos.write(buf, 0, len); |
||||
} |
||||
return bos.toByteArray(); |
||||
}catch (IOException e){ |
||||
logger.error("get file bytes error",e); |
||||
}finally { |
||||
if (bos != null){ |
||||
try { |
||||
bos.close(); |
||||
} catch (IOException ignore) {} |
||||
} |
||||
if (in != null){ |
||||
try { |
||||
in.close(); |
||||
} catch (IOException ignore) {} |
||||
} |
||||
} |
||||
return new byte[0]; |
||||
} |
||||
|
||||
/** |
||||
* read part file content,can skip any line and read some lines |
||||
* |
||||
* @param filePath file path |
||||
* @param skipLine skip line |
||||
* @param limit read lines limit |
||||
* @return part file content |
||||
*/ |
||||
private List<String> readPartFileContent(String filePath, |
||||
int skipLine, |
||||
int limit){ |
||||
try (Stream<String> stream = Files.lines(Paths.get(filePath))) { |
||||
return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); |
||||
} catch (IOException e) { |
||||
logger.error("read file error",e); |
||||
} |
||||
return Collections.EMPTY_LIST; |
||||
} |
||||
|
||||
/** |
||||
* read whole file content |
||||
* |
||||
* @param filePath file path |
||||
* @return whole file content |
||||
*/ |
||||
private String readWholeFileContent(String filePath){ |
||||
BufferedReader br = null; |
||||
String line; |
||||
StringBuilder sb = new StringBuilder(); |
||||
try { |
||||
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); |
||||
while ((line = br.readLine()) != null){ |
||||
sb.append(line + "\r\n"); |
||||
} |
||||
return sb.toString(); |
||||
}catch (IOException e){ |
||||
logger.error("read file error",e); |
||||
}finally { |
||||
try { |
||||
if (br != null){ |
||||
br.close(); |
||||
} |
||||
} catch (IOException ignore) {} |
||||
} |
||||
return ""; |
||||
} |
||||
} |
@ -0,0 +1,91 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.server.log; |
||||
|
||||
|
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingServer; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
/** |
||||
* logger server |
||||
*/ |
||||
public class LoggerServer { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class); |
||||
|
||||
/** |
||||
* netty server |
||||
*/ |
||||
private final NettyRemotingServer server; |
||||
|
||||
/** |
||||
* netty server config |
||||
*/ |
||||
private final NettyServerConfig serverConfig; |
||||
|
||||
/** |
||||
* loggger request processor |
||||
*/ |
||||
private final LoggerRequestProcessor requestProcessor; |
||||
|
||||
public LoggerServer(){ |
||||
this.serverConfig = new NettyServerConfig(); |
||||
this.serverConfig.setListenPort(Constants.RPC_PORT); |
||||
this.server = new NettyRemotingServer(serverConfig); |
||||
this.requestProcessor = new LoggerRequestProcessor(); |
||||
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor()); |
||||
this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); |
||||
this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor()); |
||||
} |
||||
|
||||
/** |
||||
* main launches the server from the command line. |
||||
* @param args arguments |
||||
*/ |
||||
public static void main(String[] args) { |
||||
final LoggerServer server = new LoggerServer(); |
||||
server.start(); |
||||
} |
||||
|
||||
/** |
||||
* server start |
||||
*/ |
||||
public void start() { |
||||
this.server.start(); |
||||
logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT); |
||||
Runtime.getRuntime().addShutdownHook(new Thread() { |
||||
@Override |
||||
public void run() { |
||||
LoggerServer.this.stop(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
/** |
||||
* stop |
||||
*/ |
||||
public void stop() { |
||||
this.server.close(); |
||||
logger.info("logger server shut down"); |
||||
} |
||||
|
||||
} |
@ -1,149 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.rpc; |
||||
|
||||
import io.grpc.ManagedChannel; |
||||
import io.grpc.ManagedChannelBuilder; |
||||
import io.grpc.StatusRuntimeException; |
||||
import org.apache.dolphinscheduler.rpc.*; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* log client |
||||
*/ |
||||
public class LogClient { |
||||
|
||||
/** |
||||
* logger of LogClient |
||||
*/ |
||||
private static final Logger logger = LoggerFactory.getLogger(LogClient.class); |
||||
|
||||
/** |
||||
* managed channel |
||||
*/ |
||||
private final ManagedChannel channel; |
||||
|
||||
/** |
||||
* blocking stub |
||||
*/ |
||||
private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub; |
||||
|
||||
/** |
||||
* Construct client connecting to HelloWorld server at host:port. |
||||
* |
||||
* @param host host |
||||
* @param port port |
||||
*/ |
||||
public LogClient(String host, int port) { |
||||
this(ManagedChannelBuilder.forAddress(host, port) |
||||
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
|
||||
// needing certificates.
|
||||
.usePlaintext(true)); |
||||
} |
||||
|
||||
/** |
||||
* Construct client for accessing RouteGuide server using the existing channel. |
||||
* |
||||
* @param channelBuilder channel builder |
||||
*/ |
||||
LogClient(ManagedChannelBuilder<?> channelBuilder) { |
||||
/** |
||||
* set max message read size |
||||
*/ |
||||
channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE); |
||||
channel = channelBuilder.build(); |
||||
blockingStub = LogViewServiceGrpc.newBlockingStub(channel); |
||||
} |
||||
|
||||
/** |
||||
* shut down channel |
||||
* |
||||
* @throws InterruptedException interrupted exception |
||||
*/ |
||||
public void shutdown() throws InterruptedException { |
||||
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); |
||||
} |
||||
|
||||
/** |
||||
* roll view log |
||||
* |
||||
* @param path log path |
||||
* @param skipLineNum skip line num |
||||
* @param limit limit |
||||
* @return log content |
||||
*/ |
||||
public String rollViewLog(String path,int skipLineNum,int limit) { |
||||
logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit); |
||||
LogParameter pathParameter = LogParameter |
||||
.newBuilder() |
||||
.setPath(path) |
||||
.setSkipLineNum(skipLineNum) |
||||
.setLimit(limit) |
||||
.build(); |
||||
RetStrInfo retStrInfo; |
||||
try { |
||||
retStrInfo = blockingStub.rollViewLog(pathParameter); |
||||
return retStrInfo.getMsg(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("roll view log failed", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* view all log |
||||
* |
||||
* @param path log path |
||||
* @return log content |
||||
*/ |
||||
public String viewLog(String path) { |
||||
logger.info("view log path : {}",path); |
||||
|
||||
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build(); |
||||
RetStrInfo retStrInfo; |
||||
try { |
||||
retStrInfo = blockingStub.viewLog(pathParameter); |
||||
return retStrInfo.getMsg(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("view log failed", e); |
||||
return null; |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get log bytes |
||||
* |
||||
* @param path log path |
||||
* @return log content |
||||
*/ |
||||
public byte[] getLogBytes(String path) { |
||||
logger.info("get log bytes {}",path); |
||||
|
||||
PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build(); |
||||
RetByteInfo retByteInfo; |
||||
try { |
||||
retByteInfo = blockingStub.getLogBytes(pathParameter); |
||||
return retByteInfo.getData().toByteArray(); |
||||
} catch (StatusRuntimeException e) { |
||||
logger.error("get log bytes failed ", e); |
||||
return null; |
||||
} |
||||
} |
||||
} |
@ -1,238 +0,0 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.server.rpc; |
||||
|
||||
import io.grpc.stub.StreamObserver; |
||||
import org.apache.dolphinscheduler.common.Constants; |
||||
import com.google.protobuf.ByteString; |
||||
import io.grpc.Server; |
||||
import io.grpc.ServerBuilder; |
||||
import org.apache.dolphinscheduler.rpc.*; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
import java.io.*; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Paths; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
/** |
||||
* logger server |
||||
*/ |
||||
public class LoggerServer { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class); |
||||
|
||||
/** |
||||
* server |
||||
*/ |
||||
private Server server; |
||||
|
||||
/** |
||||
* server start |
||||
* @throws IOException io exception |
||||
*/ |
||||
public void start() throws IOException { |
||||
/* The port on which the server should run */ |
||||
int port = Constants.RPC_PORT; |
||||
server = ServerBuilder.forPort(port) |
||||
.addService(new LogViewServiceGrpcImpl()) |
||||
.build() |
||||
.start(); |
||||
logger.info("server started, listening on port : {}" , port); |
||||
Runtime.getRuntime().addShutdownHook(new Thread() { |
||||
@Override |
||||
public void run() { |
||||
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
|
||||
logger.info("shutting down gRPC server since JVM is shutting down"); |
||||
LoggerServer.this.stop(); |
||||
logger.info("server shut down"); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
/** |
||||
* stop |
||||
*/ |
||||
private void stop() { |
||||
if (server != null) { |
||||
server.shutdown(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* await termination on the main thread since the grpc library uses daemon threads. |
||||
*/ |
||||
private void blockUntilShutdown() throws InterruptedException { |
||||
if (server != null) { |
||||
server.awaitTermination(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* main launches the server from the command line. |
||||
*/ |
||||
|
||||
/** |
||||
* main launches the server from the command line. |
||||
* @param args arguments |
||||
* @throws IOException io exception |
||||
* @throws InterruptedException interrupted exception |
||||
*/ |
||||
public static void main(String[] args) throws IOException, InterruptedException { |
||||
final LoggerServer server = new LoggerServer(); |
||||
server.start(); |
||||
server.blockUntilShutdown(); |
||||
} |
||||
|
||||
/** |
||||
* Log View Service Grpc Implementation |
||||
*/ |
||||
static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase { |
||||
@Override |
||||
public void rollViewLog(LogParameter request, StreamObserver<RetStrInfo> responseObserver) { |
||||
|
||||
logger.info("log parameter path : {} ,skip line : {}, limit : {}", |
||||
request.getPath(), |
||||
request.getSkipLineNum(), |
||||
request.getLimit()); |
||||
List<String> list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit()); |
||||
StringBuilder sb = new StringBuilder(); |
||||
boolean errorLineFlag = false; |
||||
for (String line : list){ |
||||
sb.append(line + "\r\n"); |
||||
} |
||||
RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build(); |
||||
responseObserver.onNext(retInfoBuild); |
||||
responseObserver.onCompleted(); |
||||
} |
||||
|
||||
@Override |
||||
public void viewLog(PathParameter request, StreamObserver<RetStrInfo> responseObserver) { |
||||
logger.info("task path is : {} " , request.getPath()); |
||||
RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build(); |
||||
responseObserver.onNext(retInfoBuild); |
||||
responseObserver.onCompleted(); |
||||
} |
||||
|
||||
@Override |
||||
public void getLogBytes(PathParameter request, StreamObserver<RetByteInfo> responseObserver) { |
||||
try { |
||||
ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath())); |
||||
RetByteInfo.Builder builder = RetByteInfo.newBuilder(); |
||||
builder.setData(bytes); |
||||
responseObserver.onNext(builder.build()); |
||||
responseObserver.onCompleted(); |
||||
}catch (Exception e){ |
||||
logger.error("get log bytes failed",e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* get files bytes |
||||
* |
||||
* @param path path |
||||
* @return byte array of file |
||||
* @throws Exception exception |
||||
*/ |
||||
private static byte[] getFileBytes(String path){ |
||||
InputStream in = null; |
||||
ByteArrayOutputStream bos = null; |
||||
try { |
||||
in = new FileInputStream(path); |
||||
bos = new ByteArrayOutputStream(); |
||||
byte[] buf = new byte[1024]; |
||||
int len = 0; |
||||
while ((len = in.read(buf)) != -1) { |
||||
bos.write(buf, 0, len); |
||||
} |
||||
return bos.toByteArray(); |
||||
}catch (IOException e){ |
||||
logger.error("get file bytes error",e); |
||||
}finally { |
||||
if (bos != null){ |
||||
try { |
||||
bos.close(); |
||||
} catch (IOException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
if (in != null){ |
||||
try { |
||||
in.close(); |
||||
} catch (IOException e) { |
||||
e.printStackTrace(); |
||||
} |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* read file content |
||||
* |
||||
* @param path |
||||
* @param skipLine |
||||
* @param limit |
||||
* @return |
||||
*/ |
||||
private static List<String> readFile(String path,int skipLine,int limit){ |
||||
try (Stream<String> stream = Files.lines(Paths.get(path))) { |
||||
return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); |
||||
} catch (IOException e) { |
||||
logger.error("read file failed",e); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
/** |
||||
* read file content |
||||
* |
||||
* @param path path |
||||
* @return string of file content |
||||
* @throws Exception exception |
||||
*/ |
||||
private static String readFile(String path){ |
||||
BufferedReader br = null; |
||||
String line = null; |
||||
StringBuilder sb = new StringBuilder(); |
||||
try { |
||||
br = new BufferedReader(new InputStreamReader(new FileInputStream(path))); |
||||
boolean errorLineFlag = false; |
||||
while ((line = br.readLine()) != null){ |
||||
sb.append(line + "\r\n"); |
||||
} |
||||
|
||||
return sb.toString(); |
||||
}catch (IOException e){ |
||||
logger.error("read file failed",e); |
||||
}finally { |
||||
try { |
||||
if (br != null){ |
||||
br.close(); |
||||
} |
||||
} catch (IOException e) { |
||||
logger.error(e.getMessage(),e); |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,56 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<parent> |
||||
<artifactId>dolphinscheduler</artifactId> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<version>1.2.1-SNAPSHOT</version> |
||||
</parent> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
|
||||
<artifactId>dolphinscheduler-service</artifactId> |
||||
|
||||
<name>dolphinscheduler-service</name> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-remote</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-dao</artifactId> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.apache.curator</groupId> |
||||
<artifactId>curator-client</artifactId> |
||||
<version>${curator.version}</version> |
||||
<exclusions> |
||||
<exclusion> |
||||
<groupId>log4j-1.2-api</groupId> |
||||
<artifactId>org.apache.logging.log4j</artifactId> |
||||
</exclusion> |
||||
<exclusion> |
||||
<groupId>io.netty</groupId> |
||||
<artifactId>netty</artifactId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
<dependency> |
||||
<groupId>org.quartz-scheduler</groupId> |
||||
<artifactId>quartz</artifactId> |
||||
<exclusions> |
||||
<exclusion> |
||||
<artifactId>c3p0</artifactId> |
||||
<groupId>c3p0</groupId> |
||||
</exclusion> |
||||
</exclusions> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.quartz-scheduler</groupId> |
||||
<artifactId>quartz-jobs</artifactId> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,166 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.apache.dolphinscheduler.service.log; |
||||
|
||||
import io.netty.channel.Channel; |
||||
import org.apache.dolphinscheduler.remote.NettyRemotingClient; |
||||
import org.apache.dolphinscheduler.remote.command.Command; |
||||
import org.apache.dolphinscheduler.remote.command.CommandType; |
||||
import org.apache.dolphinscheduler.remote.command.log.*; |
||||
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; |
||||
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; |
||||
import org.apache.dolphinscheduler.remote.utils.Address; |
||||
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; |
||||
import org.slf4j.Logger; |
||||
import org.slf4j.LoggerFactory; |
||||
|
||||
|
||||
/** |
||||
* log client |
||||
*/ |
||||
public class LogClientService implements NettyRequestProcessor { |
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); |
||||
|
||||
private final NettyClientConfig clientConfig; |
||||
|
||||
private final NettyRemotingClient client; |
||||
|
||||
private final Address address; |
||||
|
||||
/** |
||||
* request time out |
||||
*/ |
||||
private final long logRequestTimeout = 10 * 1000; |
||||
|
||||
/** |
||||
* construct client |
||||
* @param host host |
||||
* @param port port |
||||
*/ |
||||
public LogClientService(String host, int port) { |
||||
this.address = new Address(host, port); |
||||
this.clientConfig = new NettyClientConfig(); |
||||
this.clientConfig.setWorkerThreads(1); |
||||
this.client = new NettyRemotingClient(clientConfig); |
||||
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this); |
||||
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this); |
||||
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this); |
||||
|
||||
} |
||||
|
||||
/** |
||||
* shutdown |
||||
*/ |
||||
public void shutdown() { |
||||
this.client.close(); |
||||
logger.info("logger client shutdown"); |
||||
} |
||||
|
||||
/** |
||||
* roll view log |
||||
* @param path path |
||||
* @param skipLineNum skip line number |
||||
* @param limit limit |
||||
* @return log content |
||||
*/ |
||||
public String rollViewLog(String path,int skipLineNum,int limit) { |
||||
logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit); |
||||
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); |
||||
String result = ""; |
||||
try { |
||||
Command command = request.convert2Command(); |
||||
this.client.send(address, command); |
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); |
||||
result = ((String)promise.getResult()); |
||||
} catch (Exception e) { |
||||
logger.error("roll view log error", e); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* view log |
||||
* @param path path |
||||
* @return log content |
||||
*/ |
||||
public String viewLog(String path) { |
||||
logger.info("view log path {}", path); |
||||
ViewLogRequestCommand request = new ViewLogRequestCommand(path); |
||||
String result = ""; |
||||
try { |
||||
Command command = request.convert2Command(); |
||||
this.client.send(address, command); |
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); |
||||
result = ((String)promise.getResult()); |
||||
} catch (Exception e) { |
||||
logger.error("view log error", e); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
/** |
||||
* get log size |
||||
* @param path log path |
||||
* @return log content bytes |
||||
*/ |
||||
public byte[] getLogBytes(String path) { |
||||
logger.info("log path {}", path); |
||||
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); |
||||
byte[] result = null; |
||||
try { |
||||
Command command = request.convert2Command(); |
||||
this.client.send(address, command); |
||||
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); |
||||
result = (byte[])promise.getResult(); |
||||
} catch (Exception e) { |
||||
logger.error("get log size error", e); |
||||
} |
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public void process(Channel channel, Command command) { |
||||
logger.info("received log response : {}", command); |
||||
switch (command.getType()){ |
||||
case ROLL_VIEW_LOG_RESPONSE: |
||||
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize( |
||||
command.getBody(), RollViewLogResponseCommand.class); |
||||
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg()); |
||||
break; |
||||
case VIEW_WHOLE_LOG_RESPONSE: |
||||
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize( |
||||
command.getBody(), ViewLogResponseCommand.class); |
||||
LogPromise.notify(command.getOpaque(), viewLog.getMsg()); |
||||
break; |
||||
case GET_LOG_BYTES_RESPONSE: |
||||
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize( |
||||
command.getBody(), GetLogBytesResponseCommand.class); |
||||
LogPromise.notify(command.getOpaque(), getLog.getData()); |
||||
break; |
||||
default: |
||||
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType())); |
||||
} |
||||
} |
||||
|
||||
public static void main(String[] args) throws Exception{ |
||||
LogClientService logClient = new LogClientService("192.168.220.247", 50051); |
||||
byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log"); |
||||
System.out.println(new String(logBytes)); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,81 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.service.log; |
||||
|
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
/** |
||||
* log asyc callback |
||||
*/ |
||||
public class LogPromise { |
||||
|
||||
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>(); |
||||
|
||||
private long opaque; |
||||
|
||||
private final long start; |
||||
|
||||
private final long timeout; |
||||
|
||||
private final CountDownLatch latch; |
||||
|
||||
private Object result; |
||||
|
||||
public LogPromise(long opaque, long timeout){ |
||||
this.opaque = opaque; |
||||
this.timeout = timeout; |
||||
this.start = System.currentTimeMillis(); |
||||
this.latch = new CountDownLatch(1); |
||||
PROMISES.put(opaque, this); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* notify client finish |
||||
* @param opaque unique identification |
||||
* @param result result |
||||
*/ |
||||
public static void notify(long opaque, Object result){ |
||||
LogPromise promise = PROMISES.remove(opaque); |
||||
if(promise != null){ |
||||
promise.doCountDown(result); |
||||
} |
||||
} |
||||
|
||||
private void doCountDown(Object result){ |
||||
this.result = result; |
||||
this.latch.countDown(); |
||||
} |
||||
|
||||
public boolean isTimeout(){ |
||||
return System.currentTimeMillis() - start > timeout; |
||||
} |
||||
|
||||
public Object getResult(){ |
||||
try { |
||||
latch.await(timeout, TimeUnit.MILLISECONDS); |
||||
} catch (InterruptedException ignore) { |
||||
} |
||||
PROMISES.remove(opaque); |
||||
return this.result; |
||||
} |
||||
|
||||
|
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue