Browse Source

update worker get task from queue

pull/2/head
lenboo 5 years ago
parent
commit
d720108e2f
  1. 31
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

31
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
//向前版本兼容
//向前版本兼ProcessInstanceService
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
@ -209,17 +210,33 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
while(iterator.hasNext()){
if(j++ < tasksNum){
String task = iterator.next();
String[] taskArray = task.split(Constants.UNDERLINE);
int processInstanceId = Integer.parseInt(taskArray[1]);
int taskId = Integer.parseInt(taskArray[3]);
String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE
+ taskArray[2] + Constants.UNDERLINE + taskId;
taskslist.add(destTask);
taskslist.add(getOriginTaskFormat(task));
}
}
return taskslist;
}
/**
* format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* processInstanceId and task id need to be convert to int.
* @param formatTask
* @return
*/
private String getOriginTaskFormat(String formatTask){
String[] taskArray = formatTask.split(Constants.UNDERLINE);
int processInstanceId = Integer.parseInt(taskArray[1]);
int taskId = Integer.parseInt(taskArray[3]);
String suffix = "";
for(int index =4; index < taskArray.length; index++){
suffix += taskArray[index] + Constants.UNDERLINE;
}
String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId);
if(StringUtils.isNotEmpty(suffix)){
destTask += Constants.UNDERLINE + suffix;
}
return destTask;
}
@Override
public void removeNode(String key, String nodeValue){

Loading…
Cancel
Save