|
|
|
@ -17,12 +17,15 @@
|
|
|
|
|
package cn.escheduler.common.queue; |
|
|
|
|
|
|
|
|
|
import cn.escheduler.common.Constants; |
|
|
|
|
import org.junit.After; |
|
|
|
|
import org.junit.Assert; |
|
|
|
|
import org.junit.Before; |
|
|
|
|
import org.junit.Test; |
|
|
|
|
import org.slf4j.Logger; |
|
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
|
|
|
|
|
|
import java.util.Arrays; |
|
|
|
|
import java.util.List; |
|
|
|
|
import java.util.Random; |
|
|
|
|
|
|
|
|
|
import static org.junit.Assert.assertEquals; |
|
|
|
@ -34,61 +37,62 @@ public class TaskQueueImplTest {
|
|
|
|
|
|
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); |
|
|
|
|
|
|
|
|
|
ITaskQueue tasksQueue = null; |
|
|
|
|
|
|
|
|
|
@Before |
|
|
|
|
public void before(){ |
|
|
|
|
tasksQueue = TaskQueueFactory.getTaskQueueInstance(); |
|
|
|
|
//clear all data
|
|
|
|
|
tasksQueue.delete(); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void testTaskQueue(){ |
|
|
|
|
|
|
|
|
|
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); |
|
|
|
|
@After |
|
|
|
|
public void after(){ |
|
|
|
|
//clear all data
|
|
|
|
|
tasksQueue.delete(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Test |
|
|
|
|
public void testAdd(){ |
|
|
|
|
|
|
|
|
|
//add
|
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); |
|
|
|
|
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); |
|
|
|
|
|
|
|
|
|
//pop
|
|
|
|
|
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); |
|
|
|
|
List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); |
|
|
|
|
|
|
|
|
|
assertEquals(node1,"1"); |
|
|
|
|
if(tasks.size() < 0){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); |
|
|
|
|
assertEquals(node2,"2"); |
|
|
|
|
//pop
|
|
|
|
|
String node1 = tasks.get(0); |
|
|
|
|
|
|
|
|
|
//sadd
|
|
|
|
|
String task1 = "1.1.1.1-1-mr"; |
|
|
|
|
String task2 = "1.1.1.2-2-mr"; |
|
|
|
|
String task3 = "1.1.1.3-3-mr"; |
|
|
|
|
String task4 = "1.1.1.4-4-mr"; |
|
|
|
|
String task5 = "1.1.1.5-5-mr"; |
|
|
|
|
assertEquals(node1,"0_0000000001_1_0000000001"); |
|
|
|
|
|
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1); |
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2); |
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3); |
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4); |
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); |
|
|
|
|
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
|
|
|
|
|
tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); |
|
|
|
|
|
|
|
|
|
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5); |
|
|
|
|
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); |
|
|
|
|
//srem
|
|
|
|
|
tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5); |
|
|
|
|
//smembers
|
|
|
|
|
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4); |
|
|
|
|
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); |
|
|
|
|
if(tasks.size() < 0){ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
String node2 = tasks.get(0); |
|
|
|
|
assertEquals(node2,"0_0000000001_1_0000000001"); |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* test one million data from zookeeper queue |
|
|
|
|
*/ |
|
|
|
|
@Test |
|
|
|
|
public void extremeTest(){ |
|
|
|
|
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); |
|
|
|
|
//clear all data
|
|
|
|
|
tasksQueue.delete(); |
|
|
|
|
int total = 30 * 10000; |
|
|
|
|
|
|
|
|
|
for(int i = 0; i < total; i++) |
|
|
|
@ -104,11 +108,6 @@ public class TaskQueueImplTest {
|
|
|
|
|
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); |
|
|
|
|
assertEquals(node1,"0"); |
|
|
|
|
|
|
|
|
|
//clear all data
|
|
|
|
|
tasksQueue.delete(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|