diff --git a/docs/zh_CN/系统架构设计.md b/docs/zh_CN/系统架构设计.md index a6e1645a4c..134684155d 100644 --- a/docs/zh_CN/系统架构设计.md +++ b/docs/zh_CN/系统架构设计.md @@ -13,13 +13,13 @@ **流程定义**:通过拖拽任务节点并建立任务节点的关联所形成的可视化**DAG** -**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成 +**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例 **任务实例**:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态 -**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的 +**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的 -**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流** 和 **恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用 +**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流** 和 **恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用 **定时调度**:系统采用 **quartz** 分布式调度器,并同时支持cron表达式可视化的生成 diff --git a/escheduler-api/src/main/resources/i18n/messages.properties b/escheduler-api/src/main/resources/i18n/messages.properties index ea29b7d329..a663c71013 100644 --- a/escheduler-api/src/main/resources/i18n/messages.properties +++ b/escheduler-api/src/main/resources/i18n/messages.properties @@ -1,4 +1,16 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list +EXECUTE_PROCESS_TAG=execute process related operation +PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation +RUN_PROCESS_INSTANCE_NOTES=run process instance +START_NODE_LIST=start node list(node name) +TASK_DEPEND_TYPE=task depend type +COMMAND_TYPE=command type +RUN_MODE=run mode +TIMEOUT=timeout +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_TYPE=execute type +START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition +GET_RECEIVER_CC_NOTES=query receiver cc DESC=description GROUP_NAME=group name GROUP_TYPE=group type diff --git a/escheduler-api/src/main/resources/i18n/messages_en_US.properties b/escheduler-api/src/main/resources/i18n/messages_en_US.properties index ea29b7d329..a663c71013 100644 --- a/escheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/escheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -1,4 +1,16 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list +EXECUTE_PROCESS_TAG=execute process related operation +PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation +RUN_PROCESS_INSTANCE_NOTES=run process instance +START_NODE_LIST=start node list(node name) +TASK_DEPEND_TYPE=task depend type +COMMAND_TYPE=command type +RUN_MODE=run mode +TIMEOUT=timeout +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_TYPE=execute type +START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition +GET_RECEIVER_CC_NOTES=query receiver cc DESC=description GROUP_NAME=group name GROUP_TYPE=group type diff --git a/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 7ee8e8b778..b0d6694d2b 100644 --- a/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/escheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -1,4 +1,14 @@ QUERY_SCHEDULE_LIST_NOTES=查询定时列表 +PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作 +RUN_PROCESS_INSTANCE_NOTES=运行流程实例 +START_NODE_LIST=开始节点列表(节点name) +TASK_DEPEND_TYPE=任务依赖类型 +COMMAND_TYPE=指令类型 +RUN_MODE=运行模式 +TIMEOUT=超时时间 +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=执行流程实例的各种操作(暂停、停止、重跑、恢复等) +EXECUTE_TYPE=执行类型 +START_CHECK_PROCESS_DEFINITION_NOTES=检查流程定义 DESC=备注(描述) GROUP_NAME=组名称 GROUP_TYPE=组类型 diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index e5137ef8fe..1812c09a67 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -197,6 +197,11 @@ public final class Constants { */ public static final String SEMICOLON = ";"; + /** + * DOT . + */ + public static final String DOT = "."; + /** * ZOOKEEPER_SESSION_TIMEOUT */ @@ -832,6 +837,7 @@ public final class Constants { /** - * + * default worker group id */ + public static final int DEFAULT_WORKER_ID = -1; } diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java index 106d6ff915..6f6e979797 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java @@ -24,20 +24,17 @@ public interface ITaskQueue { /** * take out all the elements * - * this method has deprecated - * use checkTaskExists instead * * @param key * @return */ - @Deprecated List getAllTasks(String key); /** * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ boolean checkTaskExists(String key, String task); @@ -54,10 +51,10 @@ public interface ITaskQueue { * an element pops out of the queue * * @param key queue name - * @param remove whether remove the element + * @param n how many elements to poll * @return */ - String poll(String key, boolean remove); + List poll(String key, int n); /** * remove a element from queue diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java index c8931064af..2d17481da4 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java @@ -42,7 +42,7 @@ public class TaskQueueFactory { public static ITaskQueue getTaskQueueInstance() { String queueImplValue = CommonUtils.getQueueImplValue(); if (StringUtils.isNotBlank(queueImplValue)) { -// queueImplValue = StringUtils.trim(queueImplValue); +// queueImplValue = IpUtils.trim(queueImplValue); // if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) { // logger.info("task queue impl use reids "); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 28f696aa6e..88915b97cc 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -19,6 +19,8 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; import cn.escheduler.common.utils.Bytes; +import cn.escheduler.common.utils.IpUtils; +import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; @@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * A singleton of a task queue implemented with zookeeper @@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @param key task queue name * @return */ - @Deprecated @Override public List getAllTasks(String key) { try { @@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * check task exists in the task queue or not * * @param key queue name - * @param task ${priority}_${processInstanceId}_${taskId} + * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} * @return true if exists in the queue */ @Override @@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * add task to tasks queue * * @param key task queue name - * @param value ${priority}_${processInstanceId}_${taskId} + * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... */ @Override public void add(String key, String value) { @@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value; String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value)); -// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value; -// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, -// Bytes.toBytes(value)); logger.info("add task : {} to tasks queue , result success",result); } catch (Exception e) { logger.error("add task to tasks queue exception",e); @@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { /** * An element pops out of the queue

* note: - * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,... * The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low. * - * 流程实例优先级_流程实例id_任务优先级_任务id high <- low + * 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low * @param key task queue name - * @param remove whether remove the element - * @return the task id to be executed + * @param tasksNum how many elements to poll + * @return the task ids to be executed */ @Override - public String poll(String key, boolean remove) { + public List poll(String key, int tasksNum) { try{ CuratorFramework zk = getZkClient(); String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH; @@ -149,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { if(list != null && list.size() > 0){ + String workerIp = OSUtils.getHost(); + String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp)); + int size = list.size(); - String formatTargetTask = null; - String targetTaskKey = null; + + Set taskTreeSet = new TreeSet<>(); + for (int i = 0; i < size; i++) { + String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - if(taskDetailArrs.length == 4){ + //向前版本兼容 + if(taskDetailArrs.length >= 4){ + //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3])); - if(i > 0){ - int result = formatTask.compareTo(formatTargetTask); - if(result < 0){ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; + if(taskDetailArrs.length > 4){ + String taskHosts = taskDetailArrs[4]; + + //task can assign to any worker host if equals default ip value of worker server + if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ + String[] taskHostsArr = taskHosts.split(Constants.COMMA); + + if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ + continue; + } } - }else{ - formatTargetTask = formatTask; - targetTaskKey = taskDetail; } - }else{ - logger.error("task queue poll error, task detail :{} , please check!", taskDetail); + + taskTreeSet.add(formatTask); + } - } - if(formatTargetTask != null){ - String taskIdPath = tasksQueuePath + targetTaskKey; + } - logger.info("consume task {}", taskIdPath); + List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet); - String[] vals = targetTaskKey.split(Constants.UNDERLINE); + logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size()); - if(remove){ - removeNode(key, targetTaskKey); - } - logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1); - return targetTaskKey; - }else{ - logger.error("should not go here, task queue poll error, please check!"); - } + return taskslist; + }else{ + Thread.sleep(Constants.SLEEP_TIME_MILLIS); } } catch (Exception e) { logger.error("add task to tasks queue exception",e); } - return null; + return new ArrayList(); + } + + + /** + * get task list from tree set + * + * @param tasksNum + * @param taskTreeSet + */ + public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) { + Iterator iterator = taskTreeSet.iterator(); + int j = 0; + List taskslist = new ArrayList<>(tasksNum); + while(iterator.hasNext()){ + if(j++ < tasksNum){ + String task = iterator.next(); + taskslist.add(task); + } + } + return taskslist; } + @Override public void removeNode(String key, String nodeValue){ diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java index 177669b43c..e2f064be13 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java @@ -312,7 +312,11 @@ public abstract class AbstractZKClient { childrenList = zkClient.getChildren().forPath(masterZNodeParentPath); } } catch (Exception e) { - logger.warn(e.getMessage(),e); +// logger.warn(e.getMessage()); + if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){ + logger.warn(e.getMessage(),e); + } + return childrenList.size(); } return childrenList.size(); diff --git a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java index 7d35bc8480..4bf152bbf2 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java @@ -37,6 +37,12 @@ public class OSUtilsTest { // static HardwareAbstractionLayer hal = si.getHardware(); + @Test + public void getHost(){ + logger.info(OSUtils.getHost()); + } + + @Test public void memoryUsage() { logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239 diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java index 03ba29a840..72a6e46200 100644 --- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java +++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java @@ -17,12 +17,15 @@ package cn.escheduler.common.queue; import cn.escheduler.common.Constants; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.List; import java.util.Random; import static org.junit.Assert.assertEquals; @@ -34,59 +37,62 @@ public class TaskQueueImplTest { private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class); + ITaskQueue tasksQueue = null; - @Test - public void testTaskQueue(){ + @Before + public void before(){ + tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + //clear all data + tasksQueue.delete(); - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); + } + + + @After + public void after(){ //clear all data tasksQueue.delete(); + } + + + @Test + public void testAdd(){ //add - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3"); - tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775"); + tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775"); + + List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } //pop - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node1,"1"); - String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); - assertEquals(node2,"2"); - - //sadd - String task1 = "1.1.1.1-1-mr"; - String task2 = "1.1.1.2-2-mr"; - String task3 = "1.1.1.3-3-mr"; - String task4 = "1.1.1.4-4-mr"; - String task5 = "1.1.1.5-5-mr"; - - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); - tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task - - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); - //srem - tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5); - //smembers - Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4); - logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray())); + String node1 = tasks.get(0); + assertEquals(node1,"0_0000000001_1_0000000001"); + + tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1); + + if(tasks.size() < 0){ + return; + } + + String node2 = tasks.get(0); + assertEquals(node2,"0_0000000001_1_0000000001"); } + + /** * test one million data from zookeeper queue */ @Test public void extremeTest(){ - ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance(); - //clear all data - tasksQueue.delete(); int total = 30 * 10000; for(int i = 0; i < total; i++) @@ -99,14 +105,9 @@ public class TaskQueueImplTest { } } - String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0); assertEquals(node1,"0"); - //clear all data - tasksQueue.delete(); - - - } } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 09e2149d88..0d5af285c3 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue; import cn.escheduler.common.queue.TaskQueueFactory; import cn.escheduler.common.task.subprocess.SubProcessParameters; import cn.escheduler.common.utils.DateUtils; +import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.dao.mapper.*; @@ -110,7 +111,7 @@ public class ProcessDao extends AbstractBaseDao { */ @Override protected void init() { - userMapper=getMapper(UserMapper.class); + userMapper = getMapper(UserMapper.class); processDefineMapper = getMapper(ProcessDefinitionMapper.class); processInstanceMapper = getMapper(ProcessInstanceMapper.class); dataSourceMapper = getMapper(DataSourceMapper.class); @@ -976,11 +977,58 @@ public class ProcessDao extends AbstractBaseDao { * * 流程实例优先级_流程实例id_任务优先级_任务id high <- low * - * @param task + * @param taskInstance * @return */ - private String taskZkInfo(TaskInstance task) { - return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId(); + private String taskZkInfo(TaskInstance taskInstance) { + + int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance); + + StringBuilder sb = new StringBuilder(100); + + sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE) + .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE) + .append(taskInstance.getId()).append(Constants.UNDERLINE); + + if(taskWorkerGroupId > 0){ + //not to find data from db + WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId); + if(workerGroup == null ){ + logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId()); + + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + String ips = workerGroup.getIpList(); + + if(StringUtils.isBlank(ips)){ + logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", + taskInstance.getId(), workerGroup.getId()); + sb.append(Constants.DEFAULT_WORKER_ID); + return sb.toString(); + } + + StringBuilder ipSb = new StringBuilder(100); + String[] ipArray = ips.split(COMMA); + + for (String ip : ipArray) { + long ipLong = IpUtils.ipToLong(ip); + ipSb.append(ipLong).append(COMMA); + } + + if(ipSb.length() > 0) { + ipSb.deleteCharAt(ipSb.length() - 1); + } + + sb.append(ipSb); + }else{ + sb.append(Constants.DEFAULT_WORKER_ID); + } + + + return sb.toString(); } /** @@ -1634,5 +1682,23 @@ public class ProcessDao extends AbstractBaseDao { } + /** + * get task worker group id + * + * @param taskInstance + * @return + */ + public int getTaskWorkerGroupId(TaskInstance taskInstance) { + int taskWorkerGroupId = taskInstance.getWorkerGroupId(); + ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId()); + if(processInstance == null){ + logger.error("cannot find the task:{} process instance", taskInstance.getId()); + } + int processWorkerGroupId = processInstance.getWorkerGroupId(); + + taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + return taskWorkerGroupId; + } + } diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml index ad21578d6c..657633c80b 100644 --- a/escheduler-server/pom.xml +++ b/escheduler-server/pom.xml @@ -88,8 +88,12 @@ cn.analysys escheduler-alert + + cn.analysys + escheduler-api + - + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index 3ecafde57a..ba10fcb57d 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -28,8 +28,9 @@ import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.WorkerGroup; import cn.escheduler.server.zk.ZKWorkerClient; -import com.cronutils.utils.StringUtils; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,15 +102,7 @@ public class FetchTaskThread implements Runnable{ */ private boolean checkWorkerGroup(TaskInstance taskInstance, String host){ - int taskWorkerGroupId = taskInstance.getWorkerGroupId(); - ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); - if(processInstance == null){ - logger.error("cannot find the task:{} process instance", taskInstance.getId()); - return false; - } - int processWorkerGroupId = processInstance.getWorkerGroupId(); - - taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId); + int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance); if(taskWorkerGroupId <= 0){ return true; @@ -120,118 +113,124 @@ public class FetchTaskThread implements Runnable{ return true; } String ips = workerGroup.getIpList(); - if(ips == null){ + if(StringUtils.isBlank(ips)){ logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers", taskInstance.getId(), workerGroup.getId()); } - String[] ipArray = ips.split(","); + String[] ipArray = ips.split(Constants.COMMA); List ipList = Arrays.asList(ipArray); return ipList.contains(host); } + + @Override public void run() { while (Stopper.isRunning()){ InterProcessMutex mutex = null; try { - if(OSUtils.checkResource(this.conf, false)) { - // creating distributed locks, lock path /escheduler/lock/worker - String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); - mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); - mutex.acquire(); + ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; - ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService; + //check memory and cpu usage and threads + if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) { - for (int i = 0; i < taskNum; i++) { - - int activeCount = poolExecutor.getActiveCount(); - if (activeCount >= workerExecNums) { - logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums); - continue; - } + //whether have tasks, if no tasks , no need lock //get all tasks + List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE); + if(tasksQueueList.size() > 0){ + // creating distributed locks, lock path /escheduler/lock/worker + String zNodeLockPath = zkWorkerClient.getWorkerLockPath(); + mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath); + mutex.acquire(); // task instance id str - String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false); + List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum); - if (!StringUtils.isEmpty(taskQueueStr )) { + for(String taskQueueStr : taskQueueStrArr){ + if (StringUtils.isNotBlank(taskQueueStr )) { - String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; - Date now = new Date(); - Integer taskId = Integer.parseInt(taskInstIdStr); + if (!checkThreadCount(poolExecutor)) { + break; + } - // find task instance by task id - TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); + String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); + String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + Date now = new Date(); + Integer taskId = Integer.parseInt(taskInstIdStr); - logger.info("worker fetch taskId : {} from queue ", taskId); + // find task instance by task id + TaskInstance taskInstance = processDao.findTaskInstanceById(taskId); - int retryTimes = 30; - // mainly to wait for the master insert task to succeed - while (taskInstance == null && retryTimes > 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - taskInstance = processDao.findTaskInstanceById(taskId); - retryTimes--; - } + logger.info("worker fetch taskId : {} from queue ", taskId); - if (taskInstance == null ) { - logger.error("task instance is null. task id : {} ", taskId); - continue; - } - if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ - continue; - } - taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); - logger.info("remove task:{} from queue", taskQueueStr); + int retryTimes = 30; + // mainly to wait for the master insert task to succeed + while (taskInstance == null && retryTimes > 0) { + Thread.sleep(Constants.SLEEP_TIME_MILLIS); + taskInstance = processDao.findTaskInstanceById(taskId); + retryTimes--; + } + + if (taskInstance == null ) { + logger.error("task instance is null. task id : {} ", taskId); + continue; + } - // set execute task worker host - taskInstance.setHost(OSUtils.getHost()); - taskInstance.setStartTime(now); + if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){ + continue; + } + taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr); + logger.info("remove task:{} from queue", taskQueueStr); + // set execute task worker host + taskInstance.setHost(OSUtils.getHost()); + taskInstance.setStartTime(now); - // get process instance - ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - // get process define - ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); + // get process instance + ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + // get process define + ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); - taskInstance.setProcessInstance(processInstance); - taskInstance.setProcessDefine(processDefine); + taskInstance.setProcessInstance(processInstance); + taskInstance.setProcessDefine(processDefine); - // get local execute path - String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), - processDefine.getId(), - processInstance.getId(), - taskInstance.getId()); - logger.info("task instance local execute path : {} ", execLocalPath); + // get local execute path + String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(), + processDefine.getId(), + processInstance.getId(), + taskInstance.getId()); + logger.info("task instance local execute path : {} ", execLocalPath); - // set task execute path - taskInstance.setExecutePath(execLocalPath); - // check and create Linux users - FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, - processInstance.getTenantCode(), logger); + // set task execute path + taskInstance.setExecutePath(execLocalPath); - logger.info("task : {} ready to submit to task scheduler thread",taskId); - // submit task - workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + // check and create Linux users + FileUtils.createWorkDirAndUserIfAbsent(execLocalPath, + processInstance.getTenantCode(), logger); + logger.info("task : {} ready to submit to task scheduler thread",taskId); + // submit task + workerExecService.submit(new TaskScheduleThread(taskInstance, processDao)); + + } } } + } Thread.sleep(Constants.SLEEP_TIME_MILLIS); }catch (Exception e){ logger.error("fetch task thread exception : " + e.getMessage(),e); - } - finally { + }finally { if (mutex != null){ try { mutex.release(); @@ -246,4 +245,18 @@ public class FetchTaskThread implements Runnable{ } } } + + /** + * + * @param poolExecutor + * @return + */ + private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) { + int activeCount = poolExecutor.getActiveCount(); + if (activeCount >= workerExecNums) { + logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS); + return false; + } + return true; + } } \ No newline at end of file diff --git a/install.sh b/install.sh index 6fd9e83de2..73b081a1ab 100644 --- a/install.sh +++ b/install.sh @@ -290,7 +290,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties -sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties +#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties