diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index 76d88868f2..b0ecc62710 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -145,7 +145,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { public List poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); - String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; List list = zk.getChildren().forPath(getTasksPath(key)); if(list != null && list.size() > 0){ @@ -155,7 +154,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int size = list.size(); - Set taskTreeSet = new TreeSet<>(new Comparator() { @Override public int compare(String o1, String o2) { @@ -183,7 +181,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - //forward compatibility 向前版本兼容 + //forward compatibility if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} @@ -201,9 +199,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { formatTask += Constants.UNDERLINE + taskDetailArrs[4]; } taskTreeSet.add(formatTask); - } - } List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java deleted file mode 100644 index 14e90ebcdc..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueImplTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.queue; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.IpUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.zk.ZKServer; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -/** - * task queue test - */ -public class TaskQueueImplTest { - - private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); - - ITaskQueue tasksQueue = null; - - @Before - public void before(){ - ZKServer.start(); - - tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - - //clear all data - tasksQueue.delete(); - - } - - - @After - public void after(){ - //clear all data - tasksQueue.delete(); - ZKServer.stop(); - } - - - @Test - public void testAdd(){ - - - //add - tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); - tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1"); - tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); - tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + IpUtils.ipToLong(OSUtils.getHost()) + 10); - - List tasks = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1); - - if(tasks.size() <= 0){ - return; - } - - //pop - String node1 = tasks.get(0); - - assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); - - - } - - - - /** - * test one million data from zookeeper queue - */ - @Ignore - @Test - public void extremeTest(){ - int total = 30 * 10000; - - for(int i = 0; i < total; i++) { - for(int j = 0; j < total; j++) { - //${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} - //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} - String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, j == 0 ? 0 : j + new Random().nextInt(100)); - tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, formatTask); - } - } - - String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1).get(0); - assertEquals(node1,"0"); - - } - -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java new file mode 100644 index 0000000000..3db253daf0 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/queue/TaskQueueZKImplTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.queue; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.IpUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; + +import org.apache.dolphinscheduler.common.zk.ZKServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.*; + +/** + * task queue test + */ +public class TaskQueueZKImplTest { + + + private ITaskQueue tasksQueue = null; + + @Before + public void before(){ + + ZKServer.start(); + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); + } + + @After + public void after(){ + //clear all data + tasksQueue.delete(); + ZKServer.stop(); + } + + + /** + * test take out all the elements + */ + @Test + public void getAllTasks(){ + + //add + init(); + // get all + List allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + assertEquals(allTasks.size(),2); + //delete all + tasksQueue.delete(); + allTasks = tasksQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); + assertEquals(allTasks.size(),0); + } + + /** + * test check task exists in the task queue or not + */ + @Test + public void checkTaskExists(){ + + String task= "1_0_1_1_-1"; + //add + init(); + // check Exist true + boolean taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); + assertTrue(taskExists); + + //remove task + tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + // check Exist false + taskExists = tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, task); + assertFalse(taskExists); + } + + /** + * test add element to the queue + */ + @Test + public void add(){ + + //add + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1"); + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_2_1_1_" + IpUtils.ipToLong(OSUtils.getHost()) + 10); + + List tasks = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() <= 0){ + return; + } + + //pop + String node1 = tasks.get(0); + assertEquals(node1,"0_0_0_1_" + IpUtils.ipToLong(OSUtils.getHost())); + } + + /** + * test element pops out of the queue + */ + @Test + public void poll(){ + + //add + init(); + List taskList = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 2); + assertEquals(taskList.size(),2); + + assertEquals(taskList.get(0),"0_1_1_1_-1"); + assertEquals(taskList.get(1),"1_0_1_1_-1"); + } + + /** + * test remove element from queue + */ + @Test + public void removeNode(){ + String task = "1_0_1_1_-1"; + //add + init(); + tasksQueue.removeNode(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + assertFalse(tasksQueue.checkTaskExists(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task)); + } + + /** + * test add an element to the set + */ + @Test + public void sadd(){ + + String task = "1_0_1_1_-1"; + tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + //check size + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); + } + + + /** + * test delete the value corresponding to the key in the set + */ + @Test + public void srem(){ + + String task = "1_0_1_1_-1"; + tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + //check size + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); + //remove and get size + tasksQueue.srem(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); + } + + /** + * test gets all the elements of the set based on the key + */ + @Test + public void smembers(){ + + //first init + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),0); + //add + String task = "1_0_1_1_-1"; + tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + //check size + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),1); + //add + task = "0_1_1_1_"; + tasksQueue.sadd(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,task); + //check size + assertEquals(tasksQueue.smembers(Constants.DOLPHINSCHEDULER_TASKS_QUEUE).size(),2); + } + + + /** + * init data + */ + private void init(){ + //add + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE,"0_1_1_1_-1"); + } + + + + /** + * test one million data from zookeeper queue + */ + @Ignore + @Test + public void extremeTest(){ + int total = 30 * 10000; + + for(int i = 0; i < total; i++) { + for(int j = 0; j < total; j++) { + //${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + String formatTask = String.format("%s_%d_%s_%d", i, i + 1, j, j == 0 ? 0 : j + new Random().nextInt(100)); + tasksQueue.add(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, formatTask); + } + } + + String node1 = tasksQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, 1).get(0); + assertEquals(node1,"0"); + + } + +} diff --git a/pom.xml b/pom.xml index 2524461575..2413c56f21 100644 --- a/pom.xml +++ b/pom.xml @@ -614,6 +614,7 @@ **/common/utils/*.java **/common/graph/*.java + **/common/queue/*.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java