Browse Source

1,add sendAsync method 2,refactor LoggerClient (#1965)

* 1,remove dolphinscheduler-rpc module  2,add dolphinscheduler-remote module 3,add dolphinscheduler-service module 4,refactor LoggerServer module (#1925)

* 1,remove dolphinscheduler-rpc module
2,add dolphinscheduler-remote module
3,add dolphinscheduler-service module
4,refactor LoggerServer module

* ProcessUtils modify

* Refactor architecture (#1926)

* move version to parent pom

* move version properties to parent pom for easy management

* remove freemarker dependency

* delete CombinedApplicationServer

* #1871 correct spelling

* #1873 some updates for TaskQueueZkImpl

* #1875 remove unused properties in pom

* #1878
1. remove tomcat dependency
2. remove combined_logback.xml in api module
3. format pom.xml for not aligning

* #1885 fix api server startup failure
1. add jsp-2.1 dependency
2. remove jasper-runtime dependency

* add stringutils ut (#1921)

* add stringutils ut

* Newfeature for #1675. (#1908)

Continue to finish the rest works, add the cache feature for dependence,mr,python,sub_process,procedure and shell.

* Add modify user name for process definition (#1919)

* class overrides equals() and should therefore also override hashCode()

* #1862 add modify user in process difinition list

* #1862 add pg-1.2.2 ddl.sql

* modify ScriptRunnerTest

* add updateProessDifinition UT

* modify updateProcessDifinition UT

* modify updateProcessDifinition UT

* modify mysql 1.2.2 ddl.sql&dml.sql

* add scope test to mysql in pom

* modify pg-1.2.2 ddl.sql

* refactor module

* updates

Co-authored-by: khadgarmage <khadgar.mage@outlook.com>
Co-authored-by: zhukai <boness@qq.com>
Co-authored-by: Yelli <amarantine@my.com>

* dolphinscheduler-common remove spring (#1931)

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* SpringApplicationContext class title add license (#1932)

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* add license (#1934)

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* move datasource classes to dao module

* fix send4LetterWord bug

* Refactor architecture (#1936)

* move datasource classes to dao module

* fix send4LetterWord bug

* exclude jasper-compiler in case of runtime conflict

* LoggerServiceTest remove ProcessDao (#1944)

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* dolphinscheduler-common remove spring

* LoggerServiceTest remove ProcessDao

* exclude jasper-compiler in case of runtime conflict (#1938)

* move datasource classes to dao module

* fix send4LetterWord bug

* exclude jasper-compiler in case of runtime conflict

* DataAnaylysisServiceTest and ProcessDefinitionService modify

* remote module add comment

* OSUtilsTest modify

* add finally block to close channel

* add finally block to close channel (#1951)

* move datasource classes to dao module

* fix send4LetterWord bug

* exclude jasper-compiler in case of runtime conflict

* add finally block to close channel

* refactor log client service

* add sendSync method

* 1,quartz.properties add conf category
2,dolphinscheduler-daemon.sh modify

* dolphinscheduler-binary.xml modify

* add sendAsync method (#1962)

* add sendAsync method

* add sendAsync method

* 1,add sendAsync method
2,refactor LoggerClient

* LogClientService modify

Co-authored-by: Tboy <technoboy@yeah.net>
Co-authored-by: khadgarmage <khadgar.mage@outlook.com>
Co-authored-by: zhukai <boness@qq.com>
Co-authored-by: Yelli <amarantine@my.com>
pull/2/head
qiaozhanwei 4 years ago committed by GitHub
parent
commit
9b75bebf1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
  2. 210
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java
  3. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java
  4. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java
  5. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java
  6. 2
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
  7. 8
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
  8. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
  9. 11
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
  10. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
  11. 38
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java
  12. 27
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java
  13. 31
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java
  14. 41
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java
  15. 164
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java
  16. 87
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
  17. 38
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java
  18. 61
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java
  19. 48
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
  20. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
  21. 100
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  22. 28
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java

@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
/** /**
* log service * log service
*/ */
@ -39,6 +41,17 @@ public class LoggerService {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
private final LogClientService logClient;
public LoggerService(){
logClient = new LogClientService();
}
@PreDestroy
public void close(){
logClient.close();
}
/** /**
* view log * view log
* *
@ -64,18 +77,9 @@ public class LoggerService {
Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); Result result = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT); logger.info("log host : {} , logPath : {} , logServer port : {}",host,taskInstance.getLogPath(),Constants.RPC_PORT);
LogClientService logClient = null; String log = logClient.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(),skipLineNum,limit);
try { result.setData(log);
logClient = new LogClientService(host, Constants.RPC_PORT); logger.info(log);
String log = logClient.rollViewLog(taskInstance.getLogPath(),skipLineNum,limit);
result.setData(log);
logger.info(log);
} finally {
if(logClient != null){
logClient.close();
}
}
return result; return result;
} }
@ -90,16 +94,7 @@ public class LoggerService {
if (taskInstance == null){ if (taskInstance == null){
throw new RuntimeException("task instance is null"); throw new RuntimeException("task instance is null");
} }
String host = taskInstance.getHost(); String host = taskInstance.getHost();
LogClientService logClient = null; return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
try {
logClient = new LogClientService(host, Constants.RPC_PORT);
return logClient.getLogBytes(taskInstance.getLogPath());
} finally {
if(logClient != null){
logClient.close();
}
}
} }
} }

210
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingClient.java

@ -25,21 +25,22 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder; import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder; import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler; import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -51,7 +52,7 @@ public class NettyRemotingClient {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class); private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);
/** /**
* bootstrap * client bootstrap
*/ */
private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap bootstrap = new Bootstrap();
@ -61,14 +62,9 @@ public class NettyRemotingClient {
private final NettyEncoder encoder = new NettyEncoder(); private final NettyEncoder encoder = new NettyEncoder();
/** /**
* channels * channels
*/ */
private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(); private final ConcurrentHashMap<Address, Channel> channels = new ConcurrentHashMap(128);
/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/** /**
* started flag * started flag
@ -81,18 +77,27 @@ public class NettyRemotingClient {
private final NioEventLoopGroup workerGroup; private final NioEventLoopGroup workerGroup;
/** /**
* client handler * client config
*/ */
private final NettyClientHandler clientHandler = new NettyClientHandler(this); private final NettyClientConfig clientConfig;
/** /**
* netty client config * saync semaphore
*/ */
private final NettyClientConfig clientConfig; private final Semaphore asyncSemaphore = new Semaphore(200, true);
/** /**
* netty client init * callback thread executor
* */
private final ExecutorService callbackExecutor;
/**
* client handler
*/
private final NettyClientHandler clientHandler;
/**
* client init
* @param clientConfig client config * @param clientConfig client config
*/ */
public NettyRemotingClient(final NettyClientConfig clientConfig){ public NettyRemotingClient(final NettyClientConfig clientConfig){
@ -105,11 +110,16 @@ public class NettyRemotingClient {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet())); return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
} }
}); });
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.start(); this.start();
} }
/** /**
* netty server start * start
*/ */
private void start(){ private void start(){
@ -129,63 +139,125 @@ public class NettyRemotingClient {
encoder); encoder);
} }
}); });
//
isStarted.compareAndSet(false, true); isStarted.compareAndSet(false, true);
} }
/** /**
* register processor * async send
* * @param address address
* @param commandType command type * @param command command
* @param processor processor * @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
*/ */
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) { public void sendAsync(final Address address, final Command command,
registerProcessor(commandType, processor, null); final long timeoutMillis,
} final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address);
if (channel == null) {
throw new RemotingException("network error");
}
/**
* request unique identification
*/
final long opaque = command.getOpaque();
/**
* control concurrency number
*/
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/** /**
* register processor * response future
* */
* @param commandType command type final ResponseFuture responseFuture = new ResponseFuture(opaque,
* @param processor processor timeoutMillis,
* @param executor thread executor invokeCallback,
*/ releaseSemaphore);
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) { try {
this.clientHandler.registerProcessor(commandType, processor, executor); channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
responseFuture.setSendOk(true);
return;
} else {
responseFuture.setSendOk(false);
}
responseFuture.setCause(future.cause());
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Throwable ex){
logger.error("execute callback error", ex);
} finally{
responseFuture.release();
}
}
});
} catch (Throwable ex){
responseFuture.release();
throw new RemotingException(String.format("send command to address: %s failed", address), ex);
}
} else{
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message);
}
} }
/** /**
* send connect * sync send
* @param address address * @param address address
* @param command command * @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
* @throws RemotingException * @throws RemotingException
*/ */
public void send(final Address address, final Command command) throws RemotingException { public Command sendSync(final Address address, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
final Channel channel = getChannel(address); final Channel channel = getChannel(address);
if (channel == null) { if (channel == null) {
throw new RemotingException("network error"); throw new RemotingException(String.format("connect to : %s fail", address));
} }
try { final long opaque = command.getOpaque();
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){ final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){ if(future.isSuccess()){
logger.info("sent command {} to {}", command, address); responseFuture.setSendOk(true);
} else{ return;
logger.error("send command {} to {} failed, error {}", command, address, future.cause()); } else {
} responseFuture.setSendOk(false);
} }
}); responseFuture.setCause(future.cause());
} catch (Exception ex) { responseFuture.putResponse(null);
String msg = String.format("send command %s to address %s encounter error", command, address); logger.error("send command {} to address {} failed", command, address);
throw new RemotingException(msg, ex); }
});
/**
* sync wait for result
*/
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
throw new RemotingTimeoutException(address.toString(), timeoutMillis, responseFuture.getCause());
} else{
throw new RemotingException(address.toString(), responseFuture.getCause());
}
} }
return result;
} }
/** /**
* get channel * get channel
* @param address address * @param address
* @return channel * @return
*/ */
public Channel getChannel(Address address) { public Channel getChannel(Address address) {
Channel channel = channels.get(address); Channel channel = channels.get(address);
@ -196,9 +268,9 @@ public class NettyRemotingClient {
} }
/** /**
* create channel * create channel
* @param address address * @param address address
* @param isSync is sync * @param isSync sync flag
* @return channel * @return channel
*/ */
public Channel createChannel(Address address, boolean isSync) { public Channel createChannel(Address address, boolean isSync) {
@ -222,15 +294,7 @@ public class NettyRemotingClient {
} }
/** /**
* get default thread executor * close
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}
/**
* close client
*/ */
public void close() { public void close() {
if(isStarted.compareAndSet(true, false)){ if(isStarted.compareAndSet(true, false)){
@ -239,8 +303,8 @@ public class NettyRemotingClient {
if(workerGroup != null){ if(workerGroup != null){
this.workerGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully();
} }
if(defaultExecutor != null){ if(callbackExecutor != null){
defaultExecutor.shutdown(); this.callbackExecutor.shutdownNow();
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.error("netty client close exception", ex); logger.error("netty client close exception", ex);
@ -250,7 +314,7 @@ public class NettyRemotingClient {
} }
/** /**
* close channel * close channels
*/ */
private void closeChannels(){ private void closeChannels(){
for (Channel channel : this.channels.values()) { for (Channel channel : this.channels.values()) {
@ -260,10 +324,10 @@ public class NettyRemotingClient {
} }
/** /**
* remove channel * close channel
* @param address address * @param address address
*/ */
public void removeChannel(Address address){ public void closeChannel(Address address){
Channel channel = this.channels.remove(address); Channel channel = this.channels.remove(address);
if(channel != null){ if(channel != null){
channel.close(); channel.close();

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -49,7 +49,7 @@ public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class); private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/** /**
* server bootstart * server bootstrap
*/ */
private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private final ServerBootstrap serverBootstrap = new ServerBootstrap();

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Command.java

@ -16,7 +16,10 @@
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import com.sun.org.apache.regexp.internal.RE;
import java.io.Serializable; import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* receive task log request command and content fill * receive task log request command and content fill
@ -24,11 +27,12 @@ import java.io.Serializable;
*/ */
public class Command implements Serializable { public class Command implements Serializable {
private static final long serialVersionUID = 1L; private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe; public static final byte MAGIC = (byte) 0xbabe;
public Command(){ public Command(){
this.opaque = REQUEST_ID.getAndIncrement();
} }
public Command(long opaque){ public Command(long opaque){

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** * attempt id */ private String attemptId; /** * application name */ kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); kd">private static final AtomicLong REQUEST = new AtomicLong(1); */ kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** */ kd">private static final AtomicLong REQUEST = new AtomicLong(1); * task id /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); */ */ kd">private static final AtomicLong REQUEST = new AtomicLong(1); private String taskId; /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); * attempt id */ kd">private static final AtomicLong REQUEST = new AtomicLong(1); private String attemptId; /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); * application name */ /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); */ /** /** */ * task id /** */ s */ private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.List; import java.util.concurrent.atomic.AtomicLong; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { private String taskId; * attempt id private String attemptId; * application name /** /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** private String attemptId; /** /** /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id private String attemptId; /** */ /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** private String taskId; private String attemptId; /** * attempt id /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); /** private String attemptId; private String attemptId; /** * application name /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); * connector port private String attemptId; private int connectorPort; /** /** * task id */ * task id /** /** * class name private String attemptId; private String className; /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); * method name private String attemptId; private String methodName; /** /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); * parameters private String attemptId; * task id * application name /** kd">private static final AtomicLong REQUEST = new AtomicLong(1); */ s */ private List<Integer> shardItems; public List<Integer> getShardItems() { return shardItems; } public void setShardItems(List<Integer> shardItems) { this.shardItems = shardItems; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public String getApplicationName() { return applicationName; } public void setApplicationName(String applicationName) { this.applicationName = applicationName; } public String getGroupName() { return groupName; } public void setGroupName(String groupName) { this.groupName = groupName; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public int getConnectorPort() { return connectorPort; } public void setConnectorPort(int connectorPort) { this.connectorPort = connectorPort; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

2
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command */ public class ExecuteTaskResponseCommand implements Serializable { private static final AtomicLong REQUEST = new AtomicLong(1); /** * task id */ private String taskId; /** execute taks response command */ execute taks response command execute taks response command /** execute taks response command */ */ execute taks response command public class ExecuteTaskResponseCommand implements Serializable { /** * receive time */ execute taks response command private static final AtomicLong REQUEST = new AtomicLong(1); /** execute taks response command /** execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } /** * package response command * * @return command */ public Command convert2Command(){ Command command = new Command(REQUEST.getAndIncrement()); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; /** * execute taks response command * task id */ public class ExecuteTaskResponseCommand implements Serializable { */ /** * task id */ */ /** * attempt id */ */ /** */ */ execute taks response command * return result */ public class ExecuteTaskResponseCommand implements Serializable { */ */ */ execute taks response command * received time */ private long receivedTime; */ execute taks response command public class ExecuteTaskResponseCommand implements Serializable { execute taks response command execute count */ private int executeCount; /** * execute time */ private long executeTime; public String getAttemptId() { return attemptId; } public void setAttemptId(String attemptId) { this.attemptId = attemptId; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public long getReceivedTime() { return receivedTime; } public void setReceivedTime(long receivedTime) { this.receivedTime = receivedTime; } public int getExecuteCount() { return executeCount; } public void setExecuteCount(int executeCount) { this.executeCount = executeCount; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Command convert2Command(long opaque){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }

8
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java

@ -28,8 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class Ping implements Serializable { public class Ping implements Serializable {
private static final AtomicLong ID = new AtomicLong(1);
/** /**
* ping body * ping body
*/ */
@ -53,7 +51,7 @@ public class Ping implements Serializable {
} }
/** /**
* ping connect * ping content
* @return result * @return result
*/ */
public static ByteBuf pingContent(){ public static ByteBuf pingContent(){
@ -61,12 +59,12 @@ public class Ping implements Serializable {
} }
/** /**
* package ping command * create ping command
* *
* @return command * @return command
*/ */
public static Command create(){ public static Command create(){
Command command = new Command(ID.getAndIncrement()); Command command = new Command();
command.setType(CommandType.PING); command.setType(CommandType.PING);
command.setBody(EMPTY_BODY_ARRAY); command.setBody(EMPTY_BODY_ARRAY);
return command; return command;

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java

@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class GetLogBytesRequestCommand implements Serializable { public class GetLogBytesRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/** /**
* log path * log path
*/ */
@ -60,7 +55,7 @@ public class GetLogBytesRequestCommand implements Serializable {
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement()); Command command = new Command();
command.setType(CommandType.GET_LOG_BYTES_REQUEST); command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = FastJsonSerializer.serialize(this); byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body); command.setBody(body);

11
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java

@ -29,11 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/ */
public class RollViewLogRequestCommand implements Serializable { public class RollViewLogRequestCommand implements Serializable {
/**
* request id
*/
private static final AtomicLong REQUEST = new AtomicLong(1);
/** /**
* log path * log path
*/ */
@ -45,7 +40,7 @@ public class RollViewLogRequestCommand implements Serializable {
private int skipLineNum; private int skipLineNum;
/** /**
* query log line number limit * query line number
*/ */
private int limit; private int limit;
@ -83,12 +78,12 @@ public class RollViewLogRequestCommand implements Serializable {
} }
/** /**
* package request command * package request command
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement()); Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this); byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body); command.setBody(body);

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java

@ -30,10 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
public class ViewLogRequestCommand implements Serializable { public class ViewLogRequestCommand implements Serializable {
/** /**
* request id * log path
*/ */
private static final AtomicLong REQUEST = new AtomicLong(1);
private String path; private String path;
public ViewLogRequestCommand() { public ViewLogRequestCommand() {
@ -57,7 +55,7 @@ public class ViewLogRequestCommand implements Serializable {
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement()); Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = FastJsonSerializer.serialize(this); byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body); command.setBody(body);

38
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTimeoutException.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* timeout exception
*/
public class RemotingTimeoutException extends RemotingException{
public RemotingTimeoutException(String message) {
super(message);
}
public RemotingTimeoutException(String address, long timeoutMillis) {
this(address, timeoutMillis, null);
}
public RemotingTimeoutException(String address, long timeoutMillis, Throwable cause) {
super(String.format("wait response on the channel %s timeout %s", address, timeoutMillis), cause);
}
}

27
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingTooMuchRequestException.java

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.exceptions;
/**
* too much request exception
*/
public class RemotingTooMuchRequestException extends RemotingException{
public RemotingTooMuchRequestException(String message) {
super(message);
}
}

31
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/InvokeCallback.java

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
/**
* invoke callback
*/
public interface InvokeCallback {
/**
* operation
*
* @param responseFuture responseFuture
*/
void operationComplete(final ResponseFuture responseFuture);
}

41
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ReleaseSemaphore.java

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* release semaphore
*/
public class ReleaseSemaphore {
private final Semaphore semaphore;
private final AtomicBoolean released;
public ReleaseSemaphore(Semaphore semaphore){
this.semaphore = semaphore;
this.released = new AtomicBoolean(false);
}
public void release(){
if(this.released.compareAndSet(false, true)){
this.semaphore.release();
}
}
}

164
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/future/ResponseFuture.java

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.future;
import org.apache.dolphinscheduler.remote.command.Command;
import java.util.concurrent.*;
/**
* response future
*/
public class ResponseFuture {
private final static ConcurrentHashMap<Long,ResponseFuture> FUTURE_TABLE = new ConcurrentHashMap<>(256);
/**
* request unique identification
*/
private final long opaque;
/**
* timeout
*/
private final long timeoutMillis;
/**
* invokeCallback function
*/
private final InvokeCallback invokeCallback;
/**
* releaseSemaphore
*/
private final ReleaseSemaphore releaseSemaphore;
private final CountDownLatch latch = new CountDownLatch(1);
private final long beginTimestamp = System.currentTimeMillis();
/**
* response command
*/
private volatile Command responseCommand;
private volatile boolean sendOk = true;
private volatile Throwable cause;
public ResponseFuture(long opaque, long timeoutMillis, InvokeCallback invokeCallback, ReleaseSemaphore releaseSemaphore) {
this.opaque = opaque;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.releaseSemaphore = releaseSemaphore;
FUTURE_TABLE.put(opaque, this);
}
/**
* wait for response
*
* @return command
* @throws InterruptedException
*/
public Command waitResponse() throws InterruptedException {
this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
return this.responseCommand;
}
/**
* put response
*
* @param responseCommand responseCommand
*/
public void putResponse(final Command responseCommand) {
this.responseCommand = responseCommand;
this.latch.countDown();
FUTURE_TABLE.remove(opaque);
}
public static ResponseFuture getFuture(long opaque){
return FUTURE_TABLE.get(opaque);
}
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout() {
long diff = System.currentTimeMillis() - this.beginTimestamp;
return diff > this.timeoutMillis;
}
/**
* execute invoke callback
*/
public void executeInvokeCallback() {
if (invokeCallback != null) {
invokeCallback.operationComplete(this);
}
}
public boolean isSendOK() {
return sendOk;
}
public void setSendOk(boolean sendOk) {
this.sendOk = sendOk;
}
public void setCause(Throwable cause) {
this.cause = cause;
}
public Throwable getCause() {
return cause;
}
public long getOpaque() {
return opaque;
}
public long getTimeoutMillis() {
return timeoutMillis;
}
public long getBeginTimestamp() {
return beginTimestamp;
}
public Command getResponseCommand() {
return responseCommand;
}
public void setResponseCommand(Command responseCommand) {
this.responseCommand = responseCommand;
}
public InvokeCallback getInvokeCallback() {
return invokeCallback;
}
/**
* release
*/
public void release() {
if(this.releaseSemaphore != null){
this.releaseSemaphore.release();
}
}
}

87
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java

@ -19,16 +19,12 @@ package org.apache.dolphinscheduler.remote.handler;
import io.netty.channel.*; import io.netty.channel.*;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
/** /**
* netty client request handler * netty client request handler
@ -39,17 +35,18 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/** /**
* netty remote client * netty client
*/ */
private final NettyRemotingClient nettyRemotingClient; private final NettyRemotingClient nettyRemotingClient;
/** /**
* client processors queue * callback thread executor
*/ */
private final ConcurrentHashMap<CommandType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap(); private final ExecutorService callbackExecutor;
public NettyClientHandler(NettyRemotingClient nettyRemotingClient){ public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor){
this.nettyRemotingClient = nettyRemotingClient; this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
} }
/** /**
@ -61,7 +58,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/ */
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
@ -74,80 +71,50 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
*/ */
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
processReceived(ctx.channel(), (Command)msg); processReceived((Command)msg);
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
}
/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if(executorRef == null){
executorRef = nettyRemotingClient.getDefaultExecutor();
}
this.processors.putIfAbsent(commandType, new Pair<NettyRequestProcessor, ExecutorService>(processor, executorRef));
} }
/** /**
* process received logic * process received logic
* *
* @param channel channel * @param responseCommand responseCommand
* @param msg message
*/ */
private void processReceived(final Channel channel, final Command msg) { private void processReceived(final Command responseCommand) {
final CommandType commandType = msg.getType(); ResponseFuture future = ResponseFuture.getFuture(responseCommand.getOpaque());
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(commandType); if(future != null){
if (pair != null) { future.setResponseCommand(responseCommand);
Runnable r = new Runnable() { future.release();
@Override if(future.getInvokeCallback() != null){
public void run() { this.callbackExecutor.submit(new Runnable() {
try { @Override
pair.getLeft().process(channel, msg); public void run() {
} catch (Throwable ex) { future.executeInvokeCallback();
logger.error("process msg {} error : {}", msg, ex);
} }
} });
}; } else{
try { future.putResponse(responseCommand);
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
} }
} else { } else{
logger.warn("commandType {} not support", commandType); logger.warn("receive response {}, but not matched any request ", responseCommand);
} }
} }
/** /**
* caught exception * caught exception
*
* @param ctx channel handler context * @param ctx channel handler context
* @param cause cause * @param cause cause
* @throws Exception * @throws Exception
*/ */
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught : {}", cause); logger.error("exceptionCaught : {}", cause);
nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel())); nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close(); ctx.channel().close();
} }
/** /**
* channel write changed * channel write changed
*
* @param ctx channel handler context * @param ctx channel handler context
* @throws Exception * @throws Exception
*/ */

38
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/CallerThreadExecutePolicy.java

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* caller thread execute
*/
public class CallerThreadExecutePolicy implements RejectedExecutionHandler {
private final Logger logger = LoggerFactory.getLogger(CallerThreadExecutePolicy.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warn("queue is full, trigger caller thread execute");
r.run();
}
}

61
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/NamedThreadFactory.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* thread factory
*/
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger increment = new AtomicInteger(1);
/**
* name
*/
private final String name;
/**
* count
*/
private final int count;
public NamedThreadFactory(String name){
this(name, 0);
}
public NamedThreadFactory(String name, int count){
this.name = name;
this.count = count;
}
/**
* create thread
* @param r runnable
* @return thread
*/
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format(name + "_%d_%d", count, increment.getAndIncrement())
: String.format(name + "_%d", increment.getAndIncrement());
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
}

48
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java

@ -24,6 +24,8 @@ import org.apache.dolphinscheduler.remote.command.Ping;
import org.apache.dolphinscheduler.remote.command.Pong; import org.apache.dolphinscheduler.remote.command.Pong;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.Address;
import org.junit.Assert; import org.junit.Assert;
@ -39,10 +41,10 @@ public class NettyRemotingClientTest {
/** /**
* test ping * test sned sync
*/ */
@Test @Test
public void testSend(){ public void testSendSync(){
NettyServerConfig serverConfig = new NettyServerConfig(); NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig); NettyRemotingServer server = new NettyRemotingServer(serverConfig);
@ -52,26 +54,54 @@ public class NettyRemotingClientTest {
channel.writeAndFlush(Pong.create(command.getOpaque())); channel.writeAndFlush(Pong.create(command.getOpaque()));
} }
}); });
server.start(); server.start();
// //
CountDownLatch latch = new CountDownLatch(1);
AtomicLong opaque = new AtomicLong(1);
final NettyClientConfig clientConfig = new NettyClientConfig(); final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig); NettyRemotingClient client = new NettyRemotingClient(clientConfig);
client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() { Command commandPing = Ping.create();
try {
Command response = client.sendSync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000);
Assert.assertEquals(commandPing.getOpaque(), response.getOpaque());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* test sned async
*/
@Test
public void testSendAsync(){
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
opaque.set(command.getOpaque()); channel.writeAndFlush(Pong.create(command.getOpaque()));
latch.countDown();
} }
}); });
server.start();
//
final NettyClientConfig clientConfig = new NettyClientConfig();
NettyRemotingClient client = new NettyRemotingClient(clientConfig);
CountDownLatch latch = new CountDownLatch(1);
Command commandPing = Ping.create(); Command commandPing = Ping.create();
try { try {
client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing); final AtomicLong opaque = new AtomicLong(0);
client.sendAsync(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing, 2000, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
opaque.set(responseFuture.getOpaque());
latch.countDown();
}
});
latch.await(); latch.await();
Assert.assertEquals(commandPing.getOpaque(), opaque.get());
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
Assert.assertEquals(opaque.get(), commandPing.getOpaque());
} }
} }

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java

@ -378,8 +378,8 @@ public class ProcessUtils {
LogClientService logClient = null; LogClientService logClient = null;
String log = null; String log = null;
try { try {
logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT); logClient = new LogClientService();
log = logClient.viewLog(taskInstance.getLogPath()); log = logClient.viewLog(taskInstance.getHost(), Constants.RPC_PORT, taskInstance.getLogPath());
} finally { } finally {
if(logClient != null){ if(logClient != null){
logClient.close(); logClient.close();

100
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -16,13 +16,11 @@
*/ */
package org.apache.dolphinscheduler.service.log; package org.apache.dolphinscheduler.service.log;
import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*; import org.apache.dolphinscheduler.remote.command.log.*;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Address; import org.apache.dolphinscheduler.remote.utils.Address;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -32,7 +30,7 @@ import org.slf4j.LoggerFactory;
/** /**
* log client * log client
*/ */
public class LogClientService implements NettyRequestProcessor { public class LogClientService {
private static final Logger logger = LoggerFactory.getLogger(LogClientService.class); private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
@ -40,8 +38,6 @@ public class LogClientService implements NettyRequestProcessor {
private final NettyRemotingClient client; private final NettyRemotingClient client;
private final Address address;
/** /**
* request time out * request time out
*/ */
@ -49,18 +45,11 @@ public class LogClientService implements NettyRequestProcessor {
/** /**
* construct client * construct client
* @param host host
* @param port port
*/ */
public LogClientService(String host, int port) { public LogClientService() {
this.address = new Address(host, port);
this.clientConfig = new NettyClientConfig(); this.clientConfig = new NettyClientConfig();
this.clientConfig.setWorkerThreads(1); this.clientConfig.setWorkerThreads(4);
this.client = new NettyRemotingClient(clientConfig); this.client = new NettyRemotingClient(clientConfig);
this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
} }
/** /**
@ -73,94 +62,87 @@ public class LogClientService implements NettyRequestProcessor {
/** /**
* roll view log * roll view log
* @param host host
* @param port port
* @param path path * @param path path
* @param skipLineNum skip line number * @param skipLineNum skip line number
* @param limit limit * @param limit limit
* @return log content * @return log content
*/ */
public String rollViewLog(String path,int skipLineNum,int limit) { public String rollViewLog(String host, int port, String path,int skipLineNum,int limit) {
logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit); logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit); RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
String result = ""; String result = "";
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); Command response = this.client.sendSync(address, command, logRequestTimeout);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); if(response != null){
result = ((String)promise.getResult()); RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg();
}
} catch (Exception e) { } catch (Exception e) {
logger.error("roll view log error", e); logger.error("roll view log error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
/** /**
* view log * view log
* @param host host
* @param port port
* @param path path * @param path path
* @return log content * @return log content
*/ */
public String viewLog(String path) { public String viewLog(String host, int port, String path) {
logger.info("view log path {}", path); logger.info("view log path {}", path);
ViewLogRequestCommand request = new ViewLogRequestCommand(path); ViewLogRequestCommand request = new ViewLogRequestCommand(path);
String result = ""; String result = "";
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); Command response = this.client.sendSync(address, command, logRequestTimeout);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); if(response != null){
result = ((String)promise.getResult()); ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg();
}
} catch (Exception e) { } catch (Exception e) {
logger.error("view log error", e); logger.error("view log error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
/** /**
* get log size * get log size
* @param host host
* @param port port
* @param path log path * @param path log path
* @return log content bytes * @return log content bytes
*/ */
public byte[] getLogBytes(String path) { public byte[] getLogBytes(String host, int port, String path) {
logger.info("log path {}", path); logger.info("log path {}", path);
GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path); GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
byte[] result = null; byte[] result = null;
final Address address = new Address(host, port);
try { try {
Command command = request.convert2Command(); Command command = request.convert2Command();
this.client.send(address, command); Command response = this.client.sendSync(address, command, logRequestTimeout);
LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout); if(response != null){
result = (byte[])promise.getResult(); GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData();
}
} catch (Exception e) { } catch (Exception e) {
logger.error("get log size error", e); logger.error("get log size error", e);
} finally {
this.client.closeChannel(address);
} }
return result; return result;
} }
@Override
public void process(Channel channel, Command command) {
logger.info("received log response : {}", command);
switch (command.getType()){
case ROLL_VIEW_LOG_RESPONSE:
RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
command.getBody(), RollViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
break;
case VIEW_WHOLE_LOG_RESPONSE:
ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
command.getBody(), ViewLogResponseCommand.class);
LogPromise.notify(command.getOpaque(), viewLog.getMsg());
break;
case GET_LOG_BYTES_RESPONSE:
GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
command.getBody(), GetLogBytesResponseCommand.class);
LogPromise.notify(command.getOpaque(), getLog.getData());
break;
default:
throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
}
}
public static void main(String[] args) throws Exception{
LogClientService logClient = new LogClientService("192.168.220.247", 50051);
byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
System.out.println(new String(logBytes));
}
} }

28
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java

@ -28,14 +28,29 @@ public class LogPromise {
private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Long, LogPromise> PROMISES = new ConcurrentHashMap<>();
/**
* request unique identification
*/
private long opaque; private long opaque;
/**
* start timemillis
*/
private final long start; private final long start;
/**
* timeout
*/
private final long timeout; private final long timeout;
/**
* latch
*/
private final CountDownLatch latch; private final CountDownLatch latch;
/**
* result
*/
private Object result; private Object result;
public LogPromise(long opaque, long timeout){ public LogPromise(long opaque, long timeout){
@ -59,15 +74,28 @@ public class LogPromise {
} }
} }
/**
* countdown
*
* @param result result
*/
private void doCountDown(Object result){ private void doCountDown(Object result){
this.result = result; this.result = result;
this.latch.countDown(); this.latch.countDown();
} }
/**
* whether timeout
* @return timeout
*/
public boolean isTimeout(){ public boolean isTimeout(){
return System.currentTimeMillis() - start > timeout; return System.currentTimeMillis() - start > timeout;
} }
/**
* get result
* @return
*/
public Object getResult(){ public Object getResult(){
try { try {
latch.await(timeout, TimeUnit.MILLISECONDS); latch.await(timeout, TimeUnit.MILLISECONDS);

Loading…
Cancel
Save