From 1ab0a02a818fa3146786c2e54481c1ad4484d38a Mon Sep 17 00:00:00 2001 From: lenboo Date: Mon, 8 Jul 2019 21:01:25 +0800 Subject: [PATCH] update worker task queue --- .../common/queue/TaskQueueZkImpl.java | 26 ++++++++++++++++--- .../common/queue/TaskQueueImplTest.java | 14 +++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index e6c9df2833..eed7fb1ac5 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -151,7 +151,27 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int size = list.size(); - Set taskTreeSet = new TreeSet<>(); + Set taskTreeSet = new TreeSet<>(new Comparator() { + @Override + public int compare(String o1, String o2) { + + String s1 = o1; + String s2 = o2; + String[] s1Array = s1.split(Constants.UNDERLINE); + if(s1Array.length>4){ + // warning: if this length > 5, need to be changed + s1 = s1.substring(0, s1.lastIndexOf(Constants.UNDERLINE) ); + } + + String[] s2Array = s2.split(Constants.UNDERLINE); + if(s2Array.length>4){ + // warning: if this length > 5, need to be changed + s2 = s2.substring(0, s2.lastIndexOf(Constants.UNDERLINE) ); + } + + return s1.compareTo(s2); + } + }); for (int i = 0; i < size; i++) { @@ -173,8 +193,8 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { continue; } } + formatTask += Constants.UNDERLINE + taskDetailArrs[4]; } - taskTreeSet.add(formatTask); } @@ -229,7 +249,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { int taskId = Integer.parseInt(taskArray[3]); StringBuilder sb = new StringBuilder(50); - String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId); + String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[2], taskId); sb.append(destTask); diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 72a6e46200..21d2f5858e 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -17,6 +17,8 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; +import cn.escheduler.common.utils.IpUtils; +import cn.escheduler.common.utils.OSUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -58,31 +60,31 @@ public class TaskQueueImplTest { @Test public void testAdd(){ + //add - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_0_1_1_-1"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775,"+IpUtils.ipToLong(OSUtils.getHost())); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); - if(tasks.size() < 0){ + if(tasks.size() <= 0){ return; } //pop String node1 = tasks.get(0); - assertEquals(node1,"0_0000000001_1_0000000001"); + assertEquals(node1,"1_0_1_1_-1"); tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); - if(tasks.size() < 0){ + if(tasks.size() <= 0){ return; } String node2 = tasks.get(0); - assertEquals(node2,"0_0000000001_1_0000000001"); }