shardItems) {
this.shardItems = shardItems;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getApplicationName() {
return applicationName;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public int getConnectorPort() {
return connectorPort;
}
public void setConnectorPort(int connectorPort) {
this.connectorPort = connectorPort;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_REQUEST);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
new file mode 100644
index 0000000000..fafb57535b
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java
@@ -0,0 +1 @@
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
public class ExecuteTaskResponseCommand implements Serializable {
private static final AtomicLong REQUEST = new AtomicLong(1);
private String taskId;
private String attemptId;
private Object result;
private long receivedTime;
private int executeCount;
private long executeTime;
public String getAttemptId() {
return attemptId;
}
public void setAttemptId(String attemptId) {
this.attemptId = attemptId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
public long getReceivedTime() {
return receivedTime;
}
public void setReceivedTime(long receivedTime) {
this.receivedTime = receivedTime;
}
public int getExecuteCount() {
return executeCount;
}
public void setExecuteCount(int executeCount) {
this.executeCount = executeCount;
}
public long getExecuteTime() {
return executeTime;
}
public void setExecuteTime(long executeTime) {
this.executeTime = executeTime;
}
public Command convert2Command(){
Command command = new Command(REQUEST.getAndIncrement());
command.setType(CommandType.EXECUTE_TASK_RESPONSE);
byte[] body = FastJsonSerializer.serialize(this);
command.setBody(body);
return command;
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
new file mode 100644
index 0000000000..365d451564
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class Ping implements Serializable {
+
+ private static final AtomicLong ID = new AtomicLong(1);
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PING_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PING.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PING_BUF.duplicate();
+ }
+
+ public static Command create(){
+ Command command = new Command(ID.getAndIncrement());
+ command.setType(CommandType.PING);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
new file mode 100644
index 0000000000..bc5abdad79
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.Serializable;
+
+
+public class Pong implements Serializable {
+
+ protected static ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER;
+
+ private static byte[] EMPTY_BODY_ARRAY = new byte[0];
+
+ private static final ByteBuf PONG_BUF;
+
+ static {
+ ByteBuf ping = Unpooled.buffer();
+ ping.writeByte(Command.MAGIC);
+ ping.writeByte(CommandType.PONG.ordinal());
+ ping.writeLong(0);
+ ping.writeInt(0);
+ ping.writeBytes(EMPTY_BODY);
+ PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly();
+ }
+
+ public static ByteBuf pingContent(){
+ return PONG_BUF.duplicate();
+ }
+
+ public static Command create(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.PONG);
+ command.setBody(EMPTY_BODY_ARRAY);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
new file mode 100644
index 0000000000..1a2e6e4dd1
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * get log bytes request command
+ */
+public class GetLogBytesRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public GetLogBytesRequestCommand() {
+ }
+
+ public GetLogBytesRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ *
+ * @return
+ */
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.GET_LOG_BYTES_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
new file mode 100644
index 0000000000..05692fb5c9
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * get log bytes response command
+ */
+public class GetLogBytesResponseCommand implements Serializable {
+
+ private byte[] data;
+
+ public GetLogBytesResponseCommand() {
+ }
+
+ public GetLogBytesResponseCommand(byte[] data) {
+ this.data = data;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
new file mode 100644
index 0000000000..49d19aa1f2
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * roll view log request command
+ */
+public class RollViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ private int skipLineNum;
+
+ private int limit;
+
+ public RollViewLogRequestCommand() {
+ }
+
+ public RollViewLogRequestCommand(String path, int skipLineNum, int limit) {
+ this.path = path;
+ this.skipLineNum = skipLineNum;
+ this.limit = limit;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public int getSkipLineNum() {
+ return skipLineNum;
+ }
+
+ public void setSkipLineNum(int skipLineNum) {
+ this.skipLineNum = skipLineNum;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
new file mode 100644
index 0000000000..def3257073
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * roll view log response command
+ */
+public class RollViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public RollViewLogResponseCommand() {
+ }
+
+ public RollViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
new file mode 100644
index 0000000000..9ba9cd3c23
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * view log request command
+ */
+public class ViewLogRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public ViewLogRequestCommand() {
+ }
+
+ public ViewLogRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
new file mode 100644
index 0000000000..6e3c799a3d
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.command.log;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * view log response command
+ */
+public class ViewLogResponseCommand implements Serializable {
+
+ private String msg;
+
+ public ViewLogResponseCommand() {
+ }
+
+ public ViewLogResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
new file mode 100644
index 0000000000..56d2643a67
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyClientConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * netty client config
+ */
+public class NettyClientConfig {
+
+ private int workerThreads = Constants.CPUS;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ public int getWorkerThreads() {
+ return workerThreads;
+ }
+
+ public void setWorkerThreads(int workerThreads) {
+ this.workerThreads = workerThreads;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
new file mode 100644
index 0000000000..847f316089
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/config/NettyServerConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.config;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+
+/**
+ * netty server config
+ */
+public class NettyServerConfig {
+
+ private int soBacklog = 1024;
+
+ private boolean tcpNoDelay = true;
+
+ private boolean soKeepalive = true;
+
+ private int sendBufferSize = 65535;
+
+ private int receiveBufferSize = 65535;
+
+ private int workerThread = Constants.CPUS;
+
+ private int listenPort = 12346;
+
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ public void setListenPort(int listenPort) {
+ this.listenPort = listenPort;
+ }
+
+ public int getSoBacklog() {
+ return soBacklog;
+ }
+
+ public void setSoBacklog(int soBacklog) {
+ this.soBacklog = soBacklog;
+ }
+
+ public boolean isTcpNoDelay() {
+ return tcpNoDelay;
+ }
+
+ public void setTcpNoDelay(boolean tcpNoDelay) {
+ this.tcpNoDelay = tcpNoDelay;
+ }
+
+ public boolean isSoKeepalive() {
+ return soKeepalive;
+ }
+
+ public void setSoKeepalive(boolean soKeepalive) {
+ this.soKeepalive = soKeepalive;
+ }
+
+ public int getSendBufferSize() {
+ return sendBufferSize;
+ }
+
+ public void setSendBufferSize(int sendBufferSize) {
+ this.sendBufferSize = sendBufferSize;
+ }
+
+ public int getReceiveBufferSize() {
+ return receiveBufferSize;
+ }
+
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public int getWorkerThread() {
+ return workerThread;
+ }
+
+ public void setWorkerThread(int workerThread) {
+ this.workerThread = workerThread;
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
new file mode 100644
index 0000000000..29d48db8f8
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/exceptions/RemotingException.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.exceptions;
+
+/**
+ * remote exception
+ */
+public class RemotingException extends Exception {
+
+ public RemotingException() {
+ super();
+ }
+
+ /** Constructs a new runtime exception with the specified detail message.
+ * The cause is not initialized, and may subsequently be initialized by a
+ * call to {@link #initCause}.
+ *
+ * @param message the detail message. The detail message is saved for
+ * later retrieval by the {@link #getMessage()} method.
+ */
+ public RemotingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail message and
+ * cause. Note that the detail message associated with
+ * {@code cause} is not automatically incorporated in
+ * this runtime exception's detail message.
+ *
+ * @param message the detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method).
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /** Constructs a new runtime exception with the specified cause and a
+ * detail message of (cause==null ? null : cause.toString())
+ * (which typically contains the class and detail message of
+ * cause). This constructor is useful for runtime exceptions
+ * that are little more than wrappers for other throwables.
+ *
+ * @param cause the cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is
+ * permitted, and indicates that the cause is nonexistent or
+ * unknown.)
+ * @since 1.4
+ */
+ public RemotingException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a new runtime exception with the specified detail
+ * message, cause, suppression enabled or disabled, and writable
+ * stack trace enabled or disabled.
+ *
+ * @param message the detail message.
+ * @param cause the cause. (A {@code null} value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ * @param enableSuppression whether or not suppression is enabled
+ * or disabled
+ * @param writableStackTrace whether or not the stack trace should
+ * be writable
+ *
+ * @since 1.7
+ */
+ protected RemotingException(String message, Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
new file mode 100644
index 0000000000..b06308090f
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyClientHandler.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * netty client request handler
+ */
+@ChannelHandler.Sharable
+public class NettyClientHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
+
+ private final NettyRemotingClient nettyRemotingClient;
+
+ private final ConcurrentHashMap> processors = new ConcurrentHashMap();
+
+ public NettyClientHandler(NettyRemotingClient nettyRemotingClient){
+ this.nettyRemotingClient = nettyRemotingClient;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, nettyRemotingClient.getDefaultExecutor());
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingClient.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ nettyRemotingClient.removeChannel(ChannelUtils.toAddress(ctx.channel()));
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
new file mode 100644
index 0000000000..8a7ee39a77
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/handler/NettyServerHandler.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.handler;
+
+import io.netty.channel.*;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
+import org.apache.dolphinscheduler.remote.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * netty server request handler
+ */
+@ChannelHandler.Sharable
+public class NettyServerHandler extends ChannelInboundHandlerAdapter {
+
+ private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+
+ private final NettyRemotingServer nettyRemotingServer;
+
+ private final ConcurrentHashMap> processors = new ConcurrentHashMap();
+
+ public NettyServerHandler(NettyRemotingServer nettyRemotingServer){
+ this.nettyRemotingServer = nettyRemotingServer;
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ processReceived(ctx.channel(), (Command)msg);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
+ this.registerProcessor(commandType, processor, null);
+ }
+
+ public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
+ ExecutorService executorRef = executor;
+ if(executorRef == null){
+ executorRef = nettyRemotingServer.getDefaultExecutor();
+ }
+ this.processors.putIfAbsent(commandType, new Pair(processor, executorRef));
+ }
+
+ private void processReceived(final Channel channel, final Command msg) {
+ final CommandType commandType = msg.getType();
+ final Pair pair = processors.get(commandType);
+ if (pair != null) {
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ pair.getLeft().process(channel, msg);
+ } catch (Throwable ex) {
+ logger.error("process msg {} error : {}", msg, ex);
+ }
+ }
+ };
+ try {
+ pair.getRight().submit(r);
+ } catch (RejectedExecutionException e) {
+ logger.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
+ }
+ } else {
+ logger.warn("commandType {} not support", commandType);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ logger.error("exceptionCaught : {}", cause);
+ ctx.channel().close();
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ Channel ch = ctx.channel();
+ ChannelConfig config = ch.config();
+
+ if (!ch.isWritable()) {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is not writable, over high water level : {}",
+ new Object[]{ch, config.getWriteBufferHighWaterMark()});
+ }
+
+ config.setAutoRead(false);
+ } else {
+ if (logger.isWarnEnabled()) {
+ logger.warn("{} is writable, to low water : {}",
+ new Object[]{ch, config.getWriteBufferLowWaterMark()});
+ }
+ config.setAutoRead(true);
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
new file mode 100644
index 0000000000..10a8195710
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/NettyRequestProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.processor;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+
+/**
+ * netty request processor
+ */
+public interface NettyRequestProcessor {
+
+ void process(final Channel channel, final Command command);
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
new file mode 100644
index 0000000000..221b895cb9
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Address.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.io.Serializable;
+
+/**
+ * server address
+ */
+public class Address implements Serializable {
+
+ private String host;
+
+ private int port;
+
+ public Address(){
+ //NOP
+ }
+
+ public Address(String host, int port){
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Address other = (Address) obj;
+ if (host == null) {
+ if (other.host != null) {
+ return false;
+ }
+ } else if (!host.equals(other.host)) {
+ return false;
+ }
+ return port == other.port;
+ }
+
+ @Override
+ public String toString() {
+ return "Address [host=" + host + ", port=" + port + "]";
+ }
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
new file mode 100644
index 0000000000..e9d93da41d
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/ChannelUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetSocketAddress;
+
+/**
+ * channel utils
+ */
+public class ChannelUtils {
+
+ public static String getLocalAddress(Channel channel){
+ return ((InetSocketAddress)channel.localAddress()).getAddress().getHostAddress();
+ }
+
+ public static String getRemoteAddress(Channel channel){
+ return ((InetSocketAddress)channel.remoteAddress()).getAddress().getHostAddress();
+ }
+
+ public static Address toAddress(Channel channel){
+ InetSocketAddress socketAddress = ((InetSocketAddress)channel.remoteAddress());
+ return new Address(socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
new file mode 100644
index 0000000000..c0a930ca41
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import java.nio.charset.Charset;
+
+public class Constants {
+
+ public static final String COMMA = ",";
+
+ public static final String SLASH = "/";
+
+ public static final Charset UTF8 = Charset.forName("UTF-8");
+
+ public static final int CPUS = Runtime.getRuntime().availableProcessors();
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
new file mode 100644
index 0000000000..a9b85461ff
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/FastJsonSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.remote.utils;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * json serialize or deserialize
+ */
+public class FastJsonSerializer {
+
+ public static byte[] serialize(T obj) {
+ String json = JSON.toJSONString(obj);
+ return json.getBytes(Constants.UTF8);
+ }
+
+ public static String serializeToString(T obj) {
+ return JSON.toJSONString(obj);
+ }
+
+ public static T deserialize(byte[] src, Class clazz) {
+ return JSON.parseObject(new String(src, Constants.UTF8), clazz);
+ }
+
+}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
new file mode 100644
index 0000000000..a79a3748cd
--- /dev/null
+++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Pair.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.remote.utils;
+
+
+public class Pair {
+
+ private L left;
+
+ private R right;
+
+ public Pair(L left, R right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public L getLeft() {
+ return left;
+ }
+
+ public void setLeft(L left) {
+ this.left = left;
+ }
+
+ public R getRight() {
+ return right;
+ }
+
+ public void setRight(R right) {
+ this.right = right;
+ }
+}
diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
new file mode 100644
index 0000000000..19fd564bd5
--- /dev/null
+++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
@@ -0,0 +1,71 @@
+package org.apache.dolphinscheduler.remote;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.Ping;
+import org.apache.dolphinscheduler.remote.command.Pong;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class NettyRemotingClientTest {
+
+
+ @Test
+ public void testSend(){
+ NettyServerConfig serverConfig = new NettyServerConfig();
+
+ NettyRemotingServer server = new NettyRemotingServer(serverConfig);
+ server.registerProcessor(CommandType.PING, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ channel.writeAndFlush(Pong.create(command.getOpaque()));
+ }
+ });
+ server.start();
+ //
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicLong opaque = new AtomicLong(1);
+ final NettyClientConfig clientConfig = new NettyClientConfig();
+ NettyRemotingClient client = new NettyRemotingClient(clientConfig);
+ client.registerProcessor(CommandType.PONG, new NettyRequestProcessor() {
+ @Override
+ public void process(Channel channel, Command command) {
+ opaque.set(command.getOpaque());
+ latch.countDown();
+ }
+ });
+ Command commandPing = Ping.create();
+ try {
+ client.send(new Address("127.0.0.1", serverConfig.getListenPort()), commandPing);
+ latch.await();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ Assert.assertEquals(opaque.get(), commandPing.getOpaque());
+ }
+}
diff --git a/dolphinscheduler-rpc/pom.xml b/dolphinscheduler-rpc/pom.xml
deleted file mode 100644
index 680a4a24c0..0000000000
--- a/dolphinscheduler-rpc/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-
-
-
-
-
- org.apache.dolphinscheduler
- dolphinscheduler
- 1.2.1-SNAPSHOT
-
- 4.0.0
-
- dolphinscheduler-rpc
-
- dolphinscheduler-rpc
- https://github.com/apache/incubator-dolphinscheduler
-
-
- UTF-8
- 1.8
- 1.8
-
- 3.5.1
- 1.9.0
-
-
-
-
- com.google.protobuf
- protobuf-java
- ${protobuf.version}
-
-
- io.grpc
- grpc-netty
- ${grpc.version}
-
-
- io.grpc
- grpc-protobuf
- ${grpc.version}
-
-
- io.grpc
- grpc-stub
- ${grpc.version}
-
-
-
- com.google.guava
- guava
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.5.0.Final
-
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.5.0
-
- com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
-
- compile
-
- compile
-
-
-
- compile-custom
-
- compile-custom
-
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
-
-
- ${java.version}
- ${project.build.sourceEncoding}
-
-
-
-
-
diff --git a/dolphinscheduler-rpc/src/main/proto/scheduler.proto b/dolphinscheduler-rpc/src/main/proto/scheduler.proto
deleted file mode 100644
index b8b595cb2a..0000000000
--- a/dolphinscheduler-rpc/src/main/proto/scheduler.proto
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-syntax = "proto3";
-
-package schduler;
-
-option java_multiple_files = true;
-option java_package = "org.apache.dolphinscheduler.rpc";
-option java_outer_classname = "SchdulerProto";
-
-
-/**
- * return str info
- */
-message RetStrInfo {
- /**
- * str msg info
- */
- string msg = 1 ;
-}
-
-/**
- * return byte info
- */
-message RetByteInfo {
- /**
- * byte data info
- */
- bytes data = 1;
-}
-
-/**
- * log parameter
- */
-message LogParameter {
-
- /**
- * path
- */
- string path = 1 ;
-
- /**
- * skip line num
- */
- int32 skipLineNum = 2 ;
-
- /**
- * display limt num
- */
- int32 limit = 3 ;
-}
-
-
-/**
- * path parameter
- */
-message PathParameter {
-
- /**
- * path
- */
- string path = 1 ;
-}
-
-/**
- * log view service
- */
-service LogViewService {
-
- /**
- * roll view log
- */
- rpc rollViewLog(LogParameter) returns (RetStrInfo) {};
-
- /**
- * view all log
- */
- rpc viewLog(PathParameter) returns (RetStrInfo) {};
-
- /**
- * get log bytes
- */
- rpc getLogBytes(PathParameter) returns (RetByteInfo) {};
-}
-
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 751fd919a8..080b87ebaa 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -71,7 +71,7 @@
org.apache.dolphinscheduler
- dolphinscheduler-rpc
+ dolphinscheduler-service
org.apache.curator
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
new file mode 100644
index 0000000000..4e4404ea1c
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * logger request process logic
+ */
+public class LoggerRequestProcessor implements NettyRequestProcessor {
+
+ private final Logger logger = LoggerFactory.getLogger(LoggerRequestProcessor.class);
+
+ private final ThreadPoolExecutor executor;
+
+ public LoggerRequestProcessor(){
+ this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received command : {}", command);
+
+ /**
+ * reuqest task log command type
+ */
+ final CommandType commandType = command.getType();
+ switch (commandType){
+ case GET_LOG_BYTES_REQUEST:
+ GetLogBytesRequestCommand getLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesRequestCommand.class);
+ byte[] bytes = getFileContentBytes(getLogRequest.getPath());
+ GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
+ channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case VIEW_WHOLE_LOG_REQUEST:
+ ViewLogRequestCommand viewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogRequestCommand.class);
+ String msg = readWholeFileContent(viewLogRequest.getPath());
+ ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
+ channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
+ break;
+ case ROLL_VIEW_LOG_REQUEST:
+ RollViewLogRequestCommand rollViewLogRequest = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogRequestCommand.class);
+ List lines = readPartFileContent(rollViewLogRequest.getPath(),
+ rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
+ StringBuilder builder = new StringBuilder();
+ for (String line : lines){
+ builder.append(line + "\r\n");
+ }
+ RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
+ channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
+ break;
+ default:
+ throw new IllegalArgumentException("unknown commandType");
+ }
+ }
+
+ public ExecutorService getExecutor(){
+ return this.executor;
+ }
+
+ /**
+ * get files content bytes,for down load file
+ *
+ * @param filePath file path
+ * @return byte array of file
+ * @throws Exception exception
+ */
+ private byte[] getFileContentBytes(String filePath){
+ InputStream in = null;
+ ByteArrayOutputStream bos = null;
+ try {
+ in = new FileInputStream(filePath);
+ bos = new ByteArrayOutputStream();
+ byte[] buf = new byte[1024];
+ int len;
+ while ((len = in.read(buf)) != -1) {
+ bos.write(buf, 0, len);
+ }
+ return bos.toByteArray();
+ }catch (IOException e){
+ logger.error("get file bytes error",e);
+ }finally {
+ if (bos != null){
+ try {
+ bos.close();
+ } catch (IOException ignore) {}
+ }
+ if (in != null){
+ try {
+ in.close();
+ } catch (IOException ignore) {}
+ }
+ }
+ return new byte[0];
+ }
+
+ /**
+ * read part file content,can skip any line and read some lines
+ *
+ * @param filePath file path
+ * @param skipLine skip line
+ * @param limit read lines limit
+ * @return part file content
+ */
+ private List readPartFileContent(String filePath,
+ int skipLine,
+ int limit){
+ try (Stream stream = Files.lines(Paths.get(filePath))) {
+ return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
+ } catch (IOException e) {
+ logger.error("read file error",e);
+ }
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * read whole file content
+ *
+ * @param filePath file path
+ * @return whole file content
+ */
+ private String readWholeFileContent(String filePath){
+ BufferedReader br = null;
+ String line;
+ StringBuilder sb = new StringBuilder();
+ try {
+ br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
+ while ((line = br.readLine()) != null){
+ sb.append(line + "\r\n");
+ }
+ return sb.toString();
+ }catch (IOException e){
+ logger.error("read file error",e);
+ }finally {
+ try {
+ if (br != null){
+ br.close();
+ }
+ } catch (IOException ignore) {}
+ }
+ return "";
+ }
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
new file mode 100644
index 0000000000..3520fb09ec
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerServer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.remote.NettyRemotingServer;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * logger server
+ */
+public class LoggerServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
+
+ /**
+ * netty server
+ */
+ private final NettyRemotingServer server;
+
+ /**
+ * netty server config
+ */
+ private final NettyServerConfig serverConfig;
+
+ /**
+ * loggger request processor
+ */
+ private final LoggerRequestProcessor requestProcessor;
+
+ public LoggerServer(){
+ this.serverConfig = new NettyServerConfig();
+ this.serverConfig.setListenPort(Constants.RPC_PORT);
+ this.server = new NettyRemotingServer(serverConfig);
+ this.requestProcessor = new LoggerRequestProcessor();
+ this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ this.server.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, requestProcessor, requestProcessor.getExecutor());
+ }
+
+ /**
+ * main launches the server from the command line.
+ * @param args arguments
+ */
+ public static void main(String[] args) {
+ final LoggerServer server = new LoggerServer();
+ server.start();
+ }
+
+ /**
+ * server start
+ */
+ public void start() {
+ this.server.start();
+ logger.info("logger server started, listening on port : {}" , Constants.RPC_PORT);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ LoggerServer.this.stop();
+ }
+ });
+ }
+
+ /**
+ * stop
+ */
+ public void stop() {
+ this.server.close();
+ logger.info("logger server shut down");
+ }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
deleted file mode 100644
index 1c6c97b88f..0000000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LogClient.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.grpc.StatusRuntimeException;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * log client
- */
-public class LogClient {
-
- /**
- * logger of LogClient
- */
- private static final Logger logger = LoggerFactory.getLogger(LogClient.class);
-
- /**
- * managed channel
- */
- private final ManagedChannel channel;
-
- /**
- * blocking stub
- */
- private final LogViewServiceGrpc.LogViewServiceBlockingStub blockingStub;
-
- /**
- * Construct client connecting to HelloWorld server at host:port.
- *
- * @param host host
- * @param port port
- */
- public LogClient(String host, int port) {
- this(ManagedChannelBuilder.forAddress(host, port)
- // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
- // needing certificates.
- .usePlaintext(true));
- }
-
- /**
- * Construct client for accessing RouteGuide server using the existing channel.
- *
- * @param channelBuilder channel builder
- */
- LogClient(ManagedChannelBuilder> channelBuilder) {
- /**
- * set max message read size
- */
- channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);
- channel = channelBuilder.build();
- blockingStub = LogViewServiceGrpc.newBlockingStub(channel);
- }
-
- /**
- * shut down channel
- *
- * @throws InterruptedException interrupted exception
- */
- public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
- }
-
- /**
- * roll view log
- *
- * @param path log path
- * @param skipLineNum skip line num
- * @param limit limit
- * @return log content
- */
- public String rollViewLog(String path,int skipLineNum,int limit) {
- logger.info("roll view log , path : {},skipLineNum : {} ,limit :{}", path, skipLineNum, limit);
- LogParameter pathParameter = LogParameter
- .newBuilder()
- .setPath(path)
- .setSkipLineNum(skipLineNum)
- .setLimit(limit)
- .build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.rollViewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("roll view log failed", e);
- return null;
- }
- }
-
- /**
- * view all log
- *
- * @param path log path
- * @return log content
- */
- public String viewLog(String path) {
- logger.info("view log path : {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetStrInfo retStrInfo;
- try {
- retStrInfo = blockingStub.viewLog(pathParameter);
- return retStrInfo.getMsg();
- } catch (StatusRuntimeException e) {
- logger.error("view log failed", e);
- return null;
- }
- }
-
- /**
- * get log bytes
- *
- * @param path log path
- * @return log content
- */
- public byte[] getLogBytes(String path) {
- logger.info("get log bytes {}",path);
-
- PathParameter pathParameter = PathParameter.newBuilder().setPath(path).build();
- RetByteInfo retByteInfo;
- try {
- retByteInfo = blockingStub.getLogBytes(pathParameter);
- return retByteInfo.getData().toByteArray();
- } catch (StatusRuntimeException e) {
- logger.error("get log bytes failed ", e);
- return null;
- }
- }
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
deleted file mode 100644
index 5ec5df92fc..0000000000
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/rpc/LoggerServer.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.rpc;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.dolphinscheduler.common.Constants;
-import com.google.protobuf.ByteString;
-import io.grpc.Server;
-import io.grpc.ServerBuilder;
-import org.apache.dolphinscheduler.rpc.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * logger server
- */
-public class LoggerServer {
-
- private static final Logger logger = LoggerFactory.getLogger(LoggerServer.class);
-
- /**
- * server
- */
- private Server server;
-
- /**
- * server start
- * @throws IOException io exception
- */
- public void start() throws IOException {
- /* The port on which the server should run */
- int port = Constants.RPC_PORT;
- server = ServerBuilder.forPort(port)
- .addService(new LogViewServiceGrpcImpl())
- .build()
- .start();
- logger.info("server started, listening on port : {}" , port);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- // Use stderr here since the logger may have been reset by its JVM shutdown hook.
- logger.info("shutting down gRPC server since JVM is shutting down");
- LoggerServer.this.stop();
- logger.info("server shut down");
- }
- });
- }
-
- /**
- * stop
- */
- private void stop() {
- if (server != null) {
- server.shutdown();
- }
- }
-
- /**
- * await termination on the main thread since the grpc library uses daemon threads.
- */
- private void blockUntilShutdown() throws InterruptedException {
- if (server != null) {
- server.awaitTermination();
- }
- }
-
- /**
- * main launches the server from the command line.
- */
-
- /**
- * main launches the server from the command line.
- * @param args arguments
- * @throws IOException io exception
- * @throws InterruptedException interrupted exception
- */
- public static void main(String[] args) throws IOException, InterruptedException {
- final LoggerServer server = new LoggerServer();
- server.start();
- server.blockUntilShutdown();
- }
-
- /**
- * Log View Service Grpc Implementation
- */
- static class LogViewServiceGrpcImpl extends LogViewServiceGrpc.LogViewServiceImplBase {
- @Override
- public void rollViewLog(LogParameter request, StreamObserver responseObserver) {
-
- logger.info("log parameter path : {} ,skip line : {}, limit : {}",
- request.getPath(),
- request.getSkipLineNum(),
- request.getLimit());
- List list = readFile(request.getPath(), request.getSkipLineNum(), request.getLimit());
- StringBuilder sb = new StringBuilder();
- boolean errorLineFlag = false;
- for (String line : list){
- sb.append(line + "\r\n");
- }
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void viewLog(PathParameter request, StreamObserver responseObserver) {
- logger.info("task path is : {} " , request.getPath());
- RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(readFile(request.getPath())).build();
- responseObserver.onNext(retInfoBuild);
- responseObserver.onCompleted();
- }
-
- @Override
- public void getLogBytes(PathParameter request, StreamObserver responseObserver) {
- try {
- ByteString bytes = ByteString.copyFrom(getFileBytes(request.getPath()));
- RetByteInfo.Builder builder = RetByteInfo.newBuilder();
- builder.setData(bytes);
- responseObserver.onNext(builder.build());
- responseObserver.onCompleted();
- }catch (Exception e){
- logger.error("get log bytes failed",e);
- }
- }
- }
-
- /**
- * get files bytes
- *
- * @param path path
- * @return byte array of file
- * @throws Exception exception
- */
- private static byte[] getFileBytes(String path){
- InputStream in = null;
- ByteArrayOutputStream bos = null;
- try {
- in = new FileInputStream(path);
- bos = new ByteArrayOutputStream();
- byte[] buf = new byte[1024];
- int len = 0;
- while ((len = in.read(buf)) != -1) {
- bos.write(buf, 0, len);
- }
- return bos.toByteArray();
- }catch (IOException e){
- logger.error("get file bytes error",e);
- }finally {
- if (bos != null){
- try {
- bos.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (in != null){
- try {
- in.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path
- * @param skipLine
- * @param limit
- * @return
- */
- private static List readFile(String path,int skipLine,int limit){
- try (Stream stream = Files.lines(Paths.get(path))) {
- return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
- } catch (IOException e) {
- logger.error("read file failed",e);
- }
- return null;
- }
-
- /**
- * read file content
- *
- * @param path path
- * @return string of file content
- * @throws Exception exception
- */
- private static String readFile(String path){
- BufferedReader br = null;
- String line = null;
- StringBuilder sb = new StringBuilder();
- try {
- br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
- boolean errorLineFlag = false;
- while ((line = br.readLine()) != null){
- sb.append(line + "\r\n");
- }
-
- return sb.toString();
- }catch (IOException e){
- logger.error("read file failed",e);
- }finally {
- try {
- if (br != null){
- br.close();
- }
- } catch (IOException e) {
- logger.error(e.getMessage(),e);
- }
- }
- return null;
- }
-
-}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
index fd0a08cd8e..69284ee69c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ProcessUtils.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.server.rpc.LogClient;
import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -375,7 +375,7 @@ public class ProcessUtils {
public static void killYarnJob(TaskInstance taskInstance) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
+ LogClientService logClient = new LogClientService(taskInstance.getHost(), Constants.RPC_PORT);
String log = logClient.viewLog(taskInstance.getLogPath());
if (StringUtils.isNotEmpty(log)) {
diff --git a/dolphinscheduler-service/pom.xml b/dolphinscheduler-service/pom.xml
new file mode 100644
index 0000000000..31a2837254
--- /dev/null
+++ b/dolphinscheduler-service/pom.xml
@@ -0,0 +1,29 @@
+
+
+
+
+ dolphinscheduler
+ org.apache.dolphinscheduler
+ 1.2.1-SNAPSHOT
+
+ 4.0.0
+
+ dolphinscheduler-service
+
+ dolphinscheduler-service
+ http://www.example.com
+
+
+ UTF-8
+ 1.7
+ 1.7
+
+
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-remote
+
+
+
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
new file mode 100644
index 0000000000..7607159c7c
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/MasterResponseCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+
+/**
+ * view log response command
+ */
+public class MasterResponseCommand implements Serializable {
+
+ private String msg;
+
+ public MasterResponseCommand() {
+ }
+
+ public MasterResponseCommand(String msg) {
+ this.msg = msg;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public void setMsg(String msg) {
+ this.msg = msg;
+ }
+
+ public Command convert2Command(long opaque){
+ Command command = new Command(opaque);
+ command.setType(CommandType.MASTER_RESPONSE);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
new file mode 100644
index 0000000000..419add4d7c
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/WorkerRequestCommand.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service;
+
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * view log request command
+ */
+public class WorkerRequestCommand implements Serializable {
+
+ private static final AtomicLong REQUEST = new AtomicLong(1);
+
+ private String path;
+
+ public WorkerRequestCommand() {
+ }
+
+ public WorkerRequestCommand(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public Command convert2Command(){
+ Command command = new Command(REQUEST.getAndIncrement());
+ command.setType(CommandType.WORKER_REQUEST);
+ byte[] body = FastJsonSerializer.serialize(this);
+ command.setBody(body);
+ return command;
+ }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
new file mode 100644
index 0000000000..a316c7046d
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.log;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * log client
+ */
+public class LogClientService implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private final Address address;
+
+ /**
+ * request time out
+ */
+ private final long logRequestTimeout = 10 * 1000;
+
+ /**
+ * construct client
+ * @param host host
+ * @param port port
+ */
+ public LogClientService(String host, int port) {
+ this.address = new Address(host, port);
+ this.clientConfig = new NettyClientConfig();
+ this.clientConfig.setWorkerThreads(1);
+ this.client = new NettyRemotingClient(clientConfig);
+ this.client.registerProcessor(CommandType.ROLL_VIEW_LOG_RESPONSE,this);
+ this.client.registerProcessor(CommandType.VIEW_WHOLE_LOG_RESPONSE, this);
+ this.client.registerProcessor(CommandType.GET_LOG_BYTES_RESPONSE, this);
+
+ }
+
+ /**
+ * shutdown
+ */
+ public void shutdown() {
+ this.client.close();
+ logger.info("logger client shutdown");
+ }
+
+ /**
+ * roll view log
+ * @param path path
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log content
+ */
+ public String rollViewLog(String path,int skipLineNum,int limit) {
+ logger.info("roll view log, path {}, skipLineNum {} ,limit {}", path, skipLineNum, limit);
+ RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("roll view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * view log
+ * @param path path
+ * @return log content
+ */
+ public String viewLog(String path) {
+ logger.info("view log path {}", path);
+ ViewLogRequestCommand request = new ViewLogRequestCommand(path);
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ logger.error("view log error", e);
+ }
+ return result;
+ }
+
+ /**
+ * get log size
+ * @param path log path
+ * @return log content bytes
+ */
+ public byte[] getLogBytes(String path) {
+ logger.info("log path {}", path);
+ GetLogBytesRequestCommand request = new GetLogBytesRequestCommand(path);
+ byte[] result = null;
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = (byte[])promise.getResult();
+ } catch (Exception e) {
+ logger.error("get log size error", e);
+ }
+ return result;
+ }
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received log response : {}", command);
+ switch (command.getType()){
+ case ROLL_VIEW_LOG_RESPONSE:
+ RollViewLogResponseCommand rollReviewLog = FastJsonSerializer.deserialize(
+ command.getBody(), RollViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), rollReviewLog.getMsg());
+ break;
+ case VIEW_WHOLE_LOG_RESPONSE:
+ ViewLogResponseCommand viewLog = FastJsonSerializer.deserialize(
+ command.getBody(), ViewLogResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), viewLog.getMsg());
+ break;
+ case GET_LOG_BYTES_RESPONSE:
+ GetLogBytesResponseCommand getLog = FastJsonSerializer.deserialize(
+ command.getBody(), GetLogBytesResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), getLog.getData());
+ break;
+ default:
+ throw new UnsupportedOperationException(String.format("command type : %s is not supported ", command.getType()));
+ }
+ }
+
+ public static void main(String[] args) throws Exception{
+ LogClientService logClient = new LogClientService("192.168.220.247", 50051);
+ byte[] logBytes = logClient.getLogBytes("/opt/program/incubator-dolphinscheduler/logs/1/463/540.log");
+ System.out.println(new String(logBytes));
+ }
+
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
new file mode 100644
index 0000000000..8920b8a527
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogPromise.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * log asyc callback
+ */
+public class LogPromise {
+
+ private static final ConcurrentHashMap PROMISES = new ConcurrentHashMap<>();
+
+ private long opaque;
+
+ private final long start;
+
+ private final long timeout;
+
+ private final CountDownLatch latch;
+
+ private Object result;
+
+ public LogPromise(long opaque, long timeout){
+ this.opaque = opaque;
+ this.timeout = timeout;
+ this.start = System.currentTimeMillis();
+ this.latch = new CountDownLatch(1);
+ PROMISES.put(opaque, this);
+ }
+
+
+ /**
+ * notify client finish
+ * @param opaque unique identification
+ * @param result result
+ */
+ public static void notify(long opaque, Object result){
+ LogPromise promise = PROMISES.remove(opaque);
+ if(promise != null){
+ promise.doCountDown(result);
+ }
+ }
+
+ private void doCountDown(Object result){
+ this.result = result;
+ this.latch.countDown();
+ }
+
+ public boolean isTimeout(){
+ return System.currentTimeMillis() - start > timeout;
+ }
+
+ public Object getResult(){
+ try {
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignore) {
+ }
+ PROMISES.remove(opaque);
+ return this.result;
+ }
+
+
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
new file mode 100644
index 0000000000..c1071226a8
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/worker/WorkerClientService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.service.worker;
+
+import io.netty.channel.Channel;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.*;
+import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
+import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
+import org.apache.dolphinscheduler.remote.utils.Address;
+import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
+import org.apache.dolphinscheduler.service.MasterResponseCommand;
+import org.apache.dolphinscheduler.service.WorkerRequestCommand;
+import org.apache.dolphinscheduler.service.log.LogPromise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * log client
+ */
+public class WorkerClientService implements NettyRequestProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(WorkerClientService.class);
+
+ private final NettyClientConfig clientConfig;
+
+ private final NettyRemotingClient client;
+
+ private final Address address;
+
+ /**
+ * request time out
+ */
+ private final long logRequestTimeout = 10 * 1000;
+
+ /**
+ * construct client
+ * @param host host
+ * @param port port
+ */
+ public WorkerClientService(String host, int port) {
+ this.address = new Address(host, port);
+ this.clientConfig = new NettyClientConfig();
+ this.clientConfig.setWorkerThreads(1);
+ this.client = new NettyRemotingClient(clientConfig);
+ this.client.registerProcessor(CommandType.MASTER_RESPONSE, this);
+
+ }
+
+ /**
+ * shutdown
+ */
+ public void shutdown() {
+ this.client.close();
+ logger.info("logger client shutdown");
+ }
+
+
+ public String reportResult() {
+ WorkerRequestCommand request = new WorkerRequestCommand();
+ String result = "";
+ try {
+ Command command = request.convert2Command();
+ this.client.send(address, command);
+ LogPromise promise = new LogPromise(command.getOpaque(), logRequestTimeout);
+ result = ((String)promise.getResult());
+ } catch (Exception e) {
+ e.printStackTrace();
+ logger.error("roll view log error", e);
+ }
+ return result;
+ }
+
+
+ @Override
+ public void process(Channel channel, Command command) {
+ logger.info("received log response : {}", command);
+ MasterResponseCommand masterResponseCommand = FastJsonSerializer.deserialize(
+ command.getBody(), MasterResponseCommand.class);
+ LogPromise.notify(command.getOpaque(), masterResponseCommand.getMsg());
+ }
+
+ public static void main(String[] args) throws Exception{
+ WorkerClientService workerClientService = new WorkerClientService("192.168.220.247", 1128);
+ String result = workerClientService.reportResult();
+ System.out.println(result);
+
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 875577c672..f6009dc689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -229,7 +229,12 @@
org.apache.dolphinscheduler
- dolphinscheduler-rpc
+ dolphinscheduler-remote
+ ${project.version}
+
+
+ org.apache.dolphinscheduler
+ dolphinscheduler-service
${project.version}
@@ -770,9 +775,6 @@
**/dolphinscheduler-ui/src/view/common/outro.inc
**/dolphinscheduler-ui/src/view/common/meta.inc
**/dolphinscheduler-ui/src/combo/1.0.0/3rd.css
-
- **/dolphinscheduler-rpc/src/main/java/org/apache/dolphinscheduler/rpc/LogViewServiceGrpc.java
-
true
@@ -859,8 +861,9 @@
dolphinscheduler-api
dolphinscheduler-dao
dolphinscheduler-alert
- dolphinscheduler-rpc
dolphinscheduler-dist
+ dolphinscheduler-remote
+ dolphinscheduler-service