From 46b8b1526e9aeb3362bd874e2e84afb0085fd4c6 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Fri, 22 Jan 2021 17:22:33 +0800 Subject: [PATCH 01/20] [Improvement-#4503][dolphinscheduler-dist]Jackson version upgraded to 2.10.5 --- dolphinscheduler-common/sql/soft_version | 1 + .../org/apache/dolphinscheduler/common/utils/JSONUtils.java | 6 +++++- dolphinscheduler-dist/release-docs/LICENSE | 6 +++--- pom.xml | 2 +- tools/dependencies/known-dependencies.txt | 6 +++--- 5 files changed, 13 insertions(+), 8 deletions(-) create mode 100644 dolphinscheduler-common/sql/soft_version diff --git a/dolphinscheduler-common/sql/soft_version b/dolphinscheduler-common/sql/soft_version new file mode 100644 index 0000000000..5409e995b5 --- /dev/null +++ b/dolphinscheduler-common/sql/soft_version @@ -0,0 +1 @@ +32432423 \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index fc11a2add2..4ab04997d8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -294,7 +294,11 @@ public class JSONUtils { public static ObjectNode parseObject(String text) { try { - return (ObjectNode) objectMapper.readTree(text); + if (text.isEmpty()) { + return parseObject(text, ObjectNode.class); + } else { + return (ObjectNode) objectMapper.readTree(text); + } } catch (Exception e) { throw new RuntimeException("String json deserialization exception.", e); } diff --git a/dolphinscheduler-dist/release-docs/LICENSE b/dolphinscheduler-dist/release-docs/LICENSE index 4f8afe6741..44892a242e 100644 --- a/dolphinscheduler-dist/release-docs/LICENSE +++ b/dolphinscheduler-dist/release-docs/LICENSE @@ -286,10 +286,10 @@ The text of each license is also included at licenses/LICENSE-[project].txt. httpclient 4.4.1: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient/4.4.1, Apache 2.0 httpcore 4.4.1: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore/4.4.1, Apache 2.0 httpmime 4.5.7: https://mvnrepository.com/artifact/org.apache.httpcomponents/httpmime/4.5.7, Apache 2.0 - jackson-annotations 2.9.10: https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations/2.9.10, Apache 2.0 - jackson-core 2.9.10: https://github.com/FasterXML/jackson-core, Apache 2.0 + jackson-annotations 2.10.5: https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations/2.10.5, Apache 2.0 + jackson-core 2.10.5: https://github.com/FasterXML/jackson-core, Apache 2.0 jackson-core-asl 1.9.13: https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-core-asl/1.9.13, Apache 2.0 - jackson-databind 2.9.10: https://github.com/FasterXML/jackson-databind, Apache 2.0 + jackson-databind 2.10.5: https://github.com/FasterXML/jackson-databind, Apache 2.0 jackson-datatype-jdk8 2.9.10: https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jdk8/2.9.10, Apache 2.0 jackson-datatype-jsr310 2.9.10: https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.9.10, Apache 2.0 jackson-jaxrs 1.9.13: https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-jaxrs/1.9.13, Apache 2.0 and LGPL 2.1 diff --git a/pom.xml b/pom.xml index 5c8dd67613..63776bdb03 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ 1.2.3 2.7.3 2.3.0 - 2.9.10 + 2.10.5 3.2.0 2.0.1 5.0.5 diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index ebec45992f..188db804f1 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -79,10 +79,10 @@ httpclient-4.4.1.jar httpcore-4.4.1.jar httpmime-4.5.12.jar j2objc-annotations-1.1.jar -jackson-annotations-2.9.10.jar -jackson-core-2.9.10.jar +jackson-annotations-2.10.5.jar +jackson-core-2.10.5.jar jackson-core-asl-1.9.13.jar -jackson-databind-2.9.10.jar +jackson-databind-2.10.5.jar jackson-datatype-jdk8-2.9.10.jar jackson-datatype-jsr310-2.9.10.jar jackson-jaxrs-1.9.13.jar From 8ec345c35b49004b9a726bdf1c520afb76ca8e2d Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Thu, 18 Feb 2021 20:48:09 +0800 Subject: [PATCH 02/20] [Improvement][TaskExecuteProcessor]Delete redundant code in method of setTaskCache --- .../server/worker/processor/TaskExecuteProcessor.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index fc8b33a488..5779cac119 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -95,8 +95,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { * @param taskExecutionContext task */ private void setTaskCache(TaskExecutionContext taskExecutionContext) { - TaskExecutionContext preTaskCache = new TaskExecutionContext(); - preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); } From b352c52f33533666cbd5d409adc0bf981870f4d8 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Sun, 21 Feb 2021 23:47:48 +0800 Subject: [PATCH 03/20] [Improvement][master] Replace sleep&task with poll in TaskPriorityQueueConsumer#4137 --- .../consumer/TaskPriorityQueueConsumer.java | 13 ++++--------- .../service/queue/TaskPriorityQueue.java | 13 +++++++++++++ .../service/queue/TaskPriorityQueueImpl.java | 15 +++++++++++++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d7de840dfa..c4af5c0b03 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -58,11 +58,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -124,12 +120,11 @@ public class TaskPriorityQueueConsumer extends Thread { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for (int i = 0; i < fetchTaskNum; i++) { - if (taskPriorityQueue.size() <= 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); + TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (Objects.isNull(taskPriority)) { continue; } - // if not task , blocking here - TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { failedDispatchTasks.add(taskPriority); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 14c6b382d4..1325afe9b2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.concurrent.TimeUnit; + /** * task priority queue * @param @@ -41,6 +43,17 @@ public interface TaskPriorityQueue { */ T take() throws TaskPriorityQueueException, InterruptedException; + + /** + * poll taskInfo with timeout + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException; + /** * size * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 694d4c4763..8775a272e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Service; @@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { return queue.take(); } + /** + * poll taskInfo with timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { + return queue.poll(timeout,unit); + } + /** * queue size * From f2e41c298f805da54e680ec12dccd0584eb2a87d Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 01:16:07 +0800 Subject: [PATCH 04/20] [Checkstyle][TaskPriorityQueueConsumer]import check --- .../server/master/consumer/TaskPriorityQueueConsumer.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index c4af5c0b03..69058a49b8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -58,7 +58,12 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; From 354b1cd928433f37fdc532229d6cddd778fab250 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 09:56:52 +0800 Subject: [PATCH 05/20] [Checkstyle][common]delete unknown soft_version --- dolphinscheduler-common/sql/soft_version | 1 - 1 file changed, 1 deletion(-) delete mode 100644 dolphinscheduler-common/sql/soft_version diff --git a/dolphinscheduler-common/sql/soft_version b/dolphinscheduler-common/sql/soft_version deleted file mode 100644 index 5409e995b5..0000000000 --- a/dolphinscheduler-common/sql/soft_version +++ /dev/null @@ -1 +0,0 @@ -32432423 \ No newline at end of file From 5faa5d170331b64b85b145716b41c9e49927f0d0 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 10:20:22 +0800 Subject: [PATCH 06/20] [BUG][PeerTaskInstancePriorityQueue]poll implemented in PriorityQueue --- .../queue/PeerTaskInstancePriorityQueue.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 5ce42242ee..72d0bf6122 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; /** * Task instances priority queue implementation @@ -61,6 +62,22 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue Date: Mon, 22 Feb 2021 10:52:38 +0800 Subject: [PATCH 07/20] [Test][PeerTaskInstancePriorityQueueTest]testPoll added --- .../PeerTaskInstancePriorityQueueTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java index cf39d57b8b..65dd0ce10e 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import java.util.concurrent.TimeUnit; + import org.junit.Assert; import org.junit.Test; @@ -57,6 +59,21 @@ public class PeerTaskInstancePriorityQueueTest { Assert.assertTrue(queue.size() < peekBeforeLength); } + @Test + public void testPoll() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + int peekBeforeLength = queue.size(); + System.out.println(System.currentTimeMillis()); + queue.poll(1000, TimeUnit.MILLISECONDS); + queue.poll(1000, TimeUnit.MILLISECONDS); + System.out.println(System.currentTimeMillis()); + Assert.assertTrue(queue.size()== 0); + queue.poll(1000, TimeUnit.MILLISECONDS); + System.out.println(System.currentTimeMillis()); + + } + + /** * get queue * From 9da30dd1753237e78e3cd6ed510f5ecf3c3bc100 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 12:14:25 +0800 Subject: [PATCH 08/20] [Test][TaskPriorityQueueImplTest]Test with coverity --- .../queue/TaskPriorityQueueImplTest.java | 74 +++++++++++++++++++ .../PeerTaskInstancePriorityQueueTest.java | 5 +- 2 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java new file mode 100644 index 0000000000..3328715c98 --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -0,0 +1,74 @@ +package org.apache.dolphinscheduler.service.queue; + +import static org.junit.Assert.*; + +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; + +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.Test; + +public class TaskPriorityQueueImplTest { + + @Test + public void put() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + Assert.assertEquals(2,queue.size()); + } + + @Test + public void take() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.take(); + Assert.assertTrue(queue.size() < peekBeforeLength); + } + + @Test + public void poll() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.poll(1000, TimeUnit.MILLISECONDS); + queue.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertTrue(queue.size() == 0); + System.out.println(System.currentTimeMillis()); + queue.poll(1000, TimeUnit.MILLISECONDS); + System.out.println(System.currentTimeMillis()); + } + + @Test + public void size() throws Exception { + Assert.assertTrue( getPriorityQueue().size() == 2); + } + + + /** + * get queue + * + * @return queue + * @throws Exception + */ + private TaskPriorityQueue getPriorityQueue() throws Exception { + TaskPriorityQueue queue = new TaskPriorityQueueImpl(); + TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1); + TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2); + queue.put(taskInstanceHigPriority); + queue.put(taskInstanceMediumPriority); + return queue; + } + + + /** + * create task priority + * @param priority + * @param processInstanceId + * @return + */ + private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) { + TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default"); + return priorityOne; + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java index 65dd0ce10e..1601054fbe 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -63,17 +63,14 @@ public class PeerTaskInstancePriorityQueueTest { public void testPoll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); - System.out.println(System.currentTimeMillis()); queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertTrue(queue.size() == 0); System.out.println(System.currentTimeMillis()); - Assert.assertTrue(queue.size()== 0); queue.poll(1000, TimeUnit.MILLISECONDS); System.out.println(System.currentTimeMillis()); - } - /** * get queue * From 1838c817a1df93f8341a8fb0ec0ce1ec93d797e0 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 12:18:23 +0800 Subject: [PATCH 09/20] [CodeStyle][QueueTest]import style --- .../service/queue/TaskPriorityQueueImplTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index 3328715c98..de2acadfab 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -1,10 +1,7 @@ package org.apache.dolphinscheduler.service.queue; -import static org.junit.Assert.*; import org.apache.dolphinscheduler.common.enums.Priority; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.TimeUnit; @@ -16,7 +13,7 @@ public class TaskPriorityQueueImplTest { @Test public void put() throws Exception { TaskPriorityQueue queue = getPriorityQueue(); - Assert.assertEquals(2,queue.size()); + Assert.assertEquals(2, queue.size()); } @Test @@ -41,7 +38,7 @@ public class TaskPriorityQueueImplTest { @Test public void size() throws Exception { - Assert.assertTrue( getPriorityQueue().size() == 2); + Assert.assertTrue(getPriorityQueue().size() == 2); } @@ -63,6 +60,7 @@ public class TaskPriorityQueueImplTest { /** * create task priority + * * @param priority * @param processInstanceId * @return From d1189674af1aadd890dde0f2090f72342a2490ef Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 12:21:14 +0800 Subject: [PATCH 10/20] [CodeStyle][QueueTest]code style --- .../java/queue/PeerTaskInstancePriorityQueueTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java index 1601054fbe..03e3743452 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -39,7 +39,7 @@ public class PeerTaskInstancePriorityQueueTest { TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); - Assert.assertEquals(2,queue.size()); + Assert.assertEquals(2, queue.size()); } @Test @@ -47,7 +47,7 @@ public class PeerTaskInstancePriorityQueueTest { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.peek(); - Assert.assertEquals(peekBeforeLength,queue.size()); + Assert.assertEquals(peekBeforeLength, queue.size()); } @@ -89,8 +89,8 @@ public class PeerTaskInstancePriorityQueueTest { /** * create task instance * - * @param name name - * @param priority priority + * @param name name + * @param priority priority * @return */ private TaskInstance createTaskInstance(String name, Priority priority) { From 28975d73971653a706818eaa82111cc311670f3d Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 12:25:02 +0800 Subject: [PATCH 11/20] [CodeStyle][QueueTest]empty line more than one line of impor style --- .../service/queue/TaskPriorityQueueImplTest.java | 3 --- .../src/test/java/queue/PeerTaskInstancePriorityQueueTest.java | 2 -- 2 files changed, 5 deletions(-) diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index de2acadfab..f3514a09f2 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -1,6 +1,5 @@ package org.apache.dolphinscheduler.service.queue; - import org.apache.dolphinscheduler.common.enums.Priority; import java.util.concurrent.TimeUnit; @@ -41,7 +40,6 @@ public class TaskPriorityQueueImplTest { Assert.assertTrue(getPriorityQueue().size() == 2); } - /** * get queue * @@ -57,7 +55,6 @@ public class TaskPriorityQueueImplTest { return queue; } - /** * create task priority * diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java index 03e3743452..8ad768a331 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -20,9 +20,7 @@ package queue; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; - import java.util.concurrent.TimeUnit; - import org.junit.Assert; import org.junit.Test; From 450c84baf10a58cc4145974d5568a4f8798dfda5 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 13:24:15 +0800 Subject: [PATCH 12/20] [CodeStyle][PeerQueueTest]TimeUnit seperated from previous import --- .../src/test/java/queue/PeerTaskInstancePriorityQueueTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java index 8ad768a331..03e3743452 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java @@ -20,7 +20,9 @@ package queue; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; + import java.util.concurrent.TimeUnit; + import org.junit.Assert; import org.junit.Test; From 8d00386d27c91fa8dc249c038268d6d5e5dcd118 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 13:25:57 +0800 Subject: [PATCH 13/20] [CodeStyle][QueueTest]import style --- .../src/test/java/queue/TaskUpdateQueueTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java index 2c13afa227..96d69d8940 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java @@ -17,12 +17,13 @@ package queue; +import static org.junit.Assert.assertEquals; + import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import org.junit.Test; -import static org.junit.Assert.*; +import org.junit.Test; public class TaskUpdateQueueTest { @@ -30,7 +31,7 @@ public class TaskUpdateQueueTest { * test put */ @Test - public void testQueue() throws Exception{ + public void testQueue() throws Exception { /** * 1_1_2_1_default From 9d8b35a4b485c9f98abed09e4fca37ba0d385cde Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 14:14:37 +0800 Subject: [PATCH 14/20] [Test][TaskPQImplTest]license header appended --- .../queue/TaskPriorityQueueImplTest.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index f3514a09f2..9868dce9a4 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.enums.Priority; From 6e0093c9caec58b960660f2779cb45f90f6d9080 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 15:48:59 +0800 Subject: [PATCH 15/20] [Test][service]Adjust test of queue --- .../PeerTaskInstancePriorityQueueTest.java | 57 ++++++++----- .../queue/TaskPriorityQueueImplTest.java | 56 +++++++++++++ .../src/test/java/queue/TaskPriorityTest.java | 83 ------------------- .../test/java/queue/TaskUpdateQueueTest.java | 58 ------------- 4 files changed, 93 insertions(+), 161 deletions(-) rename dolphinscheduler-service/src/test/java/{ => org/apache/dolphinscheduler/service}/queue/PeerTaskInstancePriorityQueueTest.java (71%) delete mode 100644 dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java delete mode 100644 dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java similarity index 71% rename from dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 03e3743452..19feab57c8 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -15,25 +15,21 @@ * limitations under the License. */ -package queue; +package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -/** - * Task instances priority queue implementation - * All the task instances are in the same process instance. - */ public class PeerTaskInstancePriorityQueueTest { @Test - public void testPut() throws Exception { + public void put() throws TaskPriorityQueueException { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); @@ -43,16 +39,7 @@ public class PeerTaskInstancePriorityQueueTest { } @Test - public void testPeek() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - queue.peek(); - Assert.assertEquals(peekBeforeLength, queue.size()); - - } - - @Test - public void testTake() throws Exception { + public void take() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.take(); @@ -60,7 +47,7 @@ public class PeerTaskInstancePriorityQueueTest { } @Test - public void testPoll() throws Exception { + public void poll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.poll(1000, TimeUnit.MILLISECONDS); @@ -71,6 +58,37 @@ public class PeerTaskInstancePriorityQueueTest { System.out.println(System.currentTimeMillis()); } + @Test + public void peek() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + int peekBeforeLength = queue.size(); + queue.peek(); + Assert.assertEquals(peekBeforeLength, queue.size()); + } + + @Test + public void size() throws Exception { + Assert.assertTrue(getPeerTaskInstancePriorityQueue().size() == 2); + } + + @Test + public void contains() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); + } + + @Test + public void remove() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + int peekBeforeLength = queue.size(); + queue.remove(taskInstanceMediumPriority); + Assert.assertNotEquals(peekBeforeLength, queue.size()); + } + /** * get queue * @@ -80,7 +98,7 @@ public class PeerTaskInstancePriorityQueueTest { private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); - TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); return queue; @@ -99,5 +117,4 @@ public class PeerTaskInstancePriorityQueueTest { taskInstance.setTaskInstancePriority(priority); return taskInstance; } - } \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index 9868dce9a4..d90011b847 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.enums.Priority; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.Assert; @@ -26,6 +29,59 @@ import org.junit.Test; public class TaskPriorityQueueImplTest { + @Test + public void testSort() { + TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default"); + TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default"); + TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default"); + List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 1, 0, 0, "default"); + priorityTwo = new TaskPriority(0, 2, 0, 0, "default"); + priorityThree = new TaskPriority(0, 3, 0, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 1, 0, "default"); + priorityTwo = new TaskPriority(0, 0, 2, 0, "default"); + priorityThree = new TaskPriority(0, 0, 3, 0, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 1, "default"); + priorityTwo = new TaskPriority(0, 0, 0, 2, "default"); + priorityThree = new TaskPriority(0, 0, 0, 3, "default"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, "default_3"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys + ); + } + @Test public void put() throws Exception { TaskPriorityQueue queue = getPriorityQueue(); diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java deleted file mode 100644 index 151177016f..0000000000 --- a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package queue; - -import org.apache.dolphinscheduler.service.queue.TaskPriority; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.junit.Assert; -import org.junit.Test; - -public class TaskPriorityTest { - - @Test - public void testSort() { - TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default"); - TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default"); - TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default"); - List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPrioritys); - Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys - ); - - priorityOne = new TaskPriority(0, 1, 0, 0, "default"); - priorityTwo = new TaskPriority(0, 2, 0, 0, "default"); - priorityThree = new TaskPriority(0, 3, 0, 0, "default"); - taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPrioritys); - Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys - ); - - priorityOne = new TaskPriority(0, 0, 1, 0, "default"); - priorityTwo = new TaskPriority(0, 0, 2, 0, "default"); - priorityThree = new TaskPriority(0, 0, 3, 0, "default"); - taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPrioritys); - Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys - ); - - priorityOne = new TaskPriority(0, 0, 0, 1, "default"); - priorityTwo = new TaskPriority(0, 0, 0, 2, "default"); - priorityThree = new TaskPriority(0, 0, 0, 3, "default"); - taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPrioritys); - Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys - ); - - priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); - priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2"); - priorityThree = new TaskPriority(0, 0, 0, 0, "default_3"); - taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); - Collections.sort(taskPrioritys); - Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys - ); - } -} diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java deleted file mode 100644 index 96d69d8940..0000000000 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package queue; - -import static org.junit.Assert.assertEquals; - -import org.apache.dolphinscheduler.service.queue.TaskPriority; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; - -import org.junit.Test; - -public class TaskUpdateQueueTest { - - /** - * test put - */ - @Test - public void testQueue() throws Exception { - - /** - * 1_1_2_1_default - * 1_1_2_2_default - * 1_1_0_3_default - * 1_1_0_4_default - */ - TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default"); - TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default"); - TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default"); - TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default"); - - TaskPriorityQueue queue = new TaskPriorityQueueImpl(); - queue.put(taskInfo1); - queue.put(taskInfo2); - queue.put(taskInfo3); - queue.put(taskInfo4); - - assertEquals(taskInfo3, queue.take()); - assertEquals(taskInfo4, queue.take()); - assertEquals(taskInfo1, queue.take()); - assertEquals(taskInfo2, queue.take()); - } -} From 757c116b24d66caeb0b2870c28d04884c1682368 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Mon, 22 Feb 2021 22:38:56 +0800 Subject: [PATCH 16/20] [BUG][TaskExecuteProcessor]setTaskCache is first preTaskCache --- .../server/worker/processor/TaskExecuteProcessor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 5779cac119..1b3d1757e1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -95,7 +95,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { * @param taskExecutionContext task */ private void setTaskCache(TaskExecutionContext taskExecutionContext) { - taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + TaskExecutionContext preTaskCache = new TaskExecutionContext(); + preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache); } public TaskExecuteProcessor(AlertClientService alertClientService) { From 3a215cb952089f23c35a2fd4d62ce9268d0cbbe7 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Wed, 24 Feb 2021 10:49:07 +0800 Subject: [PATCH 17/20] [Performance][PeerTaskInstancePriorityQueue]Nearly realization of poll with timeout --- .../queue/PeerTaskInstancePriorityQueue.java | 29 +++++++++++++++++-- .../PeerTaskInstancePriorityQueueTest.java | 21 +++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 72d0bf6122..aa74cfc760 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -24,6 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -40,6 +42,16 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + /** + * Lock used for all public operations + */ + private final ReentrantLock lock = new ReentrantLock(true); + + /** + * Condition for blocking when empty + */ + private final Condition notEmpty = lock.newCondition(); + /** * put task instance to priority queue * @@ -64,6 +76,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue 0) { + nanos--; + } + } finally { + lock.unlock(); } - return queue.poll(); + return result; } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 19feab57c8..0d9833354e 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -52,10 +52,23 @@ public class PeerTaskInstancePriorityQueueTest { int peekBeforeLength = queue.size(); queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertTrue(queue.size() == 0); - System.out.println(System.currentTimeMillis()); + Assert.assertEquals(0, queue.size()); + Thread producer = new Thread(() -> { + System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis())); + try { + Thread.sleep(100); + TaskInstance task = createTaskInstance("low_task", Priority.LOW); + queue.put(task); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println(String.format("End to produce %s at time %s", + queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis())); + }); + producer.start(); + System.out.println("Begin to consume at " + System.currentTimeMillis()); queue.poll(1000, TimeUnit.MILLISECONDS); - System.out.println(System.currentTimeMillis()); + System.out.println("End to consume at " + System.currentTimeMillis()); } @Test @@ -68,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void size() throws Exception { - Assert.assertTrue(getPeerTaskInstancePriorityQueue().size() == 2); + Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); } @Test From 488726139fc31d0159f674edd6d787a37be6df1f Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Wed, 24 Feb 2021 10:52:48 +0800 Subject: [PATCH 18/20] [Performance][PeerTaskInstancePriorityQueue]code style formatter --- .../service/queue/PeerTaskInstancePriorityQueue.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index aa74cfc760..7a1bc799ed 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -24,7 +24,6 @@ import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -47,11 +46,6 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue Date: Wed, 24 Feb 2021 11:47:19 +0800 Subject: [PATCH 19/20] [BUG][PeerTaskInstancePriorityQueue]poll with timeout is not currently supported --- .../queue/PeerTaskInstancePriorityQueue.java | 17 +++---------- .../PeerTaskInstancePriorityQueueTest.java | 25 ++++--------------- 2 files changed, 9 insertions(+), 33 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 7a1bc799ed..40edfc36a2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -70,6 +70,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) * because this method of override interface used without considering accuracy of timeout * @@ -80,19 +81,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue 0) { - nanos--; - } - } finally { - lock.unlock(); - } - return result; + public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException { + throw new TaskPriorityQueueException("This operation is not currently supported," + + "and suggest to use PriorityBlockingQueue if you want!"); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 0d9833354e..550c4bea3a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -49,26 +49,11 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void poll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - queue.poll(1000, TimeUnit.MILLISECONDS); - queue.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertEquals(0, queue.size()); - Thread producer = new Thread(() -> { - System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis())); - try { - Thread.sleep(100); - TaskInstance task = createTaskInstance("low_task", Priority.LOW); - queue.put(task); - } catch (Exception e) { - e.printStackTrace(); - } - System.out.println(String.format("End to produce %s at time %s", - queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis())); - }); - producer.start(); - System.out.println("Begin to consume at " + System.currentTimeMillis()); - queue.poll(1000, TimeUnit.MILLISECONDS); - System.out.println("End to consume at " + System.currentTimeMillis()); + try { + queue.poll(1000, TimeUnit.MILLISECONDS); + } catch (TaskPriorityQueueException e) { + e.printStackTrace(); + } } @Test From b2a871b4c5b8de6efd1840c1bd7b77d8c5265f59 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Wed, 24 Feb 2021 12:18:29 +0800 Subject: [PATCH 20/20] [BUG][PeerTaskInstancePriorityQueue]string formatter --- .../service/queue/PeerTaskInstancePriorityQueue.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 40edfc36a2..aa278a624e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -82,8 +82,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue