Browse Source

Merge pull request #513 from boandai/dev

make getting tasks from zk better performance
pull/2/head
easyscheduler 6 years ago committed by GitHub
parent
commit
b4e748686a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      docs/zh_CN/系统架构设计.md
  2. 12
      escheduler-api/src/main/resources/i18n/messages.properties
  3. 12
      escheduler-api/src/main/resources/i18n/messages_en_US.properties
  4. 10
      escheduler-api/src/main/resources/i18n/messages_zh_CN.properties
  5. 8
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  6. 9
      escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java
  7. 2
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java
  8. 103
      escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java
  9. 6
      escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
  10. 6
      escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
  11. 85
      escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
  12. 74
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  13. 6
      escheduler-server/pom.xml
  14. 163
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  15. 2
      install.sh

6
docs/zh_CN/系统架构设计.md

@ -13,13 +13,13 @@
**流程定义**:通过拖拽任务节点并建立任务节点的关联所形成的可视化**DAG**
**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成
**流程实例**:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成,流程定义每运行一次,产生一个流程实例
**任务实例**:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态
**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的
**任务类型**: 目前支持有SHELL、SQL、SUB_PROCESS(子流程)、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT(依赖),同时计划支持动态插件扩展,注意:其中子 **SUB_PROCESS** 也是一个单独的流程定义,是可以单独启动执行的
**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、调度、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用
**调度方式:** 系统支持基于cron表达式的定时调度和手动调度。命令类型支持:启动工作流、从当前节点开始执行、恢复被容错的工作流、恢复暂停流程、从失败节点开始执行、补数、定时、重跑、暂停、停止、恢复等待线程。其中 **恢复被容错的工作流****恢复等待线程** 两种命令类型是由调度内部控制使用,外部无法调用
**定时调度**:系统采用 **quartz** 分布式调度器,并同时支持cron表达式可视化的生成

12
escheduler-api/src/main/resources/i18n/messages.properties

@ -1,4 +1,16 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc
DESC=description
GROUP_NAME=group name
GROUP_TYPE=group type

12
escheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -1,4 +1,16 @@
QUERY_SCHEDULE_LIST_NOTES=query schedule list
EXECUTE_PROCESS_TAG=execute process related operation
PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation
RUN_PROCESS_INSTANCE_NOTES=run process instance
START_NODE_LIST=start node list(node name)
TASK_DEPEND_TYPE=task depend type
COMMAND_TYPE=command type
RUN_MODE=run mode
TIMEOUT=timeout
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance
EXECUTE_TYPE=execute type
START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition
GET_RECEIVER_CC_NOTES=query receiver cc
DESC=description
GROUP_NAME=group name
GROUP_TYPE=group type

10
escheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -1,4 +1,14 @@
QUERY_SCHEDULE_LIST_NOTES=查询定时列表
PROCESS_INSTANCE_EXECUTOR_TAG=流程实例执行相关操作
RUN_PROCESS_INSTANCE_NOTES=运行流程实例
START_NODE_LIST=开始节点列表(节点name)
TASK_DEPEND_TYPE=任务依赖类型
COMMAND_TYPE=指令类型
RUN_MODE=运行模式
TIMEOUT=超时时间
EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=执行流程实例的各种操作(暂停、停止、重跑、恢复等)
EXECUTE_TYPE=执行类型
START_CHECK_PROCESS_DEFINITION_NOTES=检查流程定义
DESC=备注(描述)
GROUP_NAME=组名称
GROUP_TYPE=组类型

8
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -197,6 +197,11 @@ public final class Constants {
*/
public static final String SEMICOLON = ";";
/**
* DOT .
*/
public static final String DOT = ".";
/**
* ZOOKEEPER_SESSION_TIMEOUT
*/
@ -832,6 +837,7 @@ public final class Constants {
/**
*
* default worker group id
*/
public static final int DEFAULT_WORKER_ID = -1;
}

9
escheduler-common/src/main/java/cn/escheduler/common/queue/ITaskQueue.java

@ -24,20 +24,17 @@ public interface ITaskQueue {
/**
* take out all the elements
*
* this method has deprecated
* use checkTaskExists instead
*
* @param key
* @return
*/
@Deprecated
List<String> getAllTasks(String key);
/**
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
boolean checkTaskExists(String key, String task);
@ -54,10 +51,10 @@ public interface ITaskQueue {
* an element pops out of the queue
*
* @param key queue name
* @param remove whether remove the element
* @param n how many elements to poll
* @return
*/
String poll(String key, boolean remove);
List<String> poll(String key, int n);
/**
* remove a element from queue

2
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueFactory.java

@ -42,7 +42,7 @@ public class TaskQueueFactory {
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
// queueImplValue = StringUtils.trim(queueImplValue);
// queueImplValue = IpUtils.trim(queueImplValue);
// if (SCHEDULER_QUEUE_REDIS_IMPL.equals(queueImplValue)) {
// logger.info("task queue impl use reids ");

103
escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java

@ -19,6 +19,8 @@ package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
@ -26,10 +28,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* A singleton of a task queue implemented with zookeeper
@ -62,7 +61,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @param key task queue name
* @return
*/
@Deprecated
@Override
public List<String> getAllTasks(String key) {
try {
@ -80,7 +78,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
*
* @param key queue name
* @param task ${priority}_${processInstanceId}_${taskId}
* @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
@Override
@ -110,7 +108,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
*
* @param key task queue name
* @param value ${priority}_${processInstanceId}_${taskId}
* @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {
@ -118,9 +116,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
@ -132,16 +127,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/**
* An element pops out of the queue <p>
* note:
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
* 流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,... high <- low
* @param key task queue name
* @param remove whether remove the element
* @return the task id to be executed
* @param tasksNum how many elements to poll
* @return the task ids to be executed
*/
@Override
public String poll(String key, boolean remove) {
public List<String> poll(String key, int tasksNum) {
try{
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@ -149,55 +144,79 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){
String workerIp = OSUtils.getHost();
String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
int size = list.size();
String formatTargetTask = null;
String targetTaskKey = null;
Set<String> taskTreeSet = new TreeSet<>();
for (int i = 0; i < size; i++) {
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
if(taskDetailArrs.length == 4){
//向前版本兼容
if(taskDetailArrs.length >= 4){
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
if(i > 0){
int result = formatTask.compareTo(formatTargetTask);
if(result < 0){
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
if(taskDetailArrs.length > 4){
String taskHosts = taskDetailArrs[4];
//task can assign to any worker host if equals default ip value of worker server
if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){
String[] taskHostsArr = taskHosts.split(Constants.COMMA);
if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
continue;
}
}
}else{
formatTargetTask = formatTask;
targetTaskKey = taskDetail;
}
}else{
logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
taskTreeSet.add(formatTask);
}
}
if(formatTargetTask != null){
String taskIdPath = tasksQueuePath + targetTaskKey;
}
logger.info("consume task {}", taskIdPath);
List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
String[] vals = targetTaskKey.split(Constants.UNDERLINE);
logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
if(remove){
removeNode(key, targetTaskKey);
}
logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
return targetTaskKey;
}else{
logger.error("should not go here, task queue poll error, please check!");
}
return taskslist;
}else{
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
return null;
return new ArrayList<String>();
}
/**
* get task list from tree set
*
* @param tasksNum
* @param taskTreeSet
*/
public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
Iterator<String> iterator = taskTreeSet.iterator();
int j = 0;
List<String> taskslist = new ArrayList<>(tasksNum);
while(iterator.hasNext()){
if(j++ < tasksNum){
String task = iterator.next();
taskslist.add(task);
}
}
return taskslist;
}
@Override
public void removeNode(String key, String nodeValue){

6
escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java

@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
}
} catch (Exception e) {
logger.warn(e.getMessage(),e);
// logger.warn(e.getMessage());
if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
logger.warn(e.getMessage(),e);
}
return childrenList.size();
}
return childrenList.size();

6
escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java

@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();
@Test
public void getHost(){
logger.info(OSUtils.getHost());
}
@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239

85
escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java

@ -17,12 +17,15 @@
package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@ -34,59 +37,62 @@ public class TaskQueueImplTest {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);
ITaskQueue tasksQueue = null;
@Test
public void testTaskQueue(){
@Before
public void before(){
tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
}
@After
public void after(){
//clear all data
tasksQueue.delete();
}
@Test
public void testAdd(){
//add
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");
List<String> tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
//pop
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node1,"1");
String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
assertEquals(node2,"2");
//sadd
String task1 = "1.1.1.1-1-mr";
String task2 = "1.1.1.2-2-mr";
String task3 = "1.1.1.3-3-mr";
String task4 = "1.1.1.4-4-mr";
String task5 = "1.1.1.5-5-mr";
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5);
tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
//srem
tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5);
//smembers
Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4);
logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
String node1 = tasks.get(0);
assertEquals(node1,"0_0000000001_1_0000000001");
tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
if(tasks.size() < 0){
return;
}
String node2 = tasks.get(0);
assertEquals(node2,"0_0000000001_1_0000000001");
}
/**
* test one million data from zookeeper queue
*/
@Test
public void extremeTest(){
ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
//clear all data
tasksQueue.delete();
int total = 30 * 10000;
for(int i = 0; i < total; i++)
@ -99,14 +105,9 @@ public class TaskQueueImplTest {
}
}
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0");
//clear all data
tasksQueue.delete();
}
}

74
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*;
@ -110,7 +111,7 @@ public class ProcessDao extends AbstractBaseDao {
*/
@Override
protected void init() {
userMapper=getMapper(UserMapper.class);
userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
@ -976,11 +977,58 @@ public class ProcessDao extends AbstractBaseDao {
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
*
* @param task
* @param taskInstance
* @return
*/
private String taskZkInfo(TaskInstance task) {
return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
private String taskZkInfo(TaskInstance taskInstance) {
int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
StringBuilder sb = new StringBuilder(100);
sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
.append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
.append(taskInstance.getId()).append(Constants.UNDERLINE);
if(taskWorkerGroupId > 0){
//not to find data from db
WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
if(workerGroup == null ){
logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
String ips = workerGroup.getIpList();
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
sb.append(Constants.DEFAULT_WORKER_ID);
return sb.toString();
}
StringBuilder ipSb = new StringBuilder(100);
String[] ipArray = ips.split(COMMA);
for (String ip : ipArray) {
long ipLong = IpUtils.ipToLong(ip);
ipSb.append(ipLong).append(COMMA);
}
if(ipSb.length() > 0) {
ipSb.deleteCharAt(ipSb.length() - 1);
}
sb.append(ipSb);
}else{
sb.append(Constants.DEFAULT_WORKER_ID);
}
return sb.toString();
}
/**
@ -1634,5 +1682,23 @@ public class ProcessDao extends AbstractBaseDao {
}
/**
* get task worker group id
*
* @param taskInstance
* @return
*/
public int getTaskWorkerGroupId(TaskInstance taskInstance) {
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
return taskWorkerGroupId;
}
}

6
escheduler-server/pom.xml

@ -88,8 +88,12 @@
<groupId>cn.analysys</groupId>
<artifactId>escheduler-alert</artifactId>
</dependency>
<dependency>
<groupId>cn.analysys</groupId>
<artifactId>escheduler-api</artifactId>
</dependency>
</dependencies>
</dependencies>
<build>

163
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -28,8 +28,9 @@ import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerGroup;
import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -101,15 +102,7 @@ public class FetchTaskThread implements Runnable{
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
int taskWorkerGroupId = taskInstance.getWorkerGroupId();
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
if(processInstance == null){
logger.error("cannot find the task:{} process instance", taskInstance.getId());
return false;
}
int processWorkerGroupId = processInstance.getWorkerGroupId();
taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
if(taskWorkerGroupId <= 0){
return true;
@ -120,118 +113,124 @@ public class FetchTaskThread implements Runnable{
return true;
}
String ips = workerGroup.getIpList();
if(ips == null){
if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
String[] ipArray = ips.split(",");
String[] ipArray = ips.split(Constants.COMMA);
List<String> ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}
@Override
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
if(OSUtils.checkResource(this.conf, false)) {
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
for (int i = 0; i < taskNum; i++) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
continue;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List<String> tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
if(tasksQueueList.size() > 0){
// creating distributed locks, lock path /escheduler/lock/worker
String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
mutex.acquire();
// task instance id str
String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
List<String> taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
if (!StringUtils.isEmpty(taskQueueStr )) {
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isNotBlank(taskQueueStr )) {
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
if (!checkThreadCount(poolExecutor)) {
break;
}
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
Date now = new Date();
Integer taskId = Integer.parseInt(taskInstIdStr);
logger.info("worker fetch taskId : {} from queue ", taskId);
// find task instance by task id
TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskId);
retryTimes--;
}
logger.info("worker fetch taskId : {} from queue ", taskId);
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
continue;
}
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
int retryTimes = 30;
// mainly to wait for the master insert task to succeed
while (taskInstance == null && retryTimes > 0) {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
taskInstance = processDao.findTaskInstanceById(taskId);
retryTimes--;
}
if (taskInstance == null ) {
logger.error("task instance is null. task id : {} ", taskId);
continue;
}
// set execute task worker host
taskInstance.setHost(OSUtils.getHost());
taskInstance.setStartTime(now);
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
logger.info("remove task:{} from queue", taskQueueStr);
// set execute task worker host
taskInstance.setHost(OSUtils.getHost());
taskInstance.setStartTime(now);
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
processDefine.getId(),
processInstance.getId(),
taskInstance.getId());
logger.info("task instance local execute path : {} ", execLocalPath);
// get local execute path
String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
processDefine.getId(),
processInstance.getId(),
taskInstance.getId());
logger.info("task instance local execute path : {} ", execLocalPath);
// set task execute path
taskInstance.setExecutePath(execLocalPath);
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
// set task execute path
taskInstance.setExecutePath(execLocalPath);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
processInstance.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
}
}
}
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
}
finally {
}finally {
if (mutex != null){
try {
mutex.release();
@ -246,4 +245,18 @@ public class FetchTaskThread implements Runnable{
}
}
}
/**
*
* @param poolExecutor
* @return
*/
private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
int activeCount = poolExecutor.getActiveCount();
if (activeCount >= workerExecNums) {
logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
return false;
}
return true;
}
}

2
install.sh

@ -290,7 +290,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties

Loading…
Cancel
Save