diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java deleted file mode 100644 index f90d3fff18..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Ping.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.command; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -import java.io.Serializable; - -/** - * ping machine - */ -public class Ping implements Serializable { - - /** - * ping body - */ - protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; - - /** - * request command body - */ - private static final byte[] EMPTY_BODY_ARRAY = new byte[0]; - - private static final ByteBuf PING_BUF; - - static { - ByteBuf ping = Unpooled.buffer(); - ping.writeByte(Command.MAGIC); - ping.writeByte(CommandType.PING.ordinal()); - ping.writeLong(0); - ping.writeInt(0); - ping.writeBytes(EMPTY_BODY); - PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); - } - - /** - * ping content - * @return result - */ - public static ByteBuf pingContent(){ - return PING_BUF.duplicate(); - } - - /** - * create ping command - * - * @return command - */ - public static Command create(){ - Command command = new Command(); - command.setType(CommandType.PING); - command.setBody(EMPTY_BODY_ARRAY); - return command; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java deleted file mode 100644 index 5ed762ff3a..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/Pong.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.command; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -import java.io.Serializable; - -/** - * Pong return after ping - */ -public class Pong implements Serializable { - - /** - * pong body - */ - protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; - - /** - * pong command body - */ - private static final byte[] EMPTY_BODY_ARRAY = new byte[0]; - - /** - * pong byte buffer - */ - private static final ByteBuf PONG_BUF; - - static { - ByteBuf ping = Unpooled.buffer(); - ping.writeByte(Command.MAGIC); - ping.writeByte(CommandType.PONG.ordinal()); - ping.writeLong(0); - ping.writeInt(0); - ping.writeBytes(EMPTY_BODY); - PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); - } - - /** - * pong content - * @return result - */ - public static ByteBuf pongContent(){ - return PONG_BUF.duplicate(); - } - - /** - * package pong command - * - * @param opaque request unique identification - * @return command - */ - public static Command create(long opaque){ - Command command = new Command(opaque); - command.setType(CommandType.PONG); - command.setBody(EMPTY_BODY_ARRAY); - return command; - } -} diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java deleted file mode 100644 index 1adf5a80ca..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskInfo.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote.command; - -import com.fasterxml.jackson.annotation.JsonFormat; - -import java.io.Serializable; -import java.util.Date; - -/** - * master/worker task transport - */ -public class TaskInfo implements Serializable{ - - /** - * task instance id - */ - private Integer taskId; - - - /** - * task name - */ - private String taskName; - - /** - * task start time - */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") - private Date startTime; - - /** - * task type - */ - private String taskType; - - /** - * task execute path - */ - private String executePath; - - /** - * task json - */ - private String taskJson; - - - /** - * process instance id - */ - private Integer processInstanceId; - - - /** - * process instance schedule time - */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") - private Date scheduleTime; - - /** - * process instance global parameters - */ - private String globalParams; - - - /** - * execute user id - */ - private Integer executorId; - - - /** - * command type if complement - */ - private Integer cmdTypeIfComplement; - - - /** - * tenant code - */ - private String tenantCode; - - /** - * task queue - */ - private String queue; - - - /** - * process define id - */ - private Integer processDefineId; - - /** - * project id - */ - private Integer projectId; - - public Integer getTaskId() { - return taskId; - } - - public void setTaskId(Integer taskId) { - this.taskId = taskId; - } - - public String getTaskName() { - return taskName; - } - - public void setTaskName(String taskName) { - this.taskName = taskName; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public String getTaskType() { - return taskType; - } - - public void setTaskType(String taskType) { - this.taskType = taskType; - } - - public String getExecutePath() { - return executePath; - } - - public void setExecutePath(String executePath) { - this.executePath = executePath; - } - - public String getTaskJson() { - return taskJson; - } - - public void setTaskJson(String taskJson) { - this.taskJson = taskJson; - } - - public Integer getProcessInstanceId() { - return processInstanceId; - } - - public void setProcessInstanceId(Integer processInstanceId) { - this.processInstanceId = processInstanceId; - } - - public Date getScheduleTime() { - return scheduleTime; - } - - public void setScheduleTime(Date scheduleTime) { - this.scheduleTime = scheduleTime; - } - - public String getGlobalParams() { - return globalParams; - } - - public void setGlobalParams(String globalParams) { - this.globalParams = globalParams; - } - - public String getTenantCode() { - return tenantCode; - } - - public void setTenantCode(String tenantCode) { - this.tenantCode = tenantCode; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public Integer getProcessDefineId() { - return processDefineId; - } - - public void setProcessDefineId(Integer processDefineId) { - this.processDefineId = processDefineId; - } - - public Integer getProjectId() { - return projectId; - } - - public void setProjectId(Integer projectId) { - this.projectId = projectId; - } - - public Integer getExecutorId() { - return executorId; - } - - public void setExecutorId(Integer executorId) { - this.executorId = executorId; - } - - public Integer getCmdTypeIfComplement() { - return cmdTypeIfComplement; - } - - public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) { - this.cmdTypeIfComplement = cmdTypeIfComplement; - } - - @Override - public String toString() { - return "TaskInfo{" + - "taskId=" + taskId + - ", taskName='" + taskName + '\'' + - ", startTime=" + startTime + - ", taskType='" + taskType + '\'' + - ", executePath='" + executePath + '\'' + - ", taskJson='" + taskJson + '\'' + - ", processInstanceId=" + processInstanceId + - ", scheduleTime=" + scheduleTime + - ", globalParams='" + globalParams + '\'' + - ", executorId=" + executorId + - ", cmdTypeIfComplement=" + cmdTypeIfComplement + - ", tenantCode='" + tenantCode + '\'' + - ", queue='" + queue + '\'' + - ", processDefineId=" + processDefineId + - ", projectId=" + projectId + - '}'; - } -} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index a3f6c7b582..5de0a0e502 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.remote; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.Ping; -import org.apache.dolphinscheduler.remote.command.Pong; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.future.InvokeCallback; @@ -28,12 +26,15 @@ import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; +import java.io.Serializable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; import org.junit.Assert; import org.junit.Test; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; /** @@ -109,4 +110,100 @@ public class NettyRemotingClientTest { server.close(); client.close(); } + + private static class Ping implements Serializable { + + /** + * ping body + */ + protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; + + /** + * request command body + */ + private static final byte[] EMPTY_BODY_ARRAY = new byte[0]; + + private static final ByteBuf PING_BUF; + + static { + ByteBuf ping = Unpooled.buffer(); + ping.writeByte(Command.MAGIC); + ping.writeByte(CommandType.PING.ordinal()); + ping.writeLong(0); + ping.writeInt(0); + ping.writeBytes(EMPTY_BODY); + PING_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); + } + + /** + * ping content + * + * @return result + */ + public static ByteBuf pingContent() { + return PING_BUF.duplicate(); + } + + /** + * create ping command + * + * @return command + */ + public static Command create() { + Command command = new Command(); + command.setType(CommandType.PING); + command.setBody(EMPTY_BODY_ARRAY); + return command; + } + } + + private static class Pong implements Serializable { + + /** + * pong body + */ + protected static final ByteBuf EMPTY_BODY = Unpooled.EMPTY_BUFFER; + + /** + * pong command body + */ + private static final byte[] EMPTY_BODY_ARRAY = new byte[0]; + + /** + * pong byte buffer + */ + private static final ByteBuf PONG_BUF; + + static { + ByteBuf ping = Unpooled.buffer(); + ping.writeByte(Command.MAGIC); + ping.writeByte(CommandType.PONG.ordinal()); + ping.writeLong(0); + ping.writeInt(0); + ping.writeBytes(EMPTY_BODY); + PONG_BUF = Unpooled.unreleasableBuffer(ping).asReadOnly(); + } + + /** + * pong content + * + * @return result + */ + public static ByteBuf pongContent() { + return PONG_BUF.duplicate(); + } + + /** + * package pong command + * + * @param opaque request unique identification + * @return command + */ + public static Command create(long opaque) { + Command command = new Command(opaque); + command.setType(CommandType.PONG); + command.setBody(EMPTY_BODY_ARRAY); + return command; + } + } }