From 0bdfa0cff958b3ee089d732590e735404bdc81dc Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Fri, 17 Jun 2022 16:14:56 +0800 Subject: [PATCH] Fix PeerTaskInstancePriorityQueue cannot contains method use taskInstanceId to check (#10491) --- .../runner/WorkflowExecuteRunnable.java | 2 +- .../queue/PeerTaskInstancePriorityQueue.java | 32 +++++++++++++------ .../PeerTaskInstancePriorityQueueTest.java | 6 ++++ 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index ae627c44f5..dfaa1810be 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -1103,7 +1103,7 @@ public class WorkflowExecuteRunnable implements Runnable { } /** - * encapsulation task + * encapsulation task, this method will only create a new task instance, the return task instance will not contain id. * * @param processInstance process instance * @param taskNode taskNode diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 2e939ee332..cc7f39e402 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -29,6 +29,8 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; + /** * Task instances priority queue implementation * All the task instances are in the same process instance. @@ -43,7 +45,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); - private final Set taskInstanceIdSet = Collections.synchronizedSet(new HashSet<>()); + private final Set taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>()); /** * put task instance to priority queue @@ -53,8 +55,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { + private static class TaskInfoComparator implements Comparator { /** * compare o1 o2 diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 67e40d1189..0daf8bf66a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -36,6 +36,8 @@ public class PeerTaskInstancePriorityQueueTest { queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); Assert.assertEquals(2, queue.size()); + Assert.assertTrue(queue.contains(taskInstanceHigPriority)); + Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); } @Test @@ -108,6 +110,9 @@ public class PeerTaskInstancePriorityQueueTest { TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); + TaskInstance taskInstance2 = createTaskInstance("medium2", Priority.MEDIUM, 1); + taskInstance2.setProcessInstanceId(2); + Assert.assertFalse(queue.contains(taskInstance2)); } @Test @@ -118,6 +123,7 @@ public class PeerTaskInstancePriorityQueueTest { int peekBeforeLength = queue.size(); queue.remove(taskInstanceMediumPriority); Assert.assertNotEquals(peekBeforeLength, queue.size()); + Assert.assertFalse(queue.contains(taskInstanceMediumPriority)); } /**