Browse Source

[Improvement-4012][common/remote] Json util code integration,remove the remote module json util (#4013)

* Json util code integration,remove the remote module json util.

* update code checkstyle.

* update code checkstyle.

* add code checkstyle.

* add test class.

* update JSONUtils class.

* update JSONUtils class.
pull/3/MERGE
zhuangchong 4 years ago committed by GitHub
parent
commit
4b502e361f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  2. 35
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java
  3. 8
      dolphinscheduler-remote/pom.xml
  4. 28
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java
  5. 13
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java
  6. 27
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
  7. 14
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java
  8. 22
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java
  9. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java
  10. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java
  11. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java
  12. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java
  13. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java
  14. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java
  15. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java
  16. 6
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java
  17. 92
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java
  18. 58
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java
  19. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
  20. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  21. 16
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java
  22. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java
  23. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
  24. 15
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  25. 23
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java
  26. 101
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java
  27. 64
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java
  28. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
  29. 2
      pom.xml

39
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import static java.nio.charset.StandardCharsets.UTF_8;
import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
@ -126,6 +128,22 @@ public class JSONUtils {
return null; return null;
} }
/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
public static <T> T parseObject(byte[] src, Class<T> clazz) {
if (src == null) {
return null;
}
String json = new String(src, UTF_8);
return parseObject(json, clazz);
}
/** /**
* json to list * json to list
* *
@ -253,6 +271,27 @@ public class JSONUtils {
} }
} }
/**
* serialize to json byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] toJsonByteArray(T obj) {
if (obj == null) {
return null;
}
String json = "";
try {
json = toJsonString(obj);
} catch (Exception e) {
logger.error("json serialize exception.", e);
}
return json.getBytes(UTF_8);
}
public static ObjectNode parseObject(String text) { public static ObjectNode parseObject(String text) {
try { try {
return (ObjectNode) objectMapper.readTree(text); return (ObjectNode) objectMapper.readTree(text);

35
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java

@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import com.fasterxml.jackson.databind.JsonNode; import org.apache.dolphinscheduler.common.enums.DataType;
import com.fasterxml.jackson.databind.SerializationFeature; import org.apache.dolphinscheduler.common.enums.Direct;
import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.dolphinscheduler.common.process.Property;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -28,13 +28,15 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class JSONUtilsTest { public class JSONUtilsTest {
@Test @Test
@ -108,9 +110,8 @@ public class JSONUtilsTest {
Assert.assertEquals(Direct.IN, direct); Assert.assertEquals(Direct.IN, direct);
} }
@Test @Test
public void String2MapTest() { public void string2MapTest() {
String str = list2String(); String str = list2String();
List<LinkedHashMap> maps = JSONUtils.toList(str, List<LinkedHashMap> maps = JSONUtils.toList(str,
@ -145,6 +146,18 @@ public class JSONUtilsTest {
Assert.assertNull(JSONUtils.parseObject("foo", String.class)); Assert.assertNull(JSONUtils.parseObject("foo", String.class));
} }
@Test
public void testJsonByteArray() {
String str = "foo";
byte[] serializeByte = JSONUtils.toJsonByteArray(str);
String deserialize = JSONUtils.parseObject(serializeByte, String.class);
Assert.assertEquals(str, deserialize);
str = null;
serializeByte = JSONUtils.toJsonByteArray(str);
deserialize = JSONUtils.parseObject(serializeByte, String.class);
Assert.assertNull(deserialize);
}
@Test @Test
public void testToList() { public void testToList() {
Assert.assertEquals(new ArrayList(), Assert.assertEquals(new ArrayList(),

8
dolphinscheduler-remote/pom.xml

@ -35,6 +35,10 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-common</artifactId>
</dependency>
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
@ -48,10 +52,6 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

28
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java

@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import com.fasterxml.jackson.annotation.JsonFormat; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
/** /**
* execute task request command * execute task request command
*/ */
@ -35,7 +37,7 @@ public class TaskExecuteAckCommand implements Serializable {
/** /**
* startTime * startTime
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date startTime; private Date startTime;
/** /**
@ -111,23 +113,23 @@ public class TaskExecuteAckCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_ACK); command.setType(CommandType.TASK_EXECUTE_ACK);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }
@Override @Override
public String toString() { public String toString() {
return "TaskExecuteAckCommand{" + return "TaskExecuteAckCommand{"
"taskInstanceId=" + taskInstanceId + + "taskInstanceId=" + taskInstanceId
", startTime=" + startTime + + ", startTime=" + startTime
", host='" + host + '\'' + + ", host='" + host + '\''
", status=" + status + + ", status=" + status
", logPath='" + logPath + '\'' + + ", logPath='" + logPath + '\''
", executePath='" + executePath + '\'' + + ", executePath='" + executePath + '\''
'}'; + '}';
} }
} }

13
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable; import java.io.Serializable;
@ -50,18 +51,18 @@ public class TaskExecuteRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST); command.setType(CommandType.TASK_EXECUTE_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }
@Override @Override
public String toString() { public String toString() {
return "TaskExecuteRequestCommand{" + return "TaskExecuteRequestCommand{"
"taskExecutionContext='" + taskExecutionContext + '\'' + + "taskExecutionContext='" + taskExecutionContext + '\''
'}'; + '}';
} }
} }

27
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java

@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import com.fasterxml.jackson.annotation.JsonFormat; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
/** /**
* execute task response command * execute task response command
*/ */
public class TaskExecuteResponseCommand implements Serializable { public class TaskExecuteResponseCommand implements Serializable {
public TaskExecuteResponseCommand() { public TaskExecuteResponseCommand() {
} }
@ -49,7 +50,7 @@ public class TaskExecuteResponseCommand implements Serializable {
/** /**
* end time * end time
*/ */
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date endTime; private Date endTime;
@ -120,22 +121,22 @@ public class TaskExecuteResponseCommand implements Serializable {
* package response command * package response command
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_RESPONSE); command.setType(CommandType.TASK_EXECUTE_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }
@Override @Override
public String toString() { public String toString() {
return "TaskExecuteResponseCommand{" + return "TaskExecuteResponseCommand{"
"taskInstanceId=" + taskInstanceId + + "taskInstanceId=" + taskInstanceId
", status=" + status + + ", status=" + status
", endTime=" + endTime + + ", endTime=" + endTime
", processId=" + processId + + ", processId=" + processId
", appIds='" + appIds + '\'' + + ", appIds='" + appIds + '\''
'}'; + '}';
} }
} }

14
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable; import java.io.Serializable;
@ -30,7 +31,6 @@ public class TaskKillRequestCommand implements Serializable {
*/ */
private int taskInstanceId; private int taskInstanceId;
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
} }
@ -44,18 +44,18 @@ public class TaskKillRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_KILL_REQUEST); command.setType(CommandType.TASK_KILL_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }
@Override @Override
public String toString() { public String toString() {
return "TaskKillRequestCommand{" + return "TaskKillRequestCommand{"
"taskInstanceId=" + taskInstanceId + + "taskInstanceId=" + taskInstanceId
'}'; + '}';
} }
} }

