Browse Source

update worker group

pull/2/head
baoliang 6 years ago
parent
commit
e76d2dcde3
  1. 6
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  2. 16
      escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java
  3. 9
      escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
  4. 33
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  5. 6
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  6. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  7. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java
  8. 13
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  9. 16
      escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java
  10. 3
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
  11. 50
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  12. 11
      escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

6
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -66,13 +66,15 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "receiversCc", required = false) String receiversCc,
@RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "runMode", required = false) RunMode runMode,
@RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority,
@RequestParam(value = "workerGroupId", required = false) int workerGroupId,
@RequestParam(value = "timeout", required = false) Integer timeout) { @RequestParam(value = "timeout", required = false) Integer timeout) {
try { try {
logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, "
+ "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, "
+ "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, timeout: {}", + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}",
loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy, loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy,
taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,timeout); taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,
workerGroupId, timeout);
if (timeout == null) { if (timeout == null) {
timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT; timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT;

16
escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java

@ -113,6 +113,12 @@ public class TaskNode {
*/ */
private Priority taskInstancePriority; private Priority taskInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
/** /**
* task time out * task time out
*/ */
@ -224,6 +230,7 @@ public class TaskNode {
Objects.equals(extras, taskNode.extras) && Objects.equals(extras, taskNode.extras) &&
Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(runFlag, taskNode.runFlag) &&
Objects.equals(dependence, taskNode.dependence) && Objects.equals(dependence, taskNode.dependence) &&
Objects.equals(workerGroupId, taskNode.workerGroupId) &&
CollectionUtils.equalLists(depList, taskNode.depList); CollectionUtils.equalLists(depList, taskNode.depList);
} }
@ -303,6 +310,15 @@ public class TaskNode {
", dependence='" + dependence + '\'' + ", dependence='" + dependence + '\'' +
", taskInstancePriority=" + taskInstancePriority + ", taskInstancePriority=" + taskInstancePriority +
", timeout='" + timeout + '\'' + ", timeout='" + timeout + '\'' +
", workerGroupId='" + workerGroupId + '\'' +
'}'; '}';
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
} }

9
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java

@ -54,10 +54,17 @@ public interface ITaskQueue {
* an element pops out of the queue * an element pops out of the queue
* *
* @param key queue name * @param key queue name
* @param remove where remove the element
* @return * @return
*/ */
String poll(String key); String poll(String key, boolean remove);
/**
* remove a element from queue
* @param key
* @param value
*/
void removeNode(String key, String value);
/** /**
* add an element to the set * add an element to the set

33
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -137,10 +137,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* *
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low * 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* @param key task queue name * @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed * @return the task id to be executed
*/ */
@Override @Override
public String poll(String key) { public String poll(String key, boolean remove) {
try{ try{
CuratorFramework zk = getZkClient(); CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@ -181,18 +182,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String[] vals = targetTaskKey.split(Constants.UNDERLINE); String[] vals = targetTaskKey.split(Constants.UNDERLINE);
try{ if(remove){
zk.delete().forPath(taskIdPath); removeNode(key, targetTaskKey);
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_remove" + Constants.SINGLE_SLASH + targetTaskKey;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(targetTaskKey));
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, task detail: %s exception" ,targetTaskKey, vals[vals.length - 1]) ,e);
} }
logger.info("consume task: {},there still have {} tasks need to be executed", targetTaskKey, size - 1); logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
return vals[vals.length - 1];
}else{ }else{
logger.error("should not go here, task queue poll error, please check!"); logger.error("should not go here, task queue poll error, please check!");
} }
@ -204,6 +198,21 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
return null; return null;
} }
@Override
public void removeNode(String key, String nodeValue){
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
String taskIdPath = tasksQueuePath + nodeValue;
logger.info("consume task {}", taskIdPath);
try{
zk.delete().forPath(taskIdPath);
}catch(Exception e){
logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
}
}
/** /**

6
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -49,9 +49,9 @@ public class TaskQueueImplTest {
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
//pop //pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"1"); assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2"); assertEquals(node2,"2");
//sadd //sadd
@ -99,7 +99,7 @@ public class TaskQueueImplTest {
} }
} }
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"0"); assertEquals(node1,"0");
//clear all data //clear all data

14
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao {
@Autowired @Autowired
private ResourceMapper resourceMapper; private ResourceMapper resourceMapper;
@Autowired
private WorkerGroupMapper workerGroupMapper;
@Autowired @Autowired
private ErrorCommandMapper errorCommandMapper; private ErrorCommandMapper errorCommandMapper;
@ -115,6 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
scheduleMapper = getMapper(ScheduleMapper.class); scheduleMapper = getMapper(ScheduleMapper.class);
udfFuncMapper = getMapper(UdfFuncMapper.class); udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class); resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance(); taskQueue = TaskQueueFactory.getTaskQueueInstance();
} }
@ -477,6 +481,7 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority // set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId());
return processInstance; return processInstance;
} }
@ -1575,6 +1580,15 @@ public class ProcessDao extends AbstractBaseDao {
return userMapper.queryQueueByProcessInstanceId(processInstanceId); return userMapper.queryQueueByProcessInstanceId(processInstanceId);
} }
/**
* query worker group by id
* @param workerGroupId
* @return
*/
public WorkerGroup queryWorkerGroupById(int workerGroupId){
return workerGroupMapper.queryById(workerGroupId);
}
} }

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java

