From 88b022cb571c76bf6687998bde30d7608f19e1a8 Mon Sep 17 00:00:00 2001 From: "gabry.wu" Date: Thu, 9 Jan 2020 10:41:48 +0800 Subject: [PATCH] ITaskQueue should have "hasTask" function (#1744) * issue https://github.com/apache/incubator-dolphinscheduler/issues/1742 * remove unnecessary catch * fix exception type * fix bad code smell * remove log exceptions * catch exception --- .../dolphinscheduler/common/queue/ITaskQueue.java | 7 +++++++ .../common/queue/TaskQueueZkImpl.java | 15 +++++++++++++++ .../common/zk/ZookeeperOperator.java | 11 +++++++++++ .../common/queue/TaskQueueZKImplTest.java | 11 ++++++++++- .../server/worker/runner/FetchTaskThread.java | 5 +++-- 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java index 6e937f0c3e..5beb8111ad 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/ITaskQueue.java @@ -30,6 +30,13 @@ public interface ITaskQueue { */ List getAllTasks(String key); + /** + * check if has a task + * @param key queue name + * @return true if has; false if not + */ + boolean hasTask(String key); + /** * check task exists in the task queue or not * diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 8f5677d1b9..eb9e04753d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -75,6 +75,21 @@ public class TaskQueueZkImpl implements ITaskQueue { return new ArrayList<>(); } + /** + * check if has a task + * @param key queue name + * @return true if has; false if not + */ + @Override + public boolean hasTask(String key) { + try { + return zookeeperOperator.hasChildren(key); + } catch (Exception e) { + logger.error("check has task in tasks queue exception",e); + } + return false; + } + /** * check task exists in the task queue or not * diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java index c6faec2b78..9442afd7a0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java @@ -27,6 +27,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -139,6 +140,16 @@ public class ZookeeperOperator implements InitializingBean { } } + public boolean hasChildren(final String key){ + Stat stat ; + try { + stat = zkClient.checkExists().forPath(key); + return stat.getNumChildren() >= 1; + } catch (Exception ex) { + throw new IllegalStateException(ex); + } + } + public boolean isExisted(final String key) { try { return zkClient.checkExists().forPath(key) != null; diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java index 1b44673149..b34a7d6924 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java @@ -64,7 +64,16 @@ public class TaskQueueZKImplTest extends BaseTaskQueueTest { allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); assertEquals(allTasks.size(),0); } - + @Test + public void hasTask(){ + init(); + boolean hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + assertTrue(hasTask); + //delete all + tasksQueue.delete(); + hasTask = tasksQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + assertFalse(hasTask); + } /** * test check task exists in the task queue or not */ diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java index 6a2593104a..4899f4b2b5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java @@ -150,8 +150,9 @@ public class FetchTaskThread implements Runnable{ } //whether have tasks, if no tasks , no need lock //get all tasks - List tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); - if (CollectionUtils.isEmpty(tasksQueueList)){ + boolean hasTask = taskQueue.hasTask(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + + if (!hasTask){ Thread.sleep(Constants.SLEEP_TIME_MILLIS); continue; }