diff --git a/README.md b/README.md index 19c1d2e45b..ffd8dcf396 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ Easy Scheduler ### 近期研发计划 -EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.0.2版本的功能,TODO卡片是待做事项(包括 feature ideas) +EasyScheduler的工作计划:研发计划 ,其中 In Develop卡片下是1.1.0版本的功能,TODO卡片是待做事项(包括 feature ideas) ### 贡献代码 diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index d6872a278c..1938644724 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -24,7 +24,7 @@ import cn.escheduler.api.utils.Constants; import cn.escheduler.api.utils.Result; import cn.escheduler.common.enums.*; import cn.escheduler.dao.model.User; -import io.swagger.annotations.Api; +import io.swagger.annotations.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -38,9 +38,9 @@ import static cn.escheduler.api.enums.Status.*; /** - * execute task controller + * execute process controller */ -@ApiIgnore +@Api(tags = "PROCESS_INSTANCE_EXECUTOR_TAG", position = 1) @RestController @RequestMapping("projects/{projectName}/executors") public class ExecutorController extends BaseController { @@ -53,10 +53,27 @@ public class ExecutorController extends BaseController { /** * execute process instance */ + @ApiOperation(value = "startProcessInstance", notes= "RUN_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType ="FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType ="String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType ="TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType ="CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE",required = true, dataType ="WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID",required = true, dataType ="Int", example = "100"), + @ApiImplicitParam(name = "receivers", value = "RECEIVERS",dataType ="String" ), + @ApiImplicitParam(name = "receiversCc", value = "RECEIVERS_CC",dataType ="String" ), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE",dataType ="RunMode" ), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority" ), + @ApiImplicitParam(name = "workerGroupId", value = "WORKER_GROUP_ID", dataType = "Int",example = "100"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int",example = "100"), + }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) - public Result startProcessInstance(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result startProcessInstance(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "processDefinitionId") int processDefinitionId, @RequestParam(value = "scheduleTime", required = false) String scheduleTime, @RequestParam(value = "failureStrategy", required = true) FailureStrategy failureStrategy, @@ -102,10 +119,15 @@ public class ExecutorController extends BaseController { * @param processInstanceId * @return */ + @ApiOperation(value = "execute", notes= "EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "executeType", value = "EXECUTE_TYPE", required = true, dataType = "ExecuteType") + }) @PostMapping(value = "/execute") @ResponseStatus(HttpStatus.OK) - public Result execute(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @PathVariable String projectName, + public Result execute(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam("processInstanceId") Integer processInstanceId, @RequestParam("executeType") ExecuteType executeType ) { @@ -127,9 +149,13 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiOperation(value = "startCheckProcessDefinition", notes= "START_CHECK_PROCESS_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) @PostMapping(value = "/start-check") @ResponseStatus(HttpStatus.OK) - public Result startCheckProcessDefinition(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result startCheckProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId") int processDefinitionId) { logger.info("login user {}, check process definition", loginUser.getUserName(), processDefinitionId); try { @@ -149,9 +175,16 @@ public class ExecutorController extends BaseController { * @param processDefinitionId * @return */ + @ApiIgnore + @ApiOperation(value = "getReceiverCc", notes= "GET_RECEIVER_CC_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processDefinitionId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100") + + }) @GetMapping(value = "/get-receiver-cc") @ResponseStatus(HttpStatus.OK) - public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result getReceiverCc(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId, @RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) { logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 62332cbb75..2b8b6fd82f 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -219,6 +219,11 @@ public final class Constants { */ public static final String SEMICOLON = ";"; + /** + * DOT . + */ + public static final String DOT = "."; + /** * ZOOKEEPER_SESSION_TIMEOUT */ @@ -883,6 +888,11 @@ public final class Constants { */ public static final String LOGIN_USER_KEY_TAB_USERNAME = "login.user.keytab.username"; + /** + * default worker group id + */ + public static final int DEFAULT_WORKER_ID = -1; + /** * loginUserFromKeytab path */ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 106d6ff915..6f6e979797 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -24,20 +24,17 @@ public interface ITaskQueue { /** * take out all the elements * - * this method has deprecated - * use checkTaskExists instead * * @param key * @return */ - @Deprecated List getAllTasks(String key); /** * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ boolean checkTaskExists(String key, String task); @@ -54,10 +51,10 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name - * @param remove whether remove the element + * @param n how many elements to poll * @return */ - String poll(String key, boolean remove); + List poll(String key, int n); /** * remove a element from queue diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java index c8931064af..2d17481da4 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java @@ -42,7 +42,7 @@ public class TaskQueueFactory { public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { -// queueImplValue = StringUtils.trim(queueImplValue); +// queueImplValue = IpUtils.trim(queueImplValue); // if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) { // logger.info("task queue impl use reids "); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 28f696aa6e..88915b97cc 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -19,6 +19,8 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; import cn.escheduler.common.utils.Bytes; +import cn.escheduler.common.utils.IpUtils; +import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; @@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * A singleton of a task queue implemented with zookeeper @@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param key task queue name * @return */ - @Deprecated @Override public List getAllTasks(String key) { try { @@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ @Override @@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * add task to tasks queue * * @param key task queue name - * @param value ${priority}_${processInstanceId}_${taskId} + * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override public void add(String key, String value) { @@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); @@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { /** * An element pops out of the queue

