Browse Source

some updates for TaskQueueZkImpl (#1874)

small changes, no need more people review, I will merge.
pull/2/head
Tboy 4 years ago committed by dailidong
parent
commit
38eaaa98e6
  1. 31
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

31
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@ -30,7 +30,7 @@ import java.util.*;
/** /**
* A singleton of a task queue implemented with zookeeper * A singleton of a task queue implemented with zookeeper
* tasks queue implemention * tasks queue implementation
*/ */
@Service @Service
public class TaskQueueZkImpl implements ITaskQueue { public class TaskQueueZkImpl implements ITaskQueue {
@ -72,7 +72,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
} catch (Exception e) { } catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e); logger.error("get all tasks from tasks queue exception",e);
} }
return new ArrayList<>(); return Collections.emptyList();
} }
/** /**
@ -196,11 +196,11 @@ public class TaskQueueZkImpl implements ITaskQueue {
} }
} }
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); List<String> tasksList = getTasksListFromTreeSet(tasksNum, taskTreeSet);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(tasksList.toArray()), size - tasksList.size());
return taskslist; return tasksList;
}else{ }else{
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
@ -208,7 +208,7 @@ public class TaskQueueZkImpl implements ITaskQueue {
} catch (Exception e) { } catch (Exception e) {
logger.error("add task to tasks queue exception",e); logger.error("add task to tasks queue exception",e);
} }
return new ArrayList<String>(); return Collections.emptyList();
} }
@ -221,15 +221,15 @@ public class TaskQueueZkImpl implements ITaskQueue {
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) { public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator(); Iterator<String> iterator = taskTreeSet.iterator();
int j = 0; int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum); List<String> tasksList = new ArrayList<>(tasksNum);
while(iterator.hasNext()){ while(iterator.hasNext()){
if(j++ >= tasksNum){ if(j++ >= tasksNum){
break; break;
} }
String task = iterator.next(); String task = iterator.next();
taskslist.add(getOriginTaskFormat(task)); tasksList.add(getOriginTaskFormat(task));
} }
return taskslist; return tasksList;
} }
/** /**
@ -330,22 +330,13 @@ public class TaskQueueZkImpl implements ITaskQueue {
*/ */
@Override @Override
public Set<String> smembers(String key) { public Set<String> smembers(String key) {
Set<String> tasksSet = new HashSet<>();
try { try {
List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key)); List<String> list = zookeeperOperator.getChildrenKeys(getTasksPath(key));
return new HashSet<>(list);
for (String task : list) {
tasksSet.add(task);
}
return tasksSet;
} catch (Exception e) { } catch (Exception e) {
logger.error("get all tasks from tasks queue exception",e); logger.error("get all tasks from tasks queue exception",e);
} }
return Collections.emptySet();
return tasksSet;
} }
/** /**

Loading…
Cancel
Save