diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index 7ba044c6f6..eb98bd192c 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -66,13 +66,15 @@ public class ExecutorController extends BaseController { @RequestParam(value = "receiversCc", required = false) String receiversCc, @RequestParam(value = "runMode", required = false) RunMode runMode, @RequestParam(value = "processInstancePriority", required = false) Priority processInstancePriority, + @RequestParam(value = "workerGroupId", required = false) int workerGroupId, @RequestParam(value = "timeout", required = false) Integer timeout) { try { logger.info("login user {}, start process instance, project name: {}, process definition id: {}, schedule time: {}, " + "failure policy: {}, node name: {}, node dep: {}, notify type: {}, " - + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, timeout: {}", + + "notify group id: {},receivers:{},receiversCc:{}, run mode: {},process instance priority:{}, workerGroupId: {}, timeout: {}", loginUser.getUserName(), projectName, processDefinitionId, scheduleTime, failureStrategy, - taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority,timeout); + taskDependType, warningType, warningGroupId,receivers,receiversCc,runMode,processInstancePriority, + workerGroupId, timeout); if (timeout == null) { timeout = cn.escheduler.common.Constants.MAX_TASK_TIMEOUT; diff --git a/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java b/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java index fb9ffe1145..e09775558c 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/model/TaskNode.java @@ -113,6 +113,12 @@ public class TaskNode { */ private Priority taskInstancePriority; + /** + * worker group id + */ + private int workerGroupId; + + /** * task time out */ @@ -224,6 +230,7 @@ public class TaskNode { Objects.equals(extras, taskNode.extras) && Objects.equals(runFlag, taskNode.runFlag) && Objects.equals(dependence, taskNode.dependence) && + Objects.equals(workerGroupId, taskNode.workerGroupId) && CollectionUtils.equalLists(depList, taskNode.depList); } @@ -303,6 +310,15 @@ public class TaskNode { ", dependence='" + dependence + '\'' + ", taskInstancePriority=" + taskInstancePriority + ", timeout='" + timeout + '\'' + + ", workerGroupId='" + workerGroupId + '\'' + '}'; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 7c01bc4ec2..42629324a2 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -54,10 +54,17 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name + * @param remove where remove the element * @return */ - String poll(String key); + String poll(String key, boolean remove); + /** + * remove a element from queue + * @param key + * @param value + */ + void removeNode(String key, String value); /** * add an element to the set diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index ec11f6dbe6..1b39cb1e8d 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -137,10 +137,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * @param key task queue name + * @param remove whether remove the element * @return the task id to be executed */ @Override - public String poll(String key) { + public String poll(String key, boolean remove) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -181,18 +182,11 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String[] vals = targetTaskKey.split(Constants.UNDERLINE); - try{ - zk.delete().forPath(taskIdPath); - -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_remove" + Constants.SINGLE_SLASH + targetTaskKey; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(targetTaskKey)); - }catch(Exception e){ - logger.error(String.format("delete task:%s from zookeeper fail, task detail: %s exception" ,targetTaskKey, vals[vals.length - 1]) ,e); + if(remove){ + removeNode(key, targetTaskKey); } - logger.info("consume task: {},there still have {} tasks need to be executed", targetTaskKey, size - 1); - - return vals[vals.length - 1]; + logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); + return targetTaskKey; }else{ logger.error("should not go here, task queue poll error, please check!"); } @@ -204,6 +198,21 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { return null; } + @Override + public void removeNode(String key, String nodeValue){ + + CuratorFramework zk = getZkClient(); + String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; + String taskIdPath = tasksQueuePath + nodeValue; + logger.info("consume task {}", taskIdPath); + try{ + zk.delete().forPath(taskIdPath); + }catch(Exception e){ + logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e); + } + + } + /** diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 5bc8931839..03ba29a840 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -49,9 +49,9 @@ public class TaskQueueImplTest { tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node2,"2"); //sadd @@ -99,7 +99,7 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); assertEquals(node1,"0"); //clear all data diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index ec458f98a9..bb48d8e523 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -88,6 +88,9 @@ public class ProcessDao extends AbstractBaseDao { @Autowired private ResourceMapper resourceMapper; + @Autowired + private WorkerGroupMapper workerGroupMapper; + @Autowired private ErrorCommandMapper errorCommandMapper; @@ -115,6 +118,7 @@ public class ProcessDao extends AbstractBaseDao { scheduleMapper = getMapper(ScheduleMapper.class); udfFuncMapper = getMapper(UdfFuncMapper.class); resourceMapper = getMapper(ResourceMapper.class); + workerGroupMapper = getMapper(WorkerGroupMapper.class); taskQueue = TaskQueueFactory.getTaskQueueInstance(); } @@ -477,6 +481,7 @@ public class ProcessDao extends AbstractBaseDao { processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); + processInstance.setWorkerGroupId(command.getWorkerGroupId()); return processInstance; } @@ -1575,6 +1580,15 @@ public class ProcessDao extends AbstractBaseDao { return userMapper.queryQueueByProcessInstanceId(processInstanceId); } + /** + * query worker group by id + * @param workerGroupId + * @return + */ + public WorkerGroup queryWorkerGroupById(int workerGroupId){ + return workerGroupMapper.queryById(workerGroupId); + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java index 399e378ae8..e2a31a6702 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/Command.java @@ -91,6 +91,12 @@ public class Command { private Date updateTime; + /** + * + */ + private int workerGroupId; + + public Command(){ this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -229,6 +235,15 @@ public class Command { this.updateTime = updateTime; } + + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "Command{" + @@ -245,6 +260,7 @@ public class Command { ", startTime=" + startTime + ", processInstancePriority=" + processInstancePriority + ", updateTime=" + updateTime + + ", workerGroupId=" + workerGroupId + '}'; } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 06656ebeba..b37450c330 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -177,6 +177,12 @@ public class ProcessInstance { */ private Priority processInstancePriority; + + /** + * worker group id + */ + private int workerGroupId; + public ProcessInstance(){ } @@ -481,6 +487,13 @@ public class ProcessInstance { this.duration = duration; } + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } @Override public String toString() { diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java index 9d63f7cc7e..f37b1e4349 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/TaskInstance.java @@ -182,6 +182,13 @@ public class TaskInstance { private String dependentResult; + /** + * worker group id + * @return + */ + private int workerGroupId; + + public ProcessInstance getProcessInstance() { return processInstance; } @@ -439,6 +446,14 @@ public class TaskInstance { this.processInstancePriority = processInstancePriority; } + public int getWorkerGroupId() { + return workerGroupId; + } + + public void setWorkerGroupId(int workerGroupId) { + this.workerGroupId = workerGroupId; + } + @Override public String toString() { return "TaskInstance{" + @@ -470,6 +485,7 @@ public class TaskInstance { ", retryInterval=" + retryInterval + ", taskInstancePriority=" + taskInstancePriority + ", processInstancePriority=" + processInstancePriority + + ", workGroupId=" + workerGroupId + '}'; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index 101e72e300..b5ed0053a2 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -404,6 +404,9 @@ public class MasterExecThread implements Runnable { taskInstance.setTaskInstancePriority(taskNode.getTaskInstancePriority()); } + int workerGroupId = taskNode.getWorkerGroupId(); + taskInstance.setWorkerGroupId(workerGroupId); + } return taskInstance; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 64e5b57498..e77f1adc5e 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -26,6 +26,7 @@ import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.ProcessDefinition; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.WorkerGroup; import cn.escheduler.server.zk.ZKWorkerClient; import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; @@ -33,7 +34,9 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Date; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -90,6 +93,42 @@ public class FetchTaskThread implements Runnable{ this.taskQueue = taskQueue; } + /** + * Check if the task runs on this worker + * @param taskInstance + * @param host + * @return + */ + private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ + + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + return false; + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId == 0 ? processWorkerGroupId : taskWorkerGroupId); + + if(taskWorkerGroupId == 0){ + return true; + } + WorkerGroup workerGroup = processDao.queryWorkerGroupById(taskInstance.getWorkerGroupId()); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + return true; + } + String ips = workerGroup.getIpList(); + if(ips == null){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + } + String[] ipArray = ips.split(","); + List ipList = Arrays.asList(ipArray); + return ipList.contains(host); + } + @Override public void run() { @@ -116,11 +155,13 @@ public class FetchTaskThread implements Runnable{ } // task instance id str - String taskInstIdStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE); + String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - if (!StringUtils.isEmpty(taskInstIdStr)) { - Date now = new Date(); + if (!StringUtils.isEmpty(taskQueueStr )) { + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); Integer taskId = Integer.parseInt(taskInstIdStr); // find task instance by task id @@ -136,10 +177,11 @@ public class FetchTaskThread implements Runnable{ retryTimes--; } - if (taskInstance == null) { + if (taskInstance == null || !checkWorkerGroup(taskInstance, OSUtils.getHost())) { logger.error("task instance is null. task id : {} ", taskId); continue; } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); // set execute task worker host taskInstance.setHost(OSUtils.getHost()); diff --git a/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java b/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java index f18be54e0c..c8aa0930a2 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/zk/ZKWorkerClientTest.java @@ -3,6 +3,9 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import org.junit.Test; +import java.util.Arrays; +import java.util.List; + import static org.junit.Assert.*; /** @@ -20,4 +23,12 @@ public class ZKWorkerClientTest { } + @Test + public void test(){ + String ips = ""; + + List ipList = Arrays.asList(ips.split(",")); + + System.out.println(ipList); + } } \ No newline at end of file