* note: - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * - * 流程实例优先级_流程实例id_任务优先级_任务id high <- low + * 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low * @param key task queue name - * @param remove whether remove the element - * @return the task id to be executed + * @param tasksNum how many elements to poll + * @return the task ids to be executed */ @Override - public String poll(String key, boolean remove) { + public List poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -149,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { if(list != null && list.size() > 0){ + String workerIp = OSUtils.getHost(); + String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp)); + int size = list.size(); - String formatTargetTask = null; - String targetTaskKey = null; + + Set taskTreeSet = new TreeSet<>(); + for (int i = 0; i < size; i++) { + String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - if(taskDetailArrs.length == 4){ + //向前版本兼容 + if(taskDetailArrs.length >= 4){ + //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3])); - if(i > 0){ - int result = formatTask.compareTo(formatTargetTask); - if(result < 0){ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; + if(taskDetailArrs.length > 4){ + String taskHosts = taskDetailArrs[4]; + + //task can assign to any worker host if equals default ip value of worker server + if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ + String[] taskHostsArr = taskHosts.split(Constants.COMMA); + + if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ + continue; + } } - }else{ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; } - }else{ - logger.error("task queue poll error, task detail :{} , please check!", taskDetail); + + taskTreeSet.add(formatTask); + } - } - if(formatTargetTask != null){ - String taskIdPath = tasksQueuePath + targetTaskKey; + } - logger.info("consume task {}", taskIdPath); + List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); - String[] vals = targetTaskKey.split(Constants.UNDERLINE); + logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); - if(remove){ - removeNode(key, targetTaskKey); - } - logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); - return targetTaskKey; - }else{ - logger.error("should not go here, task queue poll error, please check!"); - } + return taskslist; + }else{ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } catch (Exception e) { logger.error("add task to tasks queue exception",e); } - return null; + return new ArrayList(); + } + + + /** + * get task list from tree set + * + * @param tasksNum + * @param taskTreeSet + */ + public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) { + Iterator iterator = taskTreeSet.iterator(); + int j = 0; + List taskslist = new ArrayList<>(tasksNum); + while(iterator.hasNext()){ + if(j++ < tasksNum){ + String task = iterator.next(); + taskslist.add(task); + } + } + return taskslist; } + @Override public void removeNode(String key, String nodeValue){ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java new file mode 100644 index 0000000000..ddc520a876 --- /dev/null +++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.common.utils; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * http utils + */ +public class IpUtils { + + private static final Logger logger = LoggerFactory.getLogger(IpUtils.class); + public static final String DOT = "."; + + /** + * ip str to long

+ * + * @param ipStr ip string + */ + public static Long ipToLong(String ipStr) { + String[] ipSet = ipStr.split("\\" + DOT); + + return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]); + } + + /** + * long to ip + * @param ipLong the long number converted from IP + * @return String + */ + public static String longToIp(long ipLong) { + long[] ipNumbers = new long[4]; + long tmp = 0xFF; + ipNumbers[0] = ipLong >> 24 & tmp; + ipNumbers[1] = ipLong >> 16 & tmp; + ipNumbers[2] = ipLong >> 8 & tmp; + ipNumbers[3] = ipLong & tmp; + + StringBuilder sb = new StringBuilder(16); + sb.append(ipNumbers[0]).append(DOT) + .append(ipNumbers[1]).append(DOT) + .append(ipNumbers[2]).append(DOT) + .append(ipNumbers[3]); + return sb.toString(); + } + + + + public static void main(String[] args){ + long ipLong = ipToLong("11.3.4.5"); + logger.info(longToIp(ipLong)); + } +} diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 177669b43c..e2f064be13 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -312,7 +312,11 @@ public abstract class AbstractZKClient { childrenList = zkClient.getChildren().forPath(masterZNodeParentPath); } } catch (Exception e) { - logger.warn(e.getMessage(),e); +// logger.warn(e.getMessage()); + if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ + logger.warn(e.getMessage(),e); + } + return childrenList.size(); } return childrenList.size(); diff --git a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java index 7d35bc8480..4bf152bbf2 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java @@ -37,6 +37,12 @@ public class OSUtilsTest { // static HardwareAbstractionLayer hal = si.getHardware(); + @Test + public void getHost(){ + logger.info(OSUtils.getHost()); + } + + @Test public void memoryUsage() { logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 03ba29a840..72a6e46200 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -17,12 +17,15 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -34,59 +37,62 @@ public class TaskQueueImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); + ITaskQueue tasksQueue = null; - @Test - public void testTaskQueue(){ + @Before + public void before(){ + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + } + + + @After + public void after(){ //clear all data tasksQueue.delete(); + } + + + @Test + public void testAdd(){ //add - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); + + List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node2,"2"); - - //sadd - String task1 = "1.1.1.1-1-mr"; - String task2 = "1.1.1.2-2-mr"; - String task3 = "1.1.1.3-3-mr"; - String task4 = "1.1.1.4-4-mr"; - String task5 = "1.1.1.5-5-mr"; - - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task - - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); - //srem - tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5); - //smembers - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); + String node1 = tasks.get(0); + assertEquals(node1,"0_0000000001_1_0000000001"); + + tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } + + String node2 = tasks.get(0); + assertEquals(node2,"0_0000000001_1_0000000001"); } + + /** * test one million data from zookeeper queue */ @Test public void extremeTest(){ - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - //clear all data - tasksQueue.delete(); int total = 30 * 10000; for(int i = 0; i < total; i++) @@ -99,14 +105,9 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node1,"0"); - //clear all data - tasksQueue.delete(); - - - } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 17ca727340..8ba0f47960 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; import cn.escheduler.common.utils.DateUtils; +import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.mapper.*; @@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - userMapper=getMapper(UserMapper.class); + userMapper = getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -1015,11 +1016,58 @@ public class ProcessDao extends AbstractBaseDao { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * - * @param task + * @param taskInstance * @return */ - private String taskZkInfo(TaskInstance task) { - return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId(); + private String taskZkInfo(TaskInstance taskInstance) { + + int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance); + + StringBuilder sb = new StringBuilder(100); + + sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) + .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getId()).append(Constants.UNDERLINE); + + if(taskWorkerGroupId > 0){ + //not to find data from db + WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + String ips = workerGroup.getIpList(); + + if(StringUtils.isBlank(ips)){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } + + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); + } + + sb.append(ipSb); + }else{ + sb.append(Constants.DEFAULT_WORKER_ID); + } + + + return sb.toString(); } /** @@ -1683,5 +1731,24 @@ public class ProcessDao extends AbstractBaseDao { } + /** + * get task worker group id + * + * @param taskInstance + * @return + */ + public int getTaskWorkerGroupId(TaskInstance taskInstance) { + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + return Constants.DEFAULT_WORKER_ID; + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + return taskWorkerGroupId; + } + } diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml index ad21578d6c..6341539e4c 100644 --- a/escheduler-server/pom.xml +++ b/escheduler-server/pom.xml @@ -89,7 +89,7 @@ escheduler-alert - + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java index e137824814..bf0dcbfe75 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java @@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable { if(Stopper.isRunning()) { // send heartbeat to zk if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { - logger.error("master send heartbeat to zk failed"); + logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server"); return; } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 1c6232bc9a..a960570ea5 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.*; import cn.escheduler.server.zk.ZKWorkerClient; -import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{ */ private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ - int taskWorkerGroupId = taskInstance.getWorkerGroupId(); - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - if(processInstance == null){ - logger.error("cannot find the task:{} process instance", taskInstance.getId()); - return false; - } - int processWorkerGroupId = processInstance.getWorkerGroupId(); - - taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); if(taskWorkerGroupId <= 0){ return true; @@ -117,99 +110,103 @@ public class FetchTaskThread implements Runnable{ return true; } String ips = workerGroup.getIpList(); - if(ips == null){ + if(StringUtils.isBlank(ips)){ logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", taskInstance.getId(), workerGroup.getId()); } - String[] ipArray = ips.split(","); + String[] ipArray = ips.split(Constants.COMMA); List ipList = Arrays.asList(ipArray); return ipList.contains(host); } + + @Override public void run() { while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { - if(OSUtils.checkResource(this.conf, false)) { - // creating distributed locks, lock path /escheduler/lock/worker - String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); - mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); - mutex.acquire(); + ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; + //check memory and cpu usage and threads + if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) { - for (int i = 0; i < taskNum; i++) { - - int activeCount = poolExecutor.getActiveCount(); - if (activeCount >= workerExecNums) { - logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums); - continue; - } + //whether have tasks, if no tasks , no need lock //get all tasks + List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); + if(tasksQueueList.size() > 0){ + // creating distributed locks, lock path /escheduler/lock/worker + String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); + mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); + mutex.acquire(); // task instance id str - String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); - if (!StringUtils.isEmpty(taskQueueStr )) { + for(String taskQueueStr : taskQueueStrArr){ + if (StringUtils.isNotBlank(taskQueueStr )) { - String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; - Date now = new Date(); - Integer taskId = Integer.parseInt(taskInstIdStr); + if (!checkThreadCount(poolExecutor)) { + break; + } - // find task instance by task id - TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); + Integer taskId = Integer.parseInt(taskInstIdStr); - logger.info("worker fetch taskId : {} from queue ", taskId); + // find task instance by task id + TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); - int retryTimes = 30; - // mainly to wait for the master insert task to succeed - while (taskInstance == null && retryTimes > 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - taskInstance = processDao.findTaskInstanceById(taskId); - retryTimes--; - } + logger.info("worker fetch taskId : {} from queue ", taskId); - if (taskInstance == null ) { - logger.error("task instance is null. task id : {} ", taskId); - continue; - } - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ - continue; - } - taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); - logger.info("remove task:{} from queue", taskQueueStr); + int retryTimes = 30; + // mainly to wait for the master insert task to succeed + while (taskInstance == null && retryTimes > 0) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + taskInstance = processDao.findTaskInstanceById(taskId); + retryTimes--; + } + + if (taskInstance == null ) { + logger.error("task instance is null. task id : {} ", taskId); + continue; + } - // set execute task worker host - taskInstance.setHost(OSUtils.getHost()); - taskInstance.setStartTime(now); + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + logger.info("remove task:{} from queue", taskQueueStr); + // set execute task worker host + taskInstance.setHost(OSUtils.getHost()); + taskInstance.setStartTime(now); - // get process instance - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + // get process instance + ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); + // get process define + ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); + taskInstance.setProcessInstance(processInstance); + taskInstance.setProcessDefine(processDefine); - // get local execute path - String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), - processDefine.getId(), - processInstance.getId(), - taskInstance.getId()); - logger.info("task instance local execute path : {} ", execLocalPath); + // get local execute path + String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), + processDefine.getId(), + processInstance.getId(), + taskInstance.getId()); + logger.info("task instance local execute path : {} ", execLocalPath); - // set task execute path - taskInstance.setExecutePath(execLocalPath); + // set task execute path + taskInstance.setExecutePath(execLocalPath); Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), processDefine.getUserId()); @@ -218,21 +215,22 @@ public class FetchTaskThread implements Runnable{ FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, tenant.getTenantCode(), logger); - logger.info("task : {} ready to submit to task scheduler thread",taskId); - // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + logger.info("task : {} ready to submit to task scheduler thread",taskId); + // submit task + workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + } } } + } Thread.sleep(Constants.SLEEP_TIME_MILLIS); }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); - } - finally { + }finally { if (mutex != null){ try { mutex.release(); @@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{ } } } + + /** + * + * @param poolExecutor + * @return + */ + private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { + int activeCount = poolExecutor.getActiveCount(); + if (activeCount >= workerExecNums) { + logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS); + return false; + } + return true; + } } \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index 26d682f132..09f6467aad 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask { String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim(); if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){ Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName)); - if(!(Boolean) mailResult.get(Constants.STATUS)){ + if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){ throw new RuntimeException("send mail failed!"); } }else{ diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue index 22d48782d2..914bab2812 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue @@ -6,7 +6,7 @@

IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -93,4 +93,4 @@ \ No newline at end of file + diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index 3cf0993415..960beeb14a 100644 --- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -6,7 +6,7 @@
IP: {{item.host}} - {{$t('Port')}}: {{item.port}} + {{$t('Process Pid')}}: {{item.port}} {{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -94,4 +94,4 @@ \ No newline at end of file + diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js index db6c8aa261..2259dea9cd 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js @@ -37,7 +37,7 @@ let warningTypeList = [ ] const isEmial = (val) => { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line return regEmail.test(val) } diff --git a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue index b02db7848e..574a995ec4 100644 --- a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue +++ b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue @@ -131,7 +131,7 @@ } }, _verification () { - let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line + let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line // Mobile phone number regular let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line diff --git a/install.sh b/install.sh index 5a98e46abd..d58be482fd 100644 --- a/install.sh +++ b/install.sh @@ -330,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties -sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties +#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties