getAllTasks(String key) {
try {
@@ -80,7 +79,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* check task exists in the task queue or not
*
* @param key queue name
- * @param task ${priority}_${processInstanceId}_${taskId}
+ * @param task ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
* @return true if exists in the queue
*/
@Override
@@ -110,7 +109,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* add task to tasks queue
*
* @param key task queue name
- * @param value ${priority}_${processInstanceId}_${taskId}
+ * @param value ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
*/
@Override
public void add(String key, String value) {
@@ -118,9 +117,6 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));
-// String path = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT) + Constants.SINGLE_SLASH + Constants.SCHEDULER_TASKS_QUEUE + "_add" + Constants.SINGLE_SLASH + value;
-// getZkClient().create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,
-// Bytes.toBytes(value));
logger.info("add task : {} to tasks queue , result success",result);
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
@@ -132,16 +128,16 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
/**
* An element pops out of the queue
* note:
- * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
+ * ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
* The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
*
- * 流程实例优先级_流程实例id_任务优先级_任务id high <- low
+ * processInstancePriority_processInstanceId_taskInstancePriority_taskId_workerHostId1,workerHostId2,... high <- low
* @param key task queue name
- * @param remove whether remove the element
- * @return the task id to be executed
+ * @param tasksNum how many elements to poll
+ * @return the task ids to be executed
*/
@Override
- public String poll(String key, boolean remove) {
+ public List poll(String key, int tasksNum) {
try{
CuratorFramework zk = getZkClient();
String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
@@ -149,53 +145,100 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
if(list != null && list.size() > 0){
+ String workerIp = OSUtils.getHost();
+ String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));
+
int size = list.size();
- String formatTargetTask = null;
- String targetTaskKey = null;
+
+ Set taskTreeSet = new TreeSet<>();
+
for (int i = 0; i < size; i++) {
+
String taskDetail = list.get(i);
String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);
- if(taskDetailArrs.length == 4){
+ // backward compatibility: also accept the old 4-field task format without a host list
+ if(taskDetailArrs.length >= 4){
+
//format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
- if(i > 0){
- int result = formatTask.compareTo(formatTargetTask);
- if(result < 0){
- formatTargetTask = formatTask;
- targetTaskKey = taskDetail;
+ if(taskDetailArrs.length > 4){
+ String taskHosts = taskDetailArrs[4];
+
+ //task can assign to any worker host if equals default ip value of worker server
+ if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){
+ String[] taskHostsArr = taskHosts.split(Constants.COMMA);
+ if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
+ continue;
+ }
}
- }else{
- formatTargetTask = formatTask;
- targetTaskKey = taskDetail;
}
- }else{
- logger.error("task queue poll error, task detail :{} , please check!", taskDetail);
+
+ taskTreeSet.add(formatTask);
+
}
- }
- if(formatTargetTask != null){
- String taskIdPath = tasksQueuePath + targetTaskKey;
+ }
- logger.info("consume task {}", taskIdPath);
+ List taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
- String[] vals = targetTaskKey.split(Constants.UNDERLINE);
+ logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
- if(remove){
- removeNode(key, targetTaskKey);
- }
- logger.info("consume task: {},there still have {} tasks need to be executed", vals[vals.length - 1], size - 1);
- return targetTaskKey;
- }else{
- logger.error("should not go here, task queue poll error, please check!");
- }
+ return taskslist;
+ }else{
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
} catch (Exception e) {
logger.error("add task to tasks queue exception",e);
}
- return null;
+ return new ArrayList();
+ }
+
+
+ /**
+ * get at most tasksNum tasks (in priority order) from the sorted tree set
+ *
+ * @param tasksNum maximum number of tasks to return
+ * @param taskTreeSet sorted set of formatted task strings
+ */
+ public List getTasksListFromTreeSet(int tasksNum, Set taskTreeSet) {
+ Iterator iterator = taskTreeSet.iterator();
+ int j = 0;
+ List taskslist = new ArrayList<>(tasksNum);
+ while(iterator.hasNext()){
+ if(j++ < tasksNum){
+ String task = iterator.next();
+
+ taskslist.add(getOriginTaskFormat(task));
+ }
+ }
+ return taskslist;
+ }
+
+ /**
+ * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
+ * processInstanceId and taskId need to be converted back to int (strips the zero padding added by poll).
+ * @param formatTask zero-padded task string built in poll()
+ * @return the origin task string; NOTE(review): the format call below passes taskArray[3] twice — taskInstancePriority (taskArray[2]) appears dropped, verify
+ */
+ private String getOriginTaskFormat(String formatTask){
+ String[] taskArray = formatTask.split(Constants.UNDERLINE);
+ int processInstanceId = Integer.parseInt(taskArray[1]);
+ int taskId = Integer.parseInt(taskArray[3]);
+
+ StringBuilder sb = new StringBuilder(50);
+ String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId);
+
+ sb.append(destTask);
+
+ if(taskArray.length > 4){
+ for(int index = 4; index < taskArray.length; index++){
+ sb.append(Constants.UNDERLINE).append(taskArray[index]);
+ }
+ }
+ return sb.toString();
}
@Override
diff --git a/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
new file mode 100644
index 0000000000..ddc520a876
--- /dev/null
+++ b/escheduler-common/src/main/java/cn/escheduler/common/utils/IpUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cn.escheduler.common.utils;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ip utils
+ */
+public class IpUtils {
+
+ private static final Logger logger = LoggerFactory.getLogger(IpUtils.class);
+ public static final String DOT = ".";
+
+ /**
+ * ip str to long
+ *
+ * @param ipStr ip string
+ */
+ public static Long ipToLong(String ipStr) {
+ String[] ipSet = ipStr.split("\\" + DOT);
+
+ return Long.parseLong(ipSet[0]) << 24 | Long.parseLong(ipSet[1]) << 16 | Long.parseLong(ipSet[2]) << 8 | Long.parseLong(ipSet[3]);
+ }
+
+ /**
+ * long to ip
+ * @param ipLong the long number converted from IP
+ * @return String
+ */
+ public static String longToIp(long ipLong) {
+ long[] ipNumbers = new long[4];
+ long tmp = 0xFF;
+ ipNumbers[0] = ipLong >> 24 & tmp;
+ ipNumbers[1] = ipLong >> 16 & tmp;
+ ipNumbers[2] = ipLong >> 8 & tmp;
+ ipNumbers[3] = ipLong & tmp;
+
+ StringBuilder sb = new StringBuilder(16);
+ sb.append(ipNumbers[0]).append(DOT)
+ .append(ipNumbers[1]).append(DOT)
+ .append(ipNumbers[2]).append(DOT)
+ .append(ipNumbers[3]);
+ return sb.toString();
+ }
+
+
+
+ public static void main(String[] args){
+ long ipLong = ipToLong("11.3.4.5");
+ logger.info(longToIp(ipLong));
+ }
+}
diff --git a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
index 177669b43c..e2f064be13 100644
--- a/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
+++ b/escheduler-common/src/main/java/cn/escheduler/common/zk/AbstractZKClient.java
@@ -312,7 +312,11 @@ public abstract class AbstractZKClient {
childrenList = zkClient.getChildren().forPath(masterZNodeParentPath);
}
} catch (Exception e) {
- logger.warn(e.getMessage(),e);
+ // suppress the expected "instance must be started" noise; NOTE(review): e.getMessage() may be null — guard before calling contains()
+ if(!e.getMessage().contains("java.lang.IllegalStateException: instance must be started")){
+ logger.warn(e.getMessage(),e);
+ }
+
return childrenList.size();
}
return childrenList.size();
diff --git a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
index 7d35bc8480..4bf152bbf2 100644
--- a/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
+++ b/escheduler-common/src/test/java/cn/escheduler/common/os/OSUtilsTest.java
@@ -37,6 +37,12 @@ public class OSUtilsTest {
// static HardwareAbstractionLayer hal = si.getHardware();
+ @Test
+ public void getHost(){
+ logger.info(OSUtils.getHost());
+ }
+
+
@Test
public void memoryUsage() {
logger.info("memoryUsage : {}", OSUtils.memoryUsage());// 0.3361799418926239
diff --git a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
index 03ba29a840..72a6e46200 100644
--- a/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
+++ b/escheduler-common/src/test/java/cn/escheduler/common/queue/TaskQueueImplTest.java
@@ -17,12 +17,15 @@
package cn.escheduler.common.queue;
import cn.escheduler.common.Constants;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
@@ -34,59 +37,62 @@ public class TaskQueueImplTest {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueImplTest.class);
+ ITaskQueue tasksQueue = null;
- @Test
- public void testTaskQueue(){
+ @Before
+ public void before(){
+ tasksQueue = TaskQueueFactory.getTaskQueueInstance();
+ //clear all data
+ tasksQueue.delete();
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
+ }
+
+
+ @After
+ public void after(){
//clear all data
tasksQueue.delete();
+ }
+
+
+ @Test
+ public void testAdd(){
//add
- tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1");
- tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"2");
- tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"3");
- tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"4");
+ tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_1_1_2130706433,3232236775");
+ tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"0_1_1_1_2130706433,3232236775");
+ tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_1_0_1_2130706433,3232236775");
+ tasksQueue.add(Constants.SCHEDULER_TASKS_QUEUE,"1_2_1_1_2130706433,3232236775");
+
+ List tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
+
+ if(tasks.size() < 0){
+ return;
+ }
//pop
- String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
- assertEquals(node1,"1");
- String node2 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
- assertEquals(node2,"2");
-
- //sadd
- String task1 = "1.1.1.1-1-mr";
- String task2 = "1.1.1.2-2-mr";
- String task3 = "1.1.1.3-3-mr";
- String task4 = "1.1.1.4-4-mr";
- String task5 = "1.1.1.5-5-mr";
-
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task1);
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task2);
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task3);
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task4);
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5);
- tasksQueue.sadd(Constants.SCHEDULER_TASKS_KILL,task5); //repeat task
-
- Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),5);
- logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
- //srem
- tasksQueue.srem(Constants.SCHEDULER_TASKS_KILL,task5);
- //smembers
- Assert.assertEquals(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).size(),4);
- logger.info(Arrays.toString(tasksQueue.smembers(Constants.SCHEDULER_TASKS_KILL).toArray()));
+ String node1 = tasks.get(0);
+ assertEquals(node1,"0_0000000001_1_0000000001");
+
+ tasks = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1);
+
+ if(tasks.size() < 0){
+ return;
+ }
+
+ String node2 = tasks.get(0);
+ assertEquals(node2,"0_0000000001_1_0000000001");
}
+
+
/**
* test one million data from zookeeper queue
*/
@Test
public void extremeTest(){
- ITaskQueue tasksQueue = TaskQueueFactory.getTaskQueueInstance();
- //clear all data
- tasksQueue.delete();
int total = 30 * 10000;
for(int i = 0; i < total; i++)
@@ -99,14 +105,9 @@ public class TaskQueueImplTest {
}
}
- String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
+ String node1 = tasksQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, 1).get(0);
assertEquals(node1,"0");
- //clear all data
- tasksQueue.delete();
-
-
-
}
}
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
index 17ca727340..d393339a5e 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
@@ -25,6 +25,7 @@ import cn.escheduler.common.queue.ITaskQueue;
import cn.escheduler.common.queue.TaskQueueFactory;
import cn.escheduler.common.task.subprocess.SubProcessParameters;
import cn.escheduler.common.utils.DateUtils;
+import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.common.utils.ParameterUtils;
import cn.escheduler.dao.mapper.*;
@@ -117,7 +118,7 @@ public class ProcessDao extends AbstractBaseDao {
*/
@Override
protected void init() {
- userMapper=getMapper(UserMapper.class);
+ userMapper = getMapper(UserMapper.class);
processDefineMapper = getMapper(ProcessDefinitionMapper.class);
processInstanceMapper = getMapper(ProcessInstanceMapper.class);
dataSourceMapper = getMapper(DataSourceMapper.class);
@@ -492,7 +493,8 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson());
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
- processInstance.setWorkerGroupId(command.getWorkerGroupId());
+ int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId();
+ processInstance.setWorkerGroupId(workerGroupId);
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantId(processDefinition.getTenantId());
return processInstance;
@@ -1015,11 +1017,58 @@ public class ProcessDao extends AbstractBaseDao {
*
* 流程实例优先级_流程实例id_任务优先级_任务id high <- low
*
- * @param task
+ * @param taskInstance
* @return
*/
- private String taskZkInfo(TaskInstance task) {
- return String.valueOf(task.getProcessInstancePriority().ordinal()) + Constants.UNDERLINE + task.getProcessInstanceId() + Constants.UNDERLINE + task.getTaskInstancePriority().ordinal() + Constants.UNDERLINE + task.getId();
+ private String taskZkInfo(TaskInstance taskInstance) {
+
+ int taskWorkerGroupId = getTaskWorkerGroupId(taskInstance);
+
+ StringBuilder sb = new StringBuilder(100);
+
+ sb.append(taskInstance.getProcessInstancePriority().ordinal()).append(Constants.UNDERLINE)
+ .append(taskInstance.getProcessInstanceId()).append(Constants.UNDERLINE)
+ .append(taskInstance.getTaskInstancePriority().ordinal()).append(Constants.UNDERLINE)
+ .append(taskInstance.getId()).append(Constants.UNDERLINE);
+
+ if(taskWorkerGroupId > 0){
+ //not to find data from db
+ WorkerGroup workerGroup = queryWorkerGroupById(taskWorkerGroupId);
+ if(workerGroup == null ){
+ logger.info("task {} cannot find the worker group, use all worker instead.", taskInstance.getId());
+
+ sb.append(Constants.DEFAULT_WORKER_ID);
+ return sb.toString();
+ }
+
+ String ips = workerGroup.getIpList();
+
+ if(StringUtils.isBlank(ips)){
+ logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
+ taskInstance.getId(), workerGroup.getId());
+ sb.append(Constants.DEFAULT_WORKER_ID);
+ return sb.toString();
+ }
+
+ StringBuilder ipSb = new StringBuilder(100);
+ String[] ipArray = ips.split(COMMA);
+
+ for (String ip : ipArray) {
+ long ipLong = IpUtils.ipToLong(ip);
+ ipSb.append(ipLong).append(COMMA);
+ }
+
+ if(ipSb.length() > 0) {
+ ipSb.deleteCharAt(ipSb.length() - 1);
+ }
+
+ sb.append(ipSb);
+ }else{
+ sb.append(Constants.DEFAULT_WORKER_ID);
+ }
+
+
+ return sb.toString();
}
/**
@@ -1683,5 +1732,24 @@ public class ProcessDao extends AbstractBaseDao {
}
+ /**
+ * get task worker group id
+ *
+ * @param taskInstance
+ * @return
+ */
+ public int getTaskWorkerGroupId(TaskInstance taskInstance) {
+ int taskWorkerGroupId = taskInstance.getWorkerGroupId();
+ ProcessInstance processInstance = findProcessInstanceByTaskId(taskInstance.getId());
+ if(processInstance == null){
+ logger.error("cannot find the task:{} process instance", taskInstance.getId());
+ return Constants.DEFAULT_WORKER_ID;
+ }
+ int processWorkerGroupId = processInstance.getWorkerGroupId();
+
+ taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
+ return taskWorkerGroupId;
+ }
+
}
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java
index 9d2ab80f21..c57d15128d 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java
@@ -274,5 +274,21 @@ public interface ResourceMapper {
@SelectProvider(type = ResourceMapperProvider.class, method = "queryTenantCodeByResourceName")
String queryTenantCodeByResourceName(@Param("resName") String resName);
-
+ /**
+ * list all resources of the given type (no per-user filtering — the provider's WHERE clause only checks type)
+ * @param type resource type ordinal
+ * @return
+ */
+ @Results(value = {@Result(property = "id", column = "id", id = true, javaType = int.class, jdbcType = JdbcType.INTEGER),
+ @Result(property = "alias", column = "alias", javaType = String.class, jdbcType = JdbcType.VARCHAR),
+ @Result(property = "fileName", column = "file_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
+ @Result(property = "desc", column = "desc", javaType = String.class, jdbcType = JdbcType.VARCHAR),
+ @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER),
+ @Result(property = "type", column = "type", typeHandler = EnumOrdinalTypeHandler.class, javaType = ResourceType.class, jdbcType = JdbcType.TINYINT),
+ @Result(property = "size", column = "size", javaType = Long.class, jdbcType = JdbcType.BIGINT),
+ @Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
+ @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE)
+ })
+ @SelectProvider(type = ResourceMapperProvider.class, method = "listAllResourceByType")
+ List listAllResourceByType(@Param("type") Integer type);
}
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java
index 4314b8f584..a943bb6ba4 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java
@@ -295,4 +295,19 @@ public class ResourceMapperProvider {
WHERE("type = #{type} and user_id = #{userId}");
}}.toString();
}
+
+ /**
+ * list all resource by type
+ *
+ * @param parameter
+ * @return
+ */
+ public String listAllResourceByType(Map parameter) {
+ return new SQL() {{
+ SELECT("*");
+ FROM(TABLE_NAME);
+ WHERE("type = #{type}");
+ }}.toString();
+ }
+
}
diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
index 24902c0121..5c9418ca72 100644
--- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
+++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
@@ -194,6 +194,21 @@ public class ProcessInstance {
*/
private int tenantId;
+ /**
+ * worker group name. for api.
+ */
+ private String workerGroupName;
+
+ /**
+ * receivers for api
+ */
+ private String receivers;
+
+ /**
+ * receivers cc for api
+ */
+ private String receiversCc;
+
public ProcessInstance(){
}
@@ -560,4 +575,28 @@ public class ProcessInstance {
public int getTenantId() {
return this.tenantId ;
}
+
+ public String getWorkerGroupName() {
+ return workerGroupName;
+ }
+
+ public void setWorkerGroupName(String workerGroupName) {
+ this.workerGroupName = workerGroupName;
+ }
+
+ public String getReceivers() {
+ return receivers;
+ }
+
+ public void setReceivers(String receivers) {
+ this.receivers = receivers;
+ }
+
+ public String getReceiversCc() {
+ return receiversCc;
+ }
+
+ public void setReceiversCc(String receiversCc) {
+ this.receiversCc = receiversCc;
+ }
}
diff --git a/escheduler-server/pom.xml b/escheduler-server/pom.xml
index ad21578d6c..6341539e4c 100644
--- a/escheduler-server/pom.xml
+++ b/escheduler-server/pom.xml
@@ -89,7 +89,7 @@
escheduler-alert
-
+
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
index e137824814..bf0dcbfe75 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/master/MasterServer.java
@@ -216,7 +216,7 @@ public class MasterServer implements CommandLineRunner, IStoppable {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
- logger.error("master send heartbeat to zk failed");
+ logger.error("master send heartbeat to zk failed: can't find zookeeper regist path of master server");
return;
}
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
index 1c6232bc9a..2d88fdb843 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
@@ -25,8 +25,9 @@ import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
-import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,15 +99,7 @@ public class FetchTaskThread implements Runnable{
*/
private boolean checkWorkerGroup(TaskInstance taskInstance, String host){
- int taskWorkerGroupId = taskInstance.getWorkerGroupId();
- ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
- if(processInstance == null){
- logger.error("cannot find the task:{} process instance", taskInstance.getId());
- return false;
- }
- int processWorkerGroupId = processInstance.getWorkerGroupId();
-
- taskWorkerGroupId = (taskWorkerGroupId <= 0 ? processWorkerGroupId : taskWorkerGroupId);
+ int taskWorkerGroupId = processDao.getTaskWorkerGroupId(taskInstance);
if(taskWorkerGroupId <= 0){
return true;
@@ -117,99 +110,103 @@ public class FetchTaskThread implements Runnable{
return true;
}
String ips = workerGroup.getIpList();
- if(ips == null){
+ if(StringUtils.isBlank(ips)){
logger.error("task:{} worker group:{} parameters(ip_list) is null, this task would be running on all workers",
taskInstance.getId(), workerGroup.getId());
}
- String[] ipArray = ips.split(",");
+ String[] ipArray = ips.split(Constants.COMMA);
List ipList = Arrays.asList(ipArray);
return ipList.contains(host);
}
+
+
@Override
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
- if(OSUtils.checkResource(this.conf, false)) {
- // creating distributed locks, lock path /escheduler/lock/worker
- String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
- mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
- mutex.acquire();
+ ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
- ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
+ //check memory and cpu usage and threads
+ if(OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor)) {
- for (int i = 0; i < taskNum; i++) {
-
- int activeCount = poolExecutor.getActiveCount();
- if (activeCount >= workerExecNums) {
- logger.info("thread insufficient , activeCount : {} , workerExecNums : {}",activeCount,workerExecNums);
- continue;
- }
+ //whether have tasks, if no tasks , no need lock //get all tasks
+ List tasksQueueList = taskQueue.getAllTasks(Constants.SCHEDULER_TASKS_QUEUE);
+ if(tasksQueueList.size() > 0){
+ // creating distributed locks, lock path /escheduler/lock/worker
+ String zNodeLockPath = zkWorkerClient.getWorkerLockPath();
+ mutex = new InterProcessMutex(zkWorkerClient.getZkClient(), zNodeLockPath);
+ mutex.acquire();
// task instance id str
- String taskQueueStr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, false);
+ List taskQueueStrArr = taskQueue.poll(Constants.SCHEDULER_TASKS_QUEUE, taskNum);
- if (!StringUtils.isEmpty(taskQueueStr )) {
+ for(String taskQueueStr : taskQueueStrArr){
+ if (StringUtils.isNotBlank(taskQueueStr )) {
- String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
- String taskInstIdStr = taskStringArray[taskStringArray.length - 1];
- Date now = new Date();
- Integer taskId = Integer.parseInt(taskInstIdStr);
+ if (!checkThreadCount(poolExecutor)) {
+ break;
+ }
- // find task instance by task id
- TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
+ String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE);
+ String taskInstIdStr = taskStringArray[3];
+ Date now = new Date();
+ Integer taskId = Integer.parseInt(taskInstIdStr);
- logger.info("worker fetch taskId : {} from queue ", taskId);
+ // find task instance by task id
+ TaskInstance taskInstance = processDao.findTaskInstanceById(taskId);
- int retryTimes = 30;
- // mainly to wait for the master insert task to succeed
- while (taskInstance == null && retryTimes > 0) {
- Thread.sleep(Constants.SLEEP_TIME_MILLIS);
- taskInstance = processDao.findTaskInstanceById(taskId);
- retryTimes--;
- }
+ logger.info("worker fetch taskId : {} from queue ", taskId);
- if (taskInstance == null ) {
- logger.error("task instance is null. task id : {} ", taskId);
- continue;
- }
- if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
- continue;
- }
- taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
- logger.info("remove task:{} from queue", taskQueueStr);
+ int retryTimes = 30;
+ // mainly to wait for the master insert task to succeed
+ while (taskInstance == null && retryTimes > 0) {
+ Thread.sleep(Constants.SLEEP_TIME_MILLIS);
+ taskInstance = processDao.findTaskInstanceById(taskId);
+ retryTimes--;
+ }
+
+ if (taskInstance == null ) {
+ logger.error("task instance is null. task id : {} ", taskId);
+ continue;
+ }
- // set execute task worker host
- taskInstance.setHost(OSUtils.getHost());
- taskInstance.setStartTime(now);
+ if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
+ continue;
+ }
+ taskQueue.removeNode(Constants.SCHEDULER_TASKS_QUEUE, taskQueueStr);
+ logger.info("remove task:{} from queue", taskQueueStr);
+ // set execute task worker host
+ taskInstance.setHost(OSUtils.getHost());
+ taskInstance.setStartTime(now);
- // get process instance
- ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
+ // get process instance
+ ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
- // get process define
- ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
+ // get process define
+ ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
- taskInstance.setProcessInstance(processInstance);
- taskInstance.setProcessDefine(processDefine);
+ taskInstance.setProcessInstance(processInstance);
+ taskInstance.setProcessDefine(processDefine);
- // get local execute path
- String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
- processDefine.getId(),
- processInstance.getId(),
- taskInstance.getId());
- logger.info("task instance local execute path : {} ", execLocalPath);
+ // get local execute path
+ String execLocalPath = FileUtils.getProcessExecDir(processDefine.getProjectId(),
+ processDefine.getId(),
+ processInstance.getId(),
+ taskInstance.getId());
+ logger.info("task instance local execute path : {} ", execLocalPath);
- // set task execute path
- taskInstance.setExecutePath(execLocalPath);
+ // set task execute path
+ taskInstance.setExecutePath(execLocalPath);
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
@@ -218,21 +215,22 @@ public class FetchTaskThread implements Runnable{
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
tenant.getTenantCode(), logger);
- logger.info("task : {} ready to submit to task scheduler thread",taskId);
- // submit task
- workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+ logger.info("task : {} ready to submit to task scheduler thread",taskId);
+ // submit task
+ workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
+ }
}
}
+
}
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch (Exception e){
logger.error("fetch task thread exception : " + e.getMessage(),e);
- }
- finally {
+ }finally {
if (mutex != null){
try {
mutex.release();
@@ -247,4 +245,18 @@ public class FetchTaskThread implements Runnable{
}
}
}
+
+ /**
+ * check whether the worker thread pool still has capacity for more tasks
+ * @param poolExecutor the worker task executor
+ * @return true if the active thread count is below workerExecNums
+ */
+ private boolean checkThreadCount(ThreadPoolExecutor poolExecutor) {
+ int activeCount = poolExecutor.getActiveCount();
+ if (activeCount >= workerExecNums) {
+ logger.info("thread insufficient , activeCount : {} , workerExecNums : {}, will sleep : {} millis for thread resource", activeCount, workerExecNums, Constants.SLEEP_TIME_MILLIS);
+ return false;
+ }
+ return true;
+ }
}
\ No newline at end of file
diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
index 26d682f132..09f6467aad 100644
--- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
+++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
@@ -387,7 +387,7 @@ public class SqlTask extends AbstractTask {
String showTypeName = sqlParameters.getShowType().replace(Constants.COMMA,"").trim();
if(EnumUtils.isValidEnum(ShowType.class,showTypeName)){
Map mailResult = MailUtils.sendMails(receviersList, receviersCcList, title, content, ShowType.valueOf(showTypeName));
- if(!(Boolean) mailResult.get(Constants.STATUS)){
+ if(!(Boolean) mailResult.get(cn.escheduler.common.Constants.STATUS)){
throw new RuntimeException("send mail failed!");
}
}else{
diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
index 2c0422c701..b2062b7769 100644
--- a/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
+++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
@@ -28,6 +28,17 @@
@click="_toggleView"
icon="fa fa-code">
+
+
{{name}}
@@ -383,6 +394,13 @@
_toggleView () {
findComponentDownward(this.$root, `assist-dag-index`)._toggleView()
},
+
+ /**
+ * Toggle visibility of the starting-parameters panel
+ */
+ _toggleParam () {
+ findComponentDownward(this.$root, `starting-params-dag-index`)._toggleParam()
+ },
/**
* Create a node popup layer
* @param Object id
diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
new file mode 100644
index 0000000000..b1494e063e
--- /dev/null
+++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue
@@ -0,0 +1,114 @@
+
+
+
+
+
{{$t('Startup parameter')}}
+
+ - {{$t('Startup type')}}:{{_rtRunningType(startupParam.commandType)}}
+ - {{$t('Complement range')}}:{{startupParam.commandParam.complementStartDate}}-{{startupParam.commandParam.complementEndDate}}-
+ - {{$t('Failure Strategy')}}:{{startupParam.failureStrategy === 'END' ? $t('End') : $t('Continue')}}
+ - {{$t('Process priority')}}:{{startupParam.processInstancePriority}}
+ - {{$t('Worker group')}}:{{_rtWorkerGroupName(startupParam.workerGroupId)}}
+ - {{$t('Notification strategy')}}:{{_rtWarningType(startupParam.warningType)}}
+ - {{$t('Notification group')}}:{{_rtNotifyGroupName(startupParam.warningGroupId)}}
+ - {{$t('Recipient')}}:{{startupParam.receivers || '-'}}
+ - {{$t('Cc')}}:{{startupParam.receiversCc || '-'}}
+
+
+
+
+
+
+
diff --git a/escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue b/escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
index cbe220ad36..705a151b78 100644
--- a/escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
+++ b/escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
@@ -1,6 +1,7 @@
+
@@ -10,6 +11,7 @@
import { mapActions, mapMutations } from 'vuex'
import mSpin from '@/module/components/spin/spin'
import mVariable from './_source/variable'
+ import mStartingParam from './_source/startingParam'
import Affirm from './_source/jumpAffirm'
import disabledState from '@/module/mixin/disabledState'
@@ -91,6 +93,6 @@
},
mounted () {
},
- components: { mDag, mSpin, mVariable }
+ components: { mDag, mSpin, mVariable, mStartingParam }
}
diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
index 22d48782d2..914bab2812 100644
--- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
+++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/master.vue
@@ -6,7 +6,7 @@
IP: {{item.host}}
- {{$t('Port')}}: {{item.port}}
+ {{$t('Process Pid')}}: {{item.port}}
{{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -93,4 +93,4 @@
\ No newline at end of file
+
diff --git a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
index 3cf0993415..960beeb14a 100644
--- a/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
+++ b/escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue
@@ -6,7 +6,7 @@
IP: {{item.host}}
- {{$t('Port')}}: {{item.port}}
+ {{$t('Process Pid')}}: {{item.port}}
{{$t('Zk registration directory')}}: {{item.zkDirectory}}
@@ -94,4 +94,4 @@
\ No newline at end of file
+
diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
index 46cfaa0c86..a61f0634d9 100644
--- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
+++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue
@@ -21,7 +21,7 @@
-
执行时间
+
执行时间
{{$t('Timing')}}
@@ -136,7 +136,7 @@
{{$t('Cancel')}}
- {{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}}
+ {{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}}
diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
index db6c8aa261..2259dea9cd 100644
--- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
+++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/util.js
@@ -37,7 +37,7 @@ let warningTypeList = [
]
const isEmial = (val) => {
- let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
+ let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
return regEmail.test(val)
}
diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
index 2bc1cad066..619407a61a 100644
--- a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
+++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue
@@ -95,17 +95,17 @@
shape="circle"
size="xsmall"
data-toggle="tooltip"
- :title="$t('Stop')"
- @click="_stop(item)"
- icon="iconfont icon-zanting1"
- :disabled="item.state !== 'RUNNING_EXEUTION'">
+ :title="item.state === 'STOP' ? $t('Recovery Suspend') : $t('Stop')"
+ @click="_stop(item,$index)"
+ :icon="item.state === 'STOP' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'"
+ :disabled="item.state !== 'RUNNING_EXEUTION' && item.state != 'STOP'">
- {{item.count}}s
+ {{item.count}}
- {{item.count}}s
+ {{item.count}}
-
-
+
+
+
+
+
+
+
- {{item.count}}s
+ {{item.count}}
+
+
+
@@ -362,11 +371,20 @@
* stop
* @param STOP
*/
- _stop (item) {
- this._upExecutorsState({
- processInstanceId: item.id,
- executeType: 'STOP'
- })
+ _stop (item, index) {
+ if(item.state == 'STOP') {
+ this._countDownFn({
+ id: item.id,
+ executeType: 'RECOVER_SUSPENDED_PROCESS',
+ index: index,
+ buttonType: 'suspend'
+ })
+ } else {
+ this._upExecutorsState({
+ processInstanceId: item.id,
+ executeType: 'STOP'
+ })
+ }
},
/**
* pause
@@ -383,7 +401,7 @@
} else {
this._upExecutorsState({
processInstanceId: item.id,
- executeType: item.state === 'PAUSE' ? 'RECOVER_SUSPENDED_PROCESS' : 'PAUSE'
+ executeType: 'PAUSE'
})
}
},
@@ -435,7 +453,7 @@
if (data.length) {
_.map(data, v => {
v.disabled = true
- v.count = 10
+ v.count = 9
})
}
return data
diff --git a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
index b02db7848e..574a995ec4 100644
--- a/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
+++ b/escheduler-ui/src/js/conf/home/pages/security/pages/users/_source/createUser.vue
@@ -131,7 +131,7 @@
}
},
_verification () {
- let regEmail = /^([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\_|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
+ let regEmail = /^([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+@([a-zA-Z0-9]+[_|\-|\.]?)*[a-zA-Z0-9]+\.[a-zA-Z]{2,3}$/ // eslint-disable-line
// Mobile phone number regular
let regPhone = /^1(3|4|5|6|7|8)\d{9}$/; // eslint-disable-line
diff --git a/escheduler-ui/src/js/conf/home/store/dag/actions.js b/escheduler-ui/src/js/conf/home/store/dag/actions.js
index 4426cfb719..c93505eead 100644
--- a/escheduler-ui/src/js/conf/home/store/dag/actions.js
+++ b/escheduler-ui/src/js/conf/home/store/dag/actions.js
@@ -149,6 +149,10 @@ export default {
state.tenantId = processInstanceJson.tenantId
+ //startup parameters
+ state.startup = _.assign(state.startup, _.pick(res.data, ['commandType', 'failureStrategy', 'processInstancePriority', 'workerGroupId', 'warningType', 'warningGroupId', 'receivers', 'receiversCc']))
+ state.startup.commandParam = JSON.parse(res.data.commandParam)
+
resolve(res.data)
}).catch(res => {
reject(res)
diff --git a/escheduler-ui/src/js/conf/home/store/dag/state.js b/escheduler-ui/src/js/conf/home/store/dag/state.js
index 37df7b8b86..c875500f5f 100644
--- a/escheduler-ui/src/js/conf/home/store/dag/state.js
+++ b/escheduler-ui/src/js/conf/home/store/dag/state.js
@@ -92,5 +92,8 @@ export default {
// Process instance list{ view a single record }
instanceListS: [],
// Operating state
- isDetails: false
+ isDetails: false,
+ startup: {
+
+ }
}
diff --git a/escheduler-ui/src/js/module/i18n/locale/en_US.js b/escheduler-ui/src/js/module/i18n/locale/en_US.js
index 0a9e1126ec..a56eaec46b 100644
--- a/escheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/escheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -466,7 +466,9 @@ export default {
'Statistics manage': 'Statistics manage',
'statistics': 'statistics',
'select tenant':'select tenant',
- 'Process Instance Running Count': 'Process Instance Running Count',
'Please enter Principal':'Please enter Principal',
- 'The start time must not be the same as the end': 'The start time must not be the same as the end'
+ 'The start time must not be the same as the end': 'The start time must not be the same as the end',
+ 'Startup parameter': 'Startup parameter',
+ 'Startup type': 'Startup type',
+ 'Complement range': 'Complement range'
}
diff --git a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js
index 0acb1975a4..1186a353ac 100644
--- a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -237,7 +237,7 @@ export default {
'Recovery Failed': '恢复失败',
'Stop': '停止',
'Pause': '暂停',
- 'Recovery Suspend': '恢复暂停',
+ 'Recovery Suspend': '恢复运行',
'Gantt': '甘特图',
'Name': '名称',
'Node Type': '节点类型',
@@ -282,7 +282,7 @@ export default {
'Start Process': '启动工作流',
'Execute from the current node': '从当前节点开始执行',
'Recover tolerance fault process': '恢复被容错的工作流',
- 'Resume the suspension process': '恢复暂停流程',
+ 'Resume the suspension process': '恢复运行流程',
'Execute from the failed nodes': '从失败节点开始执行',
'Complement Data': '补数',
'Scheduling execution': '调度执行',
@@ -468,5 +468,8 @@ export default {
'statistics': '统计',
'select tenant':'选择租户',
'Please enter Principal':'请输入Principal',
- 'The start time must not be the same as the end': '开始时间和结束时间不能相同'
+ 'The start time must not be the same as the end': '开始时间和结束时间不能相同',
+ 'Startup parameter': '启动参数',
+ 'Startup type': '启动类型',
+ 'Complement range': '补数范围'
}
diff --git a/install.sh b/install.sh
index a80c3198a9..d58be482fd 100644
--- a/install.sh
+++ b/install.sh
@@ -106,6 +106,18 @@ sslEnable="true"
# 下载Excel路径
xlsFilePath="/tmp/xls"
+# 企业微信企业ID配置
+enterpriseWechatCorpId="xxxxxxxxxx"
+
+# 企业微信应用Secret配置
+enterpriseWechatSecret="xxxxxxxxxx"
+
+# 企业微信应用AgentId配置
+enterpriseWechatAgentId="xxxxxxxxxx"
+
+# 企业微信用户配置,多个用户以,分割
+enterpriseWechatUsers="xxxxx,xxxxx"
+
#是否启动监控自启动脚本
monitorServerState="false"
@@ -318,7 +330,7 @@ sed -i ${txt} "s#master.exec.task.number.*#master.exec.task.number=${masterExecT
sed -i ${txt} "s#master.heartbeat.interval.*#master.heartbeat.interval=${masterHeartbeatInterval}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.retryTimes.*#master.task.commit.retryTimes=${masterTaskCommitRetryTimes}#g" conf/master.properties
sed -i ${txt} "s#master.task.commit.interval.*#master.task.commit.interval=${masterTaskCommitInterval}#g" conf/master.properties
-sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
+#sed -i ${txt} "s#master.max.cpuload.avg.*#master.max.cpuload.avg=${masterMaxCpuLoadAvg}#g" conf/master.properties
sed -i ${txt} "s#master.reserved.memory.*#master.reserved.memory=${masterReservedMemory}#g" conf/master.properties
@@ -345,6 +357,10 @@ sed -i ${txt} "s#mail.passwd.*#mail.passwd=${mailPassword}#g" conf/alert.propert
sed -i ${txt} "s#mail.smtp.starttls.enable.*#mail.smtp.starttls.enable=${starttlsEnable}#g" conf/alert.properties
sed -i ${txt} "s#mail.smtp.ssl.enable.*#mail.smtp.ssl.enable=${sslEnable}#g" conf/alert.properties
sed -i ${txt} "s#xls.file.path.*#xls.file.path=${xlsFilePath}#g" conf/alert.properties
+sed -i ${txt} "s#enterprise.wechat.corp.id.*#enterprise.wechat.corp.id=${enterpriseWechatCorpId}#g" conf/alert.properties
+sed -i ${txt} "s#enterprise.wechat.secret.*#enterprise.wechat.secret=${enterpriseWechatSecret}#g" conf/alert.properties
+sed -i ${txt} "s#enterprise.wechat.agent.id.*#enterprise.wechat.agent.id=${enterpriseWechatAgentId}#g" conf/alert.properties
+sed -i ${txt} "s#enterprise.wechat.users.*#enterprise.wechat.users=${enterpriseWechatUsers}#g" conf/alert.properties
sed -i ${txt} "s#installPath.*#installPath=${installPath}#g" conf/config/install_config.conf
diff --git a/sql/soft_version b/sql/soft_version
index a6a3a43c3a..1cc5f657e0 100644
--- a/sql/soft_version
+++ b/sql/soft_version
@@ -1 +1 @@
-1.0.4
\ No newline at end of file
+1.1.0
\ No newline at end of file