22
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.command; package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
@ -52,7 +53,6 @@ public class TaskKillResponseCommand implements Serializable {
*/ */
protected List<String> appIds; protected List<String> appIds;
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
} }
@ -98,22 +98,22 @@ public class TaskKillResponseCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.TASK_KILL_RESPONSE); command.setType(CommandType.TASK_KILL_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }
@Override @Override
public String toString() { public String toString() {
return "TaskKillResponseCommand{" + return "TaskKillResponseCommand{"
"taskInstanceId=" + taskInstanceId + + "taskInstanceId=" + taskInstanceId
", host='" + host + '\'' + + ", host='" + host + '\''
", status=" + status + + ", status=" + status
", processId=" + processId + + ", processId=" + processId
", appIds=" + appIds + + ", appIds=" + appIds
'}'; + '}';
} }
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -53,10 +53,10 @@ public class GetLogBytesRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.GET_LOG_BYTES_REQUEST); command.setType(CommandType.GET_LOG_BYTES_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -54,10 +54,10 @@ public class GetLogBytesResponseCommand implements Serializable {
* @param opaque request unique identification * @param opaque request unique identification
* @return command * @return command
*/ */
public Command convert2Command(long opaque){ public Command convert2Command(long opaque) {
Command command = new Command(opaque); Command command = new Command(opaque);
command.setType(CommandType.GET_LOG_BYTES_RESPONSE); command.setType(CommandType.GET_LOG_BYTES_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -53,10 +53,10 @@ public class RemoveTaskLogRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.REMOVE_TAK_LOG_REQUEST); command.setType(CommandType.REMOVE_TAK_LOG_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -53,10 +53,10 @@ public class RemoveTaskLogResponseCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(long opaque){ public Command convert2Command(long opaque) {
Command command = new Command(opaque); Command command = new Command(opaque);
command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE); command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -81,10 +81,10 @@ public class RollViewLogRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -54,10 +54,10 @@ public class RollViewLogResponseCommand implements Serializable {
* @param opaque request unique identification * @param opaque request unique identification
* @return command * @return command
*/ */
public Command convert2Command(long opaque){ public Command convert2Command(long opaque) {
Command command = new Command(opaque); Command command = new Command(opaque);
command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE); command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -53,10 +53,10 @@ public class ViewLogRequestCommand implements Serializable {
* *
* @return command * @return command
*/ */
public Command convert2Command(){ public Command convert2Command() {
Command command = new Command(); Command command = new Command();
command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

6
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.remote.command.log; package org.apache.dolphinscheduler.remote.command.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
@ -54,10 +54,10 @@ public class ViewLogResponseCommand implements Serializable {
* @param opaque request unique identification * @param opaque request unique identification
* @return command * @return command
*/ */
public Command convert2Command(long opaque){ public Command convert2Command(long opaque) {
Command command = new Command(opaque); Command command = new Command(opaque);
command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE); command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE);
byte[] body = JsonSerializer.serialize(this); byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body); command.setBody(body);
return command; return command;
} }

92
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java

@ -1,92 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* json serialize or deserialize
*/
public class JsonSerializer {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
private JsonSerializer(){
}
/**
* serialize to byte
*
* @param obj object
* @param <T> object type
* @return byte array
*/
public static <T> byte[] serialize(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
logger.error("serializeToString exception!", e);
}
return json.getBytes(Constants.UTF8);
}
/**
* serialize to string
* @param obj object
* @param <T> object type
* @return string
*/
public static <T> String serializeToString(T obj) {
String json = "";
try {
json = objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
logger.error("serializeToString exception!", e);
}
return json;
}
/**
* deserialize
*
* @param src byte array
* @param clazz class
* @param <T> deserialize type
* @return deserialize type
*/
public static <T> T deserialize(byte[] src, Class<T> clazz) {
String json = new String(src, StandardCharsets.UTF_8);
try {
return objectMapper.readValue(json, clazz);
} catch (IOException e) {
logger.error("deserialize exception!", e);
return null;
}
}
}

58
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java

@ -1,58 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.junit.Assert;
import org.junit.Test;
public class JsonSerializerTest {
@Test
public void testSerialize(){
TestObj testObj = new TestObj();
testObj.setAge(12);
byte[] serializeByte = JsonSerializer.serialize(testObj);
//
TestObj deserialize = JsonSerializer.deserialize(serializeByte, TestObj.class);
Assert.assertEquals(testObj.getAge(), deserialize.getAge());
}
static class TestObj {
private int age;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "TestObj{" +
"age=" + age +
'}';
}
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java

@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.server.entity; package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
@ -475,7 +475,7 @@ public class TaskExecutionContext implements Serializable {
public Command toCommand() { public Command toCommand() {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JsonSerializer.serializeToString(this)); requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(this));
return requestCommand.convert2Command(); return requestCommand.convert2Command();
} }

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -14,19 +14,30 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.utils.IOUtils; import org.apache.dolphinscheduler.common.utils.IOUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.log.*; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*; import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collections; import java.util.Collections;
@ -38,6 +49,11 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/** /**
* logger request process logic * logger request process logic
*/ */
@ -47,7 +63,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
private final ThreadPoolExecutor executor; private final ThreadPoolExecutor executor;
public LoggerRequestProcessor(){ public LoggerRequestProcessor() {
this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
} }
@ -59,35 +75,35 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
* reuqest task log command type * reuqest task log command type
*/ */
final CommandType commandType = command.getType(); final CommandType commandType = command.getType();
switch (commandType){ switch (commandType) {
case GET_LOG_BYTES_REQUEST: case GET_LOG_BYTES_REQUEST:
GetLogBytesRequestCommand getLogRequest = JsonSerializer.deserialize( GetLogBytesRequestCommand getLogRequest = JSONUtils.parseObject(
command.getBody(), GetLogBytesRequestCommand.class); command.getBody(), GetLogBytesRequestCommand.class);
byte[] bytes = getFileContentBytes(getLogRequest.getPath()); byte[] bytes = getFileContentBytes(getLogRequest.getPath());
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque()));
break; break;
case VIEW_WHOLE_LOG_REQUEST: case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = JsonSerializer.deserialize( ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
command.getBody(), ViewLogRequestCommand.class); command.getBody(), ViewLogRequestCommand.class);
String msg = readWholeFileContent(viewLogRequest.getPath()); String msg = readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break; break;
case ROLL_VIEW_LOG_REQUEST: case ROLL_VIEW_LOG_REQUEST:
RollViewLogRequestCommand rollViewLogRequest = JsonSerializer.deserialize( RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject(
command.getBody(), RollViewLogRequestCommand.class); command.getBody(), RollViewLogRequestCommand.class);
List<String> lines = readPartFileContent(rollViewLogRequest.getPath(), List<String> lines = readPartFileContent(rollViewLogRequest.getPath(),
rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit()); rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit());
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
for (String line : lines){ for (String line : lines) {
builder.append(line + "\r\n"); builder.append(line + "\r\n");
} }
RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString()); RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString());
channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque())); channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque()));
break; break;
case REMOVE_TAK_LOG_REQUEST: case REMOVE_TAK_LOG_REQUEST:
RemoveTaskLogRequestCommand removeTaskLogRequest = JsonSerializer.deserialize( RemoveTaskLogRequestCommand removeTaskLogRequest = JSONUtils.parseObject(
command.getBody(), RemoveTaskLogRequestCommand.class); command.getBody(), RemoveTaskLogRequestCommand.class);
String taskLogPath = removeTaskLogRequest.getPath(); String taskLogPath = removeTaskLogRequest.getPath();
@ -95,10 +111,10 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
File taskLogFile = new File(taskLogPath); File taskLogFile = new File(taskLogPath);
Boolean status = true; Boolean status = true;
try { try {
if (taskLogFile.exists()){ if (taskLogFile.exists()) {
status = taskLogFile.delete(); status = taskLogFile.delete();
} }
}catch (Exception e){ } catch (Exception e) {
status = false; status = false;
} }
@ -110,7 +126,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
} }
} }
public ExecutorService getExecutor(){ public ExecutorService getExecutor() {
return this.executor; return this.executor;
} }
@ -121,7 +137,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
* @return byte array of file * @return byte array of file
* @throws Exception exception * @throws Exception exception
*/ */
private byte[] getFileContentBytes(String filePath){ private byte[] getFileContentBytes(String filePath) {
InputStream in = null; InputStream in = null;
ByteArrayOutputStream bos = null; ByteArrayOutputStream bos = null;
try { try {
@ -133,9 +149,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
bos.write(buf, 0, len); bos.write(buf, 0, len);
} }
return bos.toByteArray(); return bos.toByteArray();
}catch (IOException e){ } catch (IOException e) {
logger.error("get file bytes error",e); logger.error("get file bytes error",e);
}finally { } finally {
IOUtils.closeQuietly(bos); IOUtils.closeQuietly(bos);
IOUtils.closeQuietly(in); IOUtils.closeQuietly(in);
} }
@ -152,7 +168,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
*/ */
private List<String> readPartFileContent(String filePath, private List<String> readPartFileContent(String filePath,
int skipLine, int skipLine,
int limit){ int limit) {
try (Stream<String> stream = Files.lines(Paths.get(filePath))) { try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); return stream.skip(skipLine).limit(limit).collect(Collectors.toList());
} catch (IOException e) { } catch (IOException e) {
@ -167,19 +183,19 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
* @param filePath file path * @param filePath file path
* @return whole file content * @return whole file content
*/ */
private String readWholeFileContent(String filePath){ private String readWholeFileContent(String filePath) {
BufferedReader br = null; BufferedReader br = null;
String line; String line;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
try { try {
br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
while ((line = br.readLine()) != null){ while ((line = br.readLine()) != null) {
sb.append(line + "\r\n"); sb.append(line + "\r\n");
} }
return sb.toString(); return sb.toString();
}catch (IOException e){ } catch (IOException e) {
logger.error("read file error",e); logger.error("read file error",e);
}finally { } finally {
IOUtils.closeQuietly(br); IOUtils.closeQuietly(br);
} }
return ""; return "";

16
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java

@ -17,10 +17,12 @@
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
@ -28,17 +30,17 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*; import io.netty.channel.Channel;
/** /**
* task ack processor * task ack processor
@ -63,7 +65,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/ */
private ProcessService processService; private ProcessService processService;
public TaskAckProcessor(){ public TaskAckProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
@ -77,7 +79,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskAckCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class); TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand); logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
@ -96,10 +98,10 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId());
if (taskInstance != null && ackStatus.typeIsRunning()){ if (taskInstance != null && ackStatus.typeIsRunning()) {
break; break;
} }
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(SLEEP_TIME_MILLIS);

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java

@ -17,16 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/** /**
* task response processor * task response processor
*/ */
@ -45,9 +47,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class); TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
logger.info("received task kill response command : {}", responseCommand); logger.info("received task kill response command : {}", responseCommand);
} }
} }

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java

@ -17,27 +17,29 @@
package org.apache.dolphinscheduler.server.master.processor; package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.Constants.*; import io.netty.channel.Channel;
/** /**
* task response processor * task response processor
@ -61,7 +63,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/ */
private ProcessService processService; private ProcessService processService;
public TaskResponseProcessor(){ public TaskResponseProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
@ -78,7 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class); TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand); logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand);
@ -95,15 +97,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
taskResponseService.addResponse(taskResponseEvent); taskResponseService.addResponse(taskResponseEvent);
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
if (taskInstance != null && responseStatus.typeIsFinished()){ if (taskInstance != null && responseStatus.typeIsFinished()) {
break; break;
} }
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(SLEEP_TIME_MILLIS);
} }
} }
} }

15
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@ -73,7 +72,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/ */
private final TaskCallbackService taskCallbackService; private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){ public TaskExecuteProcessor() {
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
@ -84,12 +83,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType())); String.format("invalid command type : %s", command.getType()));
TaskExecuteRequestCommand taskRequestCommand = JsonSerializer.deserialize( TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRequestCommand.class); command.getBody(), TaskExecuteRequestCommand.class);
logger.info("received command : {}", taskRequestCommand); logger.info("received command : {}", taskRequestCommand);
if(taskRequestCommand == null){ if (taskRequestCommand == null) {
logger.error("task execute request command is null"); logger.error("task execute request command is null");
return; return;
} }
@ -97,7 +96,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
String contextJson = taskRequestCommand.getTaskExecutionContext(); String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
if(taskExecutionContext == null){ if (taskExecutionContext == null) {
logger.error("task execution context is null"); logger.error("task execution context is null");
return; return;
} }
@ -162,9 +161,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime()); ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) {
ackCommand.setExecutePath(null); ackCommand.setExecutePath(null);
}else{ } else {
ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
} }
taskExecutionContext.setLogPath(ackCommand.getLogPath()); taskExecutionContext.setLogPath(ackCommand.getLogPath());
@ -176,7 +175,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* @param taskExecutionContext taskExecutionContext * @param taskExecutionContext taskExecutionContext
* @return execute local path * @return execute local path
*/ */
private String getExecLocalPath(TaskExecutionContext taskExecutionContext){ private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(),
taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessDefineId(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),

23
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.Preconditions;
@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -39,12 +38,15 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
/** /**
* task kill processor * task kill processor
*/ */
@ -67,8 +69,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
*/ */
private TaskExecutionContextCacheManager taskExecutionContextCacheManager; private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
public TaskKillProcessor() {
public TaskKillProcessor(){
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
@ -83,7 +84,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class); TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
logger.info("received kill command : {}", killCommand); logger.info("received kill command : {}", killCommand);
Pair<Boolean, List<String>> result = doKill(killCommand); Pair<Boolean, List<String>> result = doKill(killCommand);
@ -101,14 +102,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param killCommand * @param killCommand
* @return kill result * @return kill result
*/ */
private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand){ private Pair<Boolean, List<String>> doKill(TaskKillRequestCommand killCommand) {
List<String> appIds = Collections.EMPTY_LIST; List<String> appIds = Collections.EMPTY_LIST;
try { try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
Integer processId = taskExecutionContext.getProcessId(); Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){ if (processId == null || processId.equals(0)) {
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
return Pair.of(false, appIds); return Pair.of(false, appIds);
} }
@ -145,7 +146,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setAppIds(result.getRight()); taskKillResponseCommand.setAppIds(result.getRight());
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
if(taskExecutionContext != null){ if (taskExecutionContext != null) {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
@ -183,7 +184,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
} catch (Exception e) { } catch (Exception e) {
logger.error("kill yarn job error",e); logger.error("kill yarn job error",e);
} finally { } finally {
if(logClient != null){ if (logClient != null) {
logClient.close(); logClient.close();
} }
} }

101
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.net.InetSocketAddress;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* task ack processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class})
public class TaskAckProcessorTest {
private TaskAckProcessor taskAckProcessor;
private TaskResponseService taskResponseService;
private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
private ProcessService processService;
private TaskExecuteAckCommand taskExecuteAckCommand;
private TaskResponseEvent taskResponseEvent;
private Channel channel;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
taskResponseService = PowerMockito.mock(TaskResponseService.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager);
processService = PowerMockito.mock(ProcessService.class);
PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskAckProcessor = new TaskAckProcessor();
channel = PowerMockito.mock(Channel.class);
taskResponseEvent = PowerMockito.mock(TaskResponseEvent.class);
taskExecuteAckCommand = new TaskExecuteAckCommand();
taskExecuteAckCommand.setStatus(1);
taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
taskExecuteAckCommand.setHost("localhost");
taskExecuteAckCommand.setLogPath("/temp/worker.log");
taskExecuteAckCommand.setStartTime(new Date());
taskExecuteAckCommand.setTaskInstanceId(1);
}
@Test
public void testProcess() {
Command command = taskExecuteAckCommand.convert2Command();
Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType());
InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345);
PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress);
PowerMockito.mockStatic(TaskResponseEvent.class);
PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt()))
.thenReturn(taskResponseEvent);
TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class);
PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
taskAckProcessor.process(channel,command);
}
}

64
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.mockito.PowerMockito;
import io.netty.channel.Channel;
/**
* task response processor test
*/
public class TaskKillResponseProcessorTest {
private TaskKillResponseProcessor taskKillResponseProcessor;
private TaskKillResponseCommand taskKillResponseCommand;
private Channel channel;
@Before
public void before() {
taskKillResponseProcessor = new TaskKillResponseProcessor();
channel = PowerMockito.mock(Channel.class);
taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setAppIds(
new ArrayList<String>() {{ add("task_1"); }});
taskKillResponseCommand.setHost("localhost");
taskKillResponseCommand.setProcessId(1);
taskKillResponseCommand.setStatus(1);
taskKillResponseCommand.setTaskInstanceId(1);
}
@Test
public void testProcess() {
Command command = taskKillResponseCommand.convert2Command();
Assert.assertEquals(CommandType.TASK_KILL_RESPONSE,command.getType());
taskKillResponseProcessor.process(channel,command);
}
}

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.service.log; package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@ -28,12 +30,10 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* log client * log client
*/ */
@ -90,7 +90,7 @@ public class LogClientService {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize( RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
response.getBody(), RollViewLogResponseCommand.class); response.getBody(), RollViewLogResponseCommand.class);
return rollReviewLog.getMsg(); return rollReviewLog.getMsg();
} }
@ -119,7 +119,7 @@ public class LogClientService {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
ViewLogResponseCommand viewLog = JsonSerializer.deserialize( ViewLogResponseCommand viewLog = JSONUtils.parseObject(
response.getBody(), ViewLogResponseCommand.class); response.getBody(), ViewLogResponseCommand.class);
return viewLog.getMsg(); return viewLog.getMsg();
} }
@ -148,7 +148,7 @@ public class LogClientService {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
GetLogBytesResponseCommand getLog = JsonSerializer.deserialize( GetLogBytesResponseCommand getLog = JSONUtils.parseObject(
response.getBody(), GetLogBytesResponseCommand.class); response.getBody(), GetLogBytesResponseCommand.class);
return getLog.getData(); return getLog.getData();
} }
@ -160,7 +160,6 @@ public class LogClientService {
return result; return result;
} }
/** /**
* remove task log * remove task log
* *
@ -178,7 +177,7 @@ public class LogClientService {
Command command = request.convert2Command(); Command command = request.convert2Command();
Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
if (response != null) { if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize( RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject(
response.getBody(), RemoveTaskLogResponseCommand.class); response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus(); return taskLogResponse.getStatus();
} }

2
pom.xml

@ -831,6 +831,8 @@
<include>**/server/master/MasterExecThreadTest.java</include> <include>**/server/master/MasterExecThreadTest.java</include>
<include>**/server/master/ParamsTest.java</include> <include>**/server/master/ParamsTest.java</include>
<include>**/server/master/SubProcessTaskTest.java</include> <include>**/server/master/SubProcessTaskTest.java</include>
<include>**/server/master/processor/TaskAckProcessorTest.java</include>
<include>**/server/master/processor/TaskKillResponseProcessorTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include> <include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>

Loading…
Cancel
Save