From 3a215cb952089f23c35a2fd4d62ce9268d0cbbe7 Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Wed, 24 Feb 2021 10:49:07 +0800 Subject: [PATCH] [Performance][PeerTaskInstancePriorityQueue]Nearly realization of poll with timeout --- .../queue/PeerTaskInstancePriorityQueue.java | 29 +++++++++++++++++-- .../PeerTaskInstancePriorityQueueTest.java | 21 +++++++++++--- 2 files changed, 43 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 72d0bf612..aa74cfc76 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -24,6 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -40,6 +42,16 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + /** + * Lock used for all public operations + */ + private final ReentrantLock lock = new ReentrantLock(true); + + /** + * Condition for blocking when empty + */ + private final Condition notEmpty = lock.newCondition(); + /** * put task instance to priority queue * @@ -64,6 +76,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue 0) { + nanos--; + } + } finally { + lock.unlock(); } - return queue.poll(); + return result; } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 19feab57c..0d9833354 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -52,10 +52,23 @@ public class PeerTaskInstancePriorityQueueTest { int peekBeforeLength = queue.size(); queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertTrue(queue.size() == 0); - System.out.println(System.currentTimeMillis()); + Assert.assertEquals(0, queue.size()); + Thread producer = new Thread(() -> { + System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis())); + try { + Thread.sleep(100); + TaskInstance task = createTaskInstance("low_task", Priority.LOW); + queue.put(task); + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println(String.format("End to produce %s at time %s", + queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis())); + }); + producer.start(); + System.out.println("Begin to consume at " + System.currentTimeMillis()); queue.poll(1000, TimeUnit.MILLISECONDS); - System.out.println(System.currentTimeMillis()); + System.out.println("End to consume at " + System.currentTimeMillis()); } @Test @@ -68,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void size() throws Exception { - Assert.assertTrue(getPeerTaskInstancePriorityQueue().size() == 2); + Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); } @Test