diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index 56ef74d6ee..fc11a2add2 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.utils; +import static java.nio.charset.StandardCharsets.UTF_8; + import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; @@ -126,6 +128,22 @@ public class JSONUtils { return null; } + /** + * deserialize + * + * @param src byte array + * @param clazz class + * @param deserialize type + * @return deserialize type + */ + public static T parseObject(byte[] src, Class clazz) { + if (src == null) { + return null; + } + String json = new String(src, UTF_8); + return parseObject(json, clazz); + } + /** * json to list * @@ -253,6 +271,27 @@ public class JSONUtils { } } + /** + * serialize to json byte + * + * @param obj object + * @param object type + * @return byte array + */ + public static byte[] toJsonByteArray(T obj) { + if (obj == null) { + return null; + } + String json = ""; + try { + json = toJsonString(obj); + } catch (Exception e) { + logger.error("json serialize exception.", e); + } + + return json.getBytes(UTF_8); + } + public static ObjectNode parseObject(String text) { try { return (ObjectNode) objectMapper.readTree(text); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java index e273496f56..af12d5a625 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.utils; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.dolphinscheduler.common.enums.DataType; +import org.apache.dolphinscheduler.common.enums.Direct; +import org.apache.dolphinscheduler.common.model.TaskNode; +import org.apache.dolphinscheduler.common.process.Property; import java.util.ArrayList; import java.util.HashMap; @@ -28,13 +28,15 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import org.apache.dolphinscheduler.common.enums.DataType; -import org.apache.dolphinscheduler.common.enums.Direct; -import org.apache.dolphinscheduler.common.model.TaskNode; -import org.apache.dolphinscheduler.common.process.Property; import org.junit.Assert; import org.junit.Test; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; + public class JSONUtilsTest { @Test @@ -108,9 +110,8 @@ public class JSONUtilsTest { Assert.assertEquals(Direct.IN, direct); } - @Test - public void String2MapTest() { + public void string2MapTest() { String str = list2String(); List maps = JSONUtils.toList(str, @@ -145,6 +146,18 @@ public class JSONUtilsTest { Assert.assertNull(JSONUtils.parseObject("foo", String.class)); } + @Test + public void testJsonByteArray() { + String str = "foo"; + byte[] serializeByte = JSONUtils.toJsonByteArray(str); + String deserialize = JSONUtils.parseObject(serializeByte, String.class); + Assert.assertEquals(str, deserialize); + str = null; + serializeByte = JSONUtils.toJsonByteArray(str); + deserialize = JSONUtils.parseObject(serializeByte, String.class); + Assert.assertNull(deserialize); + } + @Test public void testToList() { Assert.assertEquals(new ArrayList(), diff --git a/dolphinscheduler-remote/pom.xml b/dolphinscheduler-remote/pom.xml index 4d398f3069..4af03a362f 100644 --- a/dolphinscheduler-remote/pom.xml +++ b/dolphinscheduler-remote/pom.xml @@ -35,6 +35,10 @@ + + org.apache.dolphinscheduler + dolphinscheduler-common + io.netty netty-all @@ -48,10 +52,6 @@ junit test - - com.fasterxml.jackson.core - jackson-databind - diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java index 135c14975b..2fc70f1fbc 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; -import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; + /** * execute task request command */ @@ -35,7 +37,7 @@ public class TaskExecuteAckCommand implements Serializable { /** * startTime */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date startTime; /** @@ -111,23 +113,23 @@ public class TaskExecuteAckCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } @Override public String toString() { - return "TaskExecuteAckCommand{" + - "taskInstanceId=" + taskInstanceId + - ", startTime=" + startTime + - ", host='" + host + '\'' + - ", status=" + status + - ", logPath='" + logPath + '\'' + - ", executePath='" + executePath + '\'' + - '}'; + return "TaskExecuteAckCommand{" + + "taskInstanceId=" + taskInstanceId + + ", startTime=" + startTime + + ", host='" + host + '\'' + + ", status=" + status + + ", logPath='" + logPath + '\'' + + ", executePath='" + executePath + '\'' + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java index 4ae28e3ca5..5b2e33922c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; @@ -50,18 +51,18 @@ public class TaskExecuteRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } @Override public String toString() { - return "TaskExecuteRequestCommand{" + - "taskExecutionContext='" + taskExecutionContext + '\'' + - '}'; + return "TaskExecuteRequestCommand{" + + "taskExecutionContext='" + taskExecutionContext + '\'' + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index 7f6ee668a8..de5b82c729 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -14,20 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; -import com.fasterxml.jackson.annotation.JsonFormat; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; + /** * execute task response command */ public class TaskExecuteResponseCommand implements Serializable { - public TaskExecuteResponseCommand() { } @@ -49,7 +50,7 @@ public class TaskExecuteResponseCommand implements Serializable { /** * end time */ - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date endTime; @@ -120,22 +121,22 @@ public class TaskExecuteResponseCommand implements Serializable { * package response command * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } @Override public String toString() { - return "TaskExecuteResponseCommand{" + - "taskInstanceId=" + taskInstanceId + - ", status=" + status + - ", endTime=" + endTime + - ", processId=" + processId + - ", appIds='" + appIds + '\'' + - '}'; + return "TaskExecuteResponseCommand{" + + "taskInstanceId=" + taskInstanceId + + ", status=" + status + + ", endTime=" + endTime + + ", processId=" + processId + + ", appIds='" + appIds + '\'' + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java index 4c0830b7cf..155b31785e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; @@ -30,7 +31,6 @@ public class TaskKillRequestCommand implements Serializable { */ private int taskInstanceId; - public int getTaskInstanceId() { return taskInstanceId; } @@ -44,18 +44,18 @@ public class TaskKillRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } @Override public String toString() { - return "TaskKillRequestCommand{" + - "taskInstanceId=" + taskInstanceId + - '}'; + return "TaskKillRequestCommand{" + + "taskInstanceId=" + taskInstanceId + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 4b48c1ef4f..f77221d2e5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.command; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; import java.util.List; @@ -52,7 +53,6 @@ public class TaskKillResponseCommand implements Serializable { */ protected List appIds; - public int getTaskInstanceId() { return taskInstanceId; } @@ -98,22 +98,22 @@ public class TaskKillResponseCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } @Override public String toString() { - return "TaskKillResponseCommand{" + - "taskInstanceId=" + taskInstanceId + - ", host='" + host + '\'' + - ", status=" + status + - ", processId=" + processId + - ", appIds=" + appIds + - '}'; + return "TaskKillResponseCommand{" + + "taskInstanceId=" + taskInstanceId + + ", host='" + host + '\'' + + ", status=" + status + + ", processId=" + processId + + ", appIds=" + appIds + + '}'; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java index e4b21e2f89..ef71e07cde 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesRequestCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -53,10 +53,10 @@ public class GetLogBytesRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.GET_LOG_BYTES_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java index 349ec03855..e8e3eb2a10 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetLogBytesResponseCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -54,10 +54,10 @@ public class GetLogBytesResponseCommand implements Serializable { * @param opaque request unique identification * @return command */ - public Command convert2Command(long opaque){ + public Command convert2Command(long opaque) { Command command = new Command(opaque); command.setType(CommandType.GET_LOG_BYTES_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java index a91cb2add0..c5960d69f2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -53,10 +53,10 @@ public class RemoveTaskLogRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.REMOVE_TAK_LOG_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java index 39e8672127..6883ece815 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -53,10 +53,10 @@ public class RemoveTaskLogResponseCommand implements Serializable { * * @return command */ - public Command convert2Command(long opaque){ + public Command convert2Command(long opaque) { Command command = new Command(opaque); command.setType(CommandType.REMOVE_TAK_LOG_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java index 00129c7e78..4afee09e6d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogRequestCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -81,10 +81,10 @@ public class RollViewLogRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.ROLL_VIEW_LOG_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java index a4f4f86c9b..0e9e44a87b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/RollViewLogResponseCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -54,10 +54,10 @@ public class RollViewLogResponseCommand implements Serializable { * @param opaque request unique identification * @return command */ - public Command convert2Command(long opaque){ + public Command convert2Command(long opaque) { Command command = new Command(opaque); command.setType(CommandType.ROLL_VIEW_LOG_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java index 1d51653eac..e8094690dd 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogRequestCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -53,10 +53,10 @@ public class ViewLogRequestCommand implements Serializable { * * @return command */ - public Command convert2Command(){ + public Command convert2Command() { Command command = new Command(); command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java index 6940104f71..33e263087c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/ViewLogResponseCommand.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.remote.command.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; @@ -54,10 +54,10 @@ public class ViewLogResponseCommand implements Serializable { * @param opaque request unique identification * @return command */ - public Command convert2Command(long opaque){ + public Command convert2Command(long opaque) { Command command = new Command(opaque); command.setType(CommandType.VIEW_WHOLE_LOG_RESPONSE); - byte[] body = JsonSerializer.serialize(this); + byte[] body = JSONUtils.toJsonByteArray(this); command.setBody(body); return command; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java deleted file mode 100644 index 0c05232dd1..0000000000 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/JsonSerializer.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.remote.utils; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -/** - * json serialize or deserialize - */ -public class JsonSerializer { - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class); - - private JsonSerializer(){ - - } - /** - * serialize to byte - * - * @param obj object - * @param object type - * @return byte array - */ - public static byte[] serialize(T obj) { - String json = ""; - try { - json = objectMapper.writeValueAsString(obj); - } catch (JsonProcessingException e) { - logger.error("serializeToString exception!", e); - } - - return json.getBytes(Constants.UTF8); - } - - /** - * serialize to string - * @param obj object - * @param object type - * @return string - */ - public static String serializeToString(T obj) { - String json = ""; - try { - json = objectMapper.writeValueAsString(obj); - } catch (JsonProcessingException e) { - logger.error("serializeToString exception!", e); - } - - return json; - } - - /** - * deserialize - * - * @param src byte array - * @param clazz class - * @param deserialize type - * @return deserialize type - */ - public static T deserialize(byte[] src, Class clazz) { - - String json = new String(src, StandardCharsets.UTF_8); - try { - return objectMapper.readValue(json, clazz); - } catch (IOException e) { - logger.error("deserialize exception!", e); - return null; - } - - } - -} diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java deleted file mode 100644 index cb92db7f25..0000000000 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/JsonSerializerTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.remote; - - -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; -import org.junit.Assert; -import org.junit.Test; - -public class JsonSerializerTest { - - @Test - public void testSerialize(){ - TestObj testObj = new TestObj(); - testObj.setAge(12); - byte[] serializeByte = JsonSerializer.serialize(testObj); - - // - TestObj deserialize = JsonSerializer.deserialize(serializeByte, TestObj.class); - - Assert.assertEquals(testObj.getAge(), deserialize.getAge()); - } - - static class TestObj { - - private int age; - - public int getAge() { - return age; - } - - public void setAge(int age) { - this.age = age; - } - - @Override - public String toString() { - return "TestObj{" + - "age=" + age + - '}'; - } - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 1589c365c2..7d28a77938 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -18,9 +18,9 @@ package org.apache.dolphinscheduler.server.entity; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import java.io.Serializable; import java.util.Date; @@ -475,7 +475,7 @@ public class TaskExecutionContext implements Serializable { public Command toCommand() { TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); - requestCommand.setTaskExecutionContext(JsonSerializer.serializeToString(this)); + requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(this)); return requestCommand.convert2Command(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index 458afa63b3..b31785c15e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -14,19 +14,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.log; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.utils.IOUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.log.*; +import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand; +import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; +import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.*; +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; @@ -38,6 +49,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + /** * logger request process logic */ @@ -47,7 +63,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { private final ThreadPoolExecutor executor; - public LoggerRequestProcessor(){ + public LoggerRequestProcessor() { this.executor = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); } @@ -59,35 +75,35 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { * reuqest task log command type */ final CommandType commandType = command.getType(); - switch (commandType){ + switch (commandType) { case GET_LOG_BYTES_REQUEST: - GetLogBytesRequestCommand getLogRequest = JsonSerializer.deserialize( + GetLogBytesRequestCommand getLogRequest = JSONUtils.parseObject( command.getBody(), GetLogBytesRequestCommand.class); byte[] bytes = getFileContentBytes(getLogRequest.getPath()); GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); channel.writeAndFlush(getLogResponse.convert2Command(command.getOpaque())); break; case VIEW_WHOLE_LOG_REQUEST: - ViewLogRequestCommand viewLogRequest = JsonSerializer.deserialize( + ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject( command.getBody(), ViewLogRequestCommand.class); String msg = readWholeFileContent(viewLogRequest.getPath()); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque())); break; case ROLL_VIEW_LOG_REQUEST: - RollViewLogRequestCommand rollViewLogRequest = JsonSerializer.deserialize( + RollViewLogRequestCommand rollViewLogRequest = JSONUtils.parseObject( command.getBody(), RollViewLogRequestCommand.class); List lines = readPartFileContent(rollViewLogRequest.getPath(), rollViewLogRequest.getSkipLineNum(), rollViewLogRequest.getLimit()); StringBuilder builder = new StringBuilder(); - for (String line : lines){ + for (String line : lines) { builder.append(line + "\r\n"); } RollViewLogResponseCommand rollViewLogRequestResponse = new RollViewLogResponseCommand(builder.toString()); channel.writeAndFlush(rollViewLogRequestResponse.convert2Command(command.getOpaque())); break; case REMOVE_TAK_LOG_REQUEST: - RemoveTaskLogRequestCommand removeTaskLogRequest = JsonSerializer.deserialize( + RemoveTaskLogRequestCommand removeTaskLogRequest = JSONUtils.parseObject( command.getBody(), RemoveTaskLogRequestCommand.class); String taskLogPath = removeTaskLogRequest.getPath(); @@ -95,10 +111,10 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { File taskLogFile = new File(taskLogPath); Boolean status = true; try { - if (taskLogFile.exists()){ + if (taskLogFile.exists()) { status = taskLogFile.delete(); } - }catch (Exception e){ + } catch (Exception e) { status = false; } @@ -110,7 +126,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { } } - public ExecutorService getExecutor(){ + public ExecutorService getExecutor() { return this.executor; } @@ -121,7 +137,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { * @return byte array of file * @throws Exception exception */ - private byte[] getFileContentBytes(String filePath){ + private byte[] getFileContentBytes(String filePath) { InputStream in = null; ByteArrayOutputStream bos = null; try { @@ -133,9 +149,9 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { bos.write(buf, 0, len); } return bos.toByteArray(); - }catch (IOException e){ + } catch (IOException e) { logger.error("get file bytes error",e); - }finally { + } finally { IOUtils.closeQuietly(bos); IOUtils.closeQuietly(in); } @@ -152,7 +168,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { */ private List readPartFileContent(String filePath, int skipLine, - int limit){ + int limit) { try (Stream stream = Files.lines(Paths.get(filePath))) { return stream.skip(skipLine).limit(limit).collect(Collectors.toList()); } catch (IOException e) { @@ -167,19 +183,19 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { * @param filePath file path * @return whole file content */ - private String readWholeFileContent(String filePath){ + private String readWholeFileContent(String filePath) { BufferedReader br = null; String line; StringBuilder sb = new StringBuilder(); try { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); - while ((line = br.readLine()) != null){ + while ((line = br.readLine()) != null) { sb.append(line + "\r\n"); } return sb.toString(); - }catch (IOException e){ + } catch (IOException e) { logger.error("read file error",e); - }finally { + } finally { IOUtils.closeQuietly(br); } return ""; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index 0f038dd6ee..0b4d26d1ce 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -17,10 +17,12 @@ package org.apache.dolphinscheduler.server.master.processor; -import io.netty.channel.Channel; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; + import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; @@ -28,17 +30,17 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; +import io.netty.channel.Channel; /** * task ack processor @@ -63,7 +65,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { */ private ProcessService processService; - public TaskAckProcessor(){ + public TaskAckProcessor() { this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.processService = SpringApplicationContext.getBean(ProcessService.class); @@ -77,7 +79,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteAckCommand taskAckCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class); + TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class); logger.info("taskAckCommand : {}", taskAckCommand); taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); @@ -96,10 +98,10 @@ public class TaskAckProcessor implements NettyRequestProcessor { taskResponseService.addResponse(taskResponseEvent); - while (Stopper.isRunning()){ + while (Stopper.isRunning()) { TaskInstance taskInstance = processService.findTaskInstanceById(taskAckCommand.getTaskInstanceId()); - if (taskInstance != null && ackStatus.typeIsRunning()){ + if (taskInstance != null && ackStatus.typeIsRunning()) { break; } ThreadUtils.sleep(SLEEP_TIME_MILLIS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 2e51998cbd..afd0577d87 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -17,16 +17,18 @@ package org.apache.dolphinscheduler.server.master.processor; -import io.netty.channel.Channel; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.channel.Channel; + /** * task response processor */ @@ -45,9 +47,8 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskKillResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class); + TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class); logger.info("received task kill response command : {}", responseCommand); } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index 2633ccd634..ffe3d5d9a8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -17,27 +17,29 @@ package org.apache.dolphinscheduler.server.master.processor; -import io.netty.channel.Channel; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; + import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.dolphinscheduler.common.Constants.*; +import io.netty.channel.Channel; /** * task response processor @@ -61,7 +63,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ private ProcessService processService; - public TaskResponseProcessor(){ + public TaskResponseProcessor() { this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); this.processService = SpringApplicationContext.getBean(ProcessService.class); @@ -78,7 +80,7 @@ public class TaskResponseProcessor implements NettyRequestProcessor { public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteResponseCommand responseCommand = JsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class); + TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class); logger.info("received command : {}", responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand); @@ -95,15 +97,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor { taskResponseService.addResponse(taskResponseEvent); - while (Stopper.isRunning()){ + while (Stopper.isRunning()) { TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId()); - if (taskInstance != null && responseStatus.typeIsFinished()){ + if (taskInstance != null && responseStatus.typeIsFinished()) { break; } ThreadUtils.sleep(SLEEP_TIME_MILLIS); } } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 3717ce37ae..13a9dca9d6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -73,7 +72,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { */ private final TaskCallbackService taskCallbackService; - public TaskExecuteProcessor(){ + public TaskExecuteProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); @@ -84,12 +83,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteRequestCommand taskRequestCommand = JsonSerializer.deserialize( + TaskExecuteRequestCommand taskRequestCommand = JSONUtils.parseObject( command.getBody(), TaskExecuteRequestCommand.class); logger.info("received command : {}", taskRequestCommand); - if(taskRequestCommand == null){ + if (taskRequestCommand == null) { logger.error("task execute request command is null"); return; } @@ -97,7 +96,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { String contextJson = taskRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class); - if(taskExecutionContext == null){ + if (taskExecutionContext == null) { logger.error("task execution context is null"); return; } @@ -162,9 +161,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); ackCommand.setHost(taskExecutionContext.getHost()); ackCommand.setStartTime(taskExecutionContext.getStartTime()); - if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){ + if (taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())) { ackCommand.setExecutePath(null); - }else{ + } else { ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); } taskExecutionContext.setLogPath(ackCommand.getLogPath()); @@ -176,7 +175,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { * @param taskExecutionContext taskExecutionContext * @return execute local path */ - private String getExecLocalPath(TaskExecutionContext taskExecutionContext){ + private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { return FileUtils.getProcessExecDir(taskExecutionContext.getProjectId(), taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 800db211c3..21108d1291 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,9 +17,9 @@ package org.apache.dolphinscheduler.server.worker.processor; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; @@ -39,12 +38,15 @@ import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContext import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + /** * task kill processor */ @@ -67,8 +69,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; - - public TaskKillProcessor(){ + public TaskKillProcessor() { this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); @@ -83,7 +84,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskKillRequestCommand killCommand = JsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class); + TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); logger.info("received kill command : {}", killCommand); Pair> result = doKill(killCommand); @@ -101,14 +102,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param killCommand * @return kill result */ - private Pair> doKill(TaskKillRequestCommand killCommand){ + private Pair> doKill(TaskKillRequestCommand killCommand) { List appIds = Collections.EMPTY_LIST; try { TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); Integer processId = taskExecutionContext.getProcessId(); - if (processId == null || processId.equals(0)){ + if (processId == null || processId.equals(0)) { logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); return Pair.of(false, appIds); } @@ -145,7 +146,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); taskKillResponseCommand.setAppIds(result.getRight()); TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); - if(taskExecutionContext != null){ + if (taskExecutionContext != null) { taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); @@ -183,7 +184,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { } catch (Exception e) { logger.error("kill yarn job error",e); } finally { - if(logClient != null){ + if (logClient != null) { logClient.close(); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java new file mode 100644 index 0000000000..f4805a7561 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; + +import java.net.InetSocketAddress; +import java.util.Date; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import io.netty.channel.Channel; + +/** + * task ack processor test + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class, TaskResponseEvent.class}) +public class TaskAckProcessorTest { + + private TaskAckProcessor taskAckProcessor; + private TaskResponseService taskResponseService; + private TaskInstanceCacheManagerImpl taskInstanceCacheManager; + private ProcessService processService; + private TaskExecuteAckCommand taskExecuteAckCommand; + private TaskResponseEvent taskResponseEvent; + private Channel channel; + + @Before + public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + + taskResponseService = PowerMockito.mock(TaskResponseService.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService); + + taskInstanceCacheManager = PowerMockito.mock(TaskInstanceCacheManagerImpl.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class)).thenReturn(taskInstanceCacheManager); + + processService = PowerMockito.mock(ProcessService.class); + PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService); + + taskAckProcessor = new TaskAckProcessor(); + + channel = PowerMockito.mock(Channel.class); + taskResponseEvent = PowerMockito.mock(TaskResponseEvent.class); + + taskExecuteAckCommand = new TaskExecuteAckCommand(); + taskExecuteAckCommand.setStatus(1); + taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker"); + taskExecuteAckCommand.setHost("localhost"); + taskExecuteAckCommand.setLogPath("/temp/worker.log"); + taskExecuteAckCommand.setStartTime(new Date()); + taskExecuteAckCommand.setTaskInstanceId(1); + } + + @Test + public void testProcess() { + Command command = taskExecuteAckCommand.convert2Command(); + Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType()); + InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345); + PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress); + PowerMockito.mockStatic(TaskResponseEvent.class); + + PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt())) + .thenReturn(taskResponseEvent); + TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class); + PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance); + + taskAckProcessor.process(channel,command); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java new file mode 100644 index 0000000000..c7f047569e --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.processor; + +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; + +import java.util.ArrayList; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +import io.netty.channel.Channel; + +/** + * task response processor test + */ +public class TaskKillResponseProcessorTest { + + private TaskKillResponseProcessor taskKillResponseProcessor; + + private TaskKillResponseCommand taskKillResponseCommand; + + private Channel channel; + + @Before + public void before() { + taskKillResponseProcessor = new TaskKillResponseProcessor(); + channel = PowerMockito.mock(Channel.class); + taskKillResponseCommand = new TaskKillResponseCommand(); + taskKillResponseCommand.setAppIds( + new ArrayList() {{ add("task_1"); }}); + taskKillResponseCommand.setHost("localhost"); + taskKillResponseCommand.setProcessId(1); + taskKillResponseCommand.setStatus(1); + taskKillResponseCommand.setTaskInstanceId(1); + + } + + @Test + public void testProcess() { + Command command = taskKillResponseCommand.convert2Command(); + Assert.assertEquals(CommandType.TASK_KILL_RESPONSE,command.getType()); + taskKillResponseProcessor.process(channel,command); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java index 474bf12c77..75753c78d6 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java @@ -14,8 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.log; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand; @@ -28,12 +30,10 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * log client */ @@ -90,7 +90,7 @@ public class LogClientService { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - RollViewLogResponseCommand rollReviewLog = JsonSerializer.deserialize( + RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject( response.getBody(), RollViewLogResponseCommand.class); return rollReviewLog.getMsg(); } @@ -119,7 +119,7 @@ public class LogClientService { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - ViewLogResponseCommand viewLog = JsonSerializer.deserialize( + ViewLogResponseCommand viewLog = JSONUtils.parseObject( response.getBody(), ViewLogResponseCommand.class); return viewLog.getMsg(); } @@ -148,7 +148,7 @@ public class LogClientService { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - GetLogBytesResponseCommand getLog = JsonSerializer.deserialize( + GetLogBytesResponseCommand getLog = JSONUtils.parseObject( response.getBody(), GetLogBytesResponseCommand.class); return getLog.getData(); } @@ -160,7 +160,6 @@ public class LogClientService { return result; } - /** * remove task log * @@ -178,7 +177,7 @@ public class LogClientService { Command command = request.convert2Command(); Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT); if (response != null) { - RemoveTaskLogResponseCommand taskLogResponse = JsonSerializer.deserialize( + RemoveTaskLogResponseCommand taskLogResponse = JSONUtils.parseObject( response.getBody(), RemoveTaskLogResponseCommand.class); return taskLogResponse.getStatus(); } diff --git a/pom.xml b/pom.xml index 92206689a1..8bba558867 100644 --- a/pom.xml +++ b/pom.xml @@ -831,6 +831,8 @@ **/server/master/MasterExecThreadTest.java **/server/master/ParamsTest.java **/server/master/SubProcessTaskTest.java + **/server/master/processor/TaskAckProcessorTest.java + **/server/master/processor/TaskKillResponseProcessorTest.java **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java