@ -91,6 +91,12 @@ public class Command {
private Date updateTime; private Date updateTime;
/**
*
*/
private int workerGroupId;
public Command(){ public Command(){
this.taskDependType = TaskDependType.TASK_POST; this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE; this.failureStrategy = FailureStrategy.CONTINUE;
@ -229,6 +235,15 @@ public class Command {
this.updateTime = updateTime; this.updateTime = updateTime;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "Command{" + return "Command{" +
@ -245,6 +260,7 @@ public class Command {
", startTime=" + startTime + ", startTime=" + startTime +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", updateTime=" + updateTime + ", updateTime=" + updateTime +
", workerGroupId=" + workerGroupId +
'}'; '}';
} }
} }

13
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -177,6 +177,12 @@ public class ProcessInstance {
*/ */
private Priority processInstancePriority; private Priority processInstancePriority;
/**
* worker group id
*/
private int workerGroupId;
public ProcessInstance(){ public ProcessInstance(){
} }
@ -481,6 +487,13 @@ public class ProcessInstance {
this.duration = duration; this.duration = duration;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {

16
escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java

@ -182,6 +182,13 @@ public class TaskInstance {
private String dependentResult; private String dependentResult;
/**
* worker group id
* @return
*/
private int workerGroupId;
public ProcessInstance getProcessInstance() { public ProcessInstance getProcessInstance() {
return processInstance; return processInstance;
} }
@ -439,6 +446,14 @@ public class TaskInstance {
this.processInstancePriority = processInstancePriority; this.processInstancePriority = processInstancePriority;
} }
public int getWorkerGroupId() {
return workerGroupId;
}
public void setWorkerGroupId(int workerGroupId) {
this.workerGroupId = workerGroupId;
}
@Override @Override
public String toString() { public String toString() {
return "TaskInstance{" + return "TaskInstance{" +
@ -470,6 +485,7 @@ public class TaskInstance {
", retryInterval=" + retryInterval + ", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority + ", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", workGroupId=" + workerGroupId +
'}'; '}';
} }

3
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -404,6 +404,9 @@ public class MasterExecThread implements Runnable {
taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority());
} }
int workerGroupId = taskNode.getWorkerGroupId();
taskInstance.setWorkerGroupId(workerGroupId);
} }
return taskInstance; return taskInstance;
} }

50
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -26,6 +26,7 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerGroup;
import cn.escheduler.server.zk.ZKWorkerClient; import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils; import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.Configuration;
@ -33,7 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -90,6 +93,42 @@ public class FetchTaskThread implements Runnable{
this.taskQueue = taskQueue; this.taskQueue = taskQueue;
} }
/**
* Check if the task runs on this worker
* @param taskInstance
* @param host
* @return
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId == 0 ? processWorkerGroupId : taskWorkerGroupId);
if(taskWorkerGroupId == 0){
return true;
}
WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskInstance.getWorkerGroupId());
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
return true;
}
String ips = workerGroup.getIpList();
if(ips == null){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
String[] ipArray = ips.split(",");
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}
@Override @Override
public void run() { public void run() {
@ -116,11 +155,13 @@ public class FetchTaskThread implements Runnable{
} }
// task instance id str // task instance id str
String taskInstIdStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
if (!StringUtils.isEmpty(taskInstIdStr)) { if (!StringUtils.isEmpty(taskQueueStr )) {
Date now = new Date();
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr); Integer taskId = Integer.parseInt(taskInstIdStr);
// find task instance by task id // find task instance by task id
@ -136,10 +177,11 @@ public class FetchTaskThread implements Runnable{
retryTimes--; retryTimes--;
} }
if (taskInstance == null) { if (taskInstance == null || !checkWorkerGroup(taskInstance, OSUtils.getHost())) {
logger.error("task instance is null. task id : {} ", taskId); logger.error("task instance is null. task id : {} ", taskId);
continue; continue;
} }
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
// set execute task worker host // set execute task worker host
taskInstance.setHost(OSUtils.getHost()); taskInstance.setHost(OSUtils.getHost());

11
escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java

@ -3,6 +3,9 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants; import cn.escheduler.common.Constants;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
@ -20,4 +23,12 @@ public class ZKWorkerClientTest {
} }
@Test
public void test(){
String ips = "";
List<String> ipList = Arrays.asList(ips.split(","));
System.out.println(ipList);
}
} }
Loading…
Cancel
Save