From 2a1283437c2e01e4b33a924d0afb5c76c5eb844f Mon Sep 17 00:00:00 2001 From: lenboo Date: Wed, 3 Jul 2019 15:45:40 +0800 Subject: [PATCH 01/12] add recovery from stop --- .../api/service/ExecutorService.java | 12 ++++++++- .../api/service/ProcessInstanceService.java | 14 ++++++++--- .../escheduler/dao/model/ProcessInstance.java | 14 +++++++++++ .../instance/pages/list/_source/list.vue | 25 +++++++++++++------ .../src/js/module/i18n/locale/en_US.js | 1 + .../src/js/module/i18n/locale/zh_CN.js | 1 + 6 files changed, 54 insertions(+), 13 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java index 740fbc961c..9602ac6cef 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java @@ -191,6 +191,16 @@ public class ExecutorService extends BaseService{ return checkResult; } + // checkTenantExists(); + Tenant tenant = processDao.getTenantForProcess(processDefinition.getTenantId(), + processDefinition.getUserId()); + if(tenant == null){ + logger.error("there is not any vaild tenant for the process definition: id:{},name:{}, ", + processDefinition.getId(), processDefinition.getName()); + putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processInstanceId); + return result; + } + switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getId(), CommandType.REPEAT_RUNNING); @@ -260,7 +270,7 @@ public class ExecutorService extends BaseService{ } break; case RECOVER_SUSPENDED_PROCESS: - if (executionStatus.typeIsPause()) { + if (executionStatus.typeIsPause()|| executionStatus.typeIsCancel()) { checkResult = true; } default: diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 3cbf5f1414..88ff3f1019 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -38,10 +38,7 @@ import cn.escheduler.common.utils.JSONUtils; import cn.escheduler.common.utils.ParameterUtils; import cn.escheduler.common.utils.placeholder.BusinessTimeUtils; import cn.escheduler.dao.ProcessDao; -import cn.escheduler.dao.mapper.ProcessDefinitionMapper; -import cn.escheduler.dao.mapper.ProcessInstanceMapper; -import cn.escheduler.dao.mapper.ProjectMapper; -import cn.escheduler.dao.mapper.TaskInstanceMapper; +import cn.escheduler.dao.mapper.*; import cn.escheduler.dao.model.*; import com.alibaba.fastjson.JSON; import org.apache.commons.lang3.StringUtils; @@ -97,6 +94,9 @@ public class ProcessInstanceService extends BaseDAGService { @Autowired LoggerService loggerService; + @Autowired + WorkerGroupMapper workerGroupMapper; + /** * query process instance by id * @@ -115,6 +115,12 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); + if(processInstance.getWorkerGroupId() == -1){ + processInstance.setWorkerGroupName("Default"); + }else{ + WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); + processInstance.setWorkerGroupName(workerGroup.getName()); + } result.put(Constants.DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 24902c0121..158bf3f847 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -194,6 +194,11 @@ public class ProcessInstance { */ private int tenantId; + /** + * worker group name. for api. + */ + private String workerGroupName; + public ProcessInstance(){ } @@ -549,6 +554,7 @@ public class ProcessInstance { ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + ", duration=" + duration + ", timeout=" + timeout + + ", workerGroupName=" + workerGroupName + ", processInstancePriority=" + processInstancePriority + '}'; } @@ -560,4 +566,12 @@ public class ProcessInstance { public int getTenantId() { return this.tenantId ; } + + public String getWorkerGroupName() { + return workerGroupName; + } + + public void setWorkerGroupName(String workerGroupName) { + this.workerGroupName = workerGroupName; + } } diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index 2bc1cad066..b867c1751b 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -95,10 +95,10 @@ shape="circle" size="xsmall" data-toggle="tooltip" - :title="$t('Stop')" - @click="_stop(item)" + :title="item.state === 'STOP' ? $t('Recovery Stop') : $t('Stop')" + @click="_stop(item,$index)" icon="iconfont icon-zanting1" - :disabled="item.state !== 'RUNNING_EXEUTION'"> + :disabled="item.state !== 'RUNNING_EXEUTION' && item.state != 'STOP'"> Date: Thu, 4 Jul 2019 11:33:29 +0800 Subject: [PATCH 02/12] change stop and pause at ui add receivers and cc in process instance for api --- .../api/service/ProcessInstanceService.java | 3 ++ .../escheduler/dao/model/ProcessInstance.java | 27 +++++++++++- .../instance/pages/list/_source/list.vue | 43 +++++++++++-------- .../src/js/module/i18n/locale/en_US.js | 1 - .../src/js/module/i18n/locale/zh_CN.js | 5 +-- 5 files changed, 57 insertions(+), 22 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 88ff3f1019..cf2e5e494b 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -121,6 +121,9 @@ public class ProcessInstanceService extends BaseDAGService { WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); processInstance.setWorkerGroupName(workerGroup.getName()); } + ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); + processInstance.setReceivers(processDefinition.getReceivers()); + processInstance.setReceiversCc(processDefinition.getReceiversCc()); result.put(Constants.DATA_LIST, processInstance); putMsg(result, Status.SUCCESS); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java index 158bf3f847..5c9418ca72 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java @@ -199,6 +199,16 @@ public class ProcessInstance { */ private String workerGroupName; + /** + * receivers for api + */ + private String receivers; + + /** + * receivers cc for api + */ + private String receiversCc; + public ProcessInstance(){ } @@ -554,7 +564,6 @@ public class ProcessInstance { ", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' + ", duration=" + duration + ", timeout=" + timeout + - ", workerGroupName=" + workerGroupName + ", processInstancePriority=" + processInstancePriority + '}'; } @@ -574,4 +583,20 @@ public class ProcessInstance { public void setWorkerGroupName(String workerGroupName) { this.workerGroupName = workerGroupName; } + + public String getReceivers() { + return receivers; + } + + public void setReceivers(String receivers) { + this.receivers = receivers; + } + + public String getReceiversCc() { + return receiversCc; + } + + public void setReceiversCc(String receiversCc) { + this.receiversCc = receiversCc; + } } diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index b867c1751b..619407a61a 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -95,9 +95,9 @@ shape="circle" size="xsmall" data-toggle="tooltip" - :title="item.state === 'STOP' ? $t('Recovery Stop') : $t('Stop')" + :title="item.state === 'STOP' ? $t('Recovery Suspend') : $t('Stop')" @click="_stop(item,$index)" - icon="iconfont icon-zanting1" + :icon="item.state === 'STOP' ? 'iconfont icon-ai06' : 'iconfont icon-zanting'" :disabled="item.state !== 'RUNNING_EXEUTION' && item.state != 'STOP'"> - {{item.count}}s + {{item.count}} - {{item.count}}s + {{item.count}} - - + + + + + + + - {{item.count}}s + {{item.count}} + + + @@ -392,7 +401,7 @@ } else { this._upExecutorsState({ processInstanceId: item.id, - executeType: item.state === 'PAUSE' ? 'RECOVER_SUSPENDED_PROCESS' : 'PAUSE' + executeType: 'PAUSE' }) } }, @@ -444,7 +453,7 @@ if (data.length) { _.map(data, v => { v.disabled = true - v.count = 10 + v.count = 9 }) } return data diff --git a/escheduler-ui/src/js/module/i18n/locale/en_US.js b/escheduler-ui/src/js/module/i18n/locale/en_US.js index 8933f10c4a..57526d5b8d 100644 --- a/escheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/escheduler-ui/src/js/module/i18n/locale/en_US.js @@ -238,7 +238,6 @@ export default { 'Stop': 'Stop', 'Pause': 'Pause', 'Recovery Suspend': 'Recovery Suspend', - 'Recovery Stop': 'Recovery Stop', 'Gantt': 'Gantt', 'Name': 'Name', 'Node Type': 'Node Type', diff --git a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js index 4c9c3a1cbe..9353ea8866 100644 --- a/escheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/escheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -237,8 +237,7 @@ export default { 'Recovery Failed': '恢复失败', 'Stop': '停止', 'Pause': '暂停', - 'Recovery Suspend': '恢复暂停', - 'Recovery Stop': '恢复停止', + 'Recovery Suspend': '恢复运行', 'Gantt': '甘特图', 'Name': '名称', 'Node Type': '节点类型', @@ -283,7 +282,7 @@ export default { 'Start Process': '启动工作流', 'Execute from the current node': '从当前节点开始执行', 'Recover tolerance fault process': '恢复被容错的工作流', - 'Resume the suspension process': '恢复暂停流程', + 'Resume the suspension process': '恢复运行流程', 'Execute from the failed nodes': '从失败节点开始执行', 'Complement Data': '补数', 'Scheduling execution': '调度执行', From 65015f98d403f8deba0823819b71dd3386fbec70 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 4 Jul 2019 15:43:16 +0800 Subject: [PATCH 03/12] update --- .../java/cn/escheduler/api/service/ProcessInstanceService.java | 2 +- .../src/main/java/cn/escheduler/common/Constants.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index cf2e5e494b..9ba29e0555 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -116,7 +116,7 @@ public class ProcessInstanceService extends BaseDAGService { } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); if(processInstance.getWorkerGroupId() == -1){ - processInstance.setWorkerGroupName("Default"); + processInstance.setWorkerGroupName(DEFAULT); }else{ WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); processInstance.setWorkerGroupName(workerGroup.getName()); diff --git a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java index 62332cbb75..de63843bfb 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/Constants.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/Constants.java @@ -483,6 +483,8 @@ public final class Constants { public static final String TASK_RECORD_PWD = "task.record.datasource.password"; + public static final String DEFAULT = "Default"; + public static String TASK_RECORD_TABLE_HIVE_LOG = "eamp_hive_log_hd"; public static String TASK_RECORD_TABLE_HISTORY_HIVE_LOG = "eamp_hive_hist_log_hd"; From edd65e74e15bb1deb56f0795b695a2042dc41003 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Thu, 4 Jul 2019 19:43:00 +0800 Subject: [PATCH 04/12] Added the ability for administrators to view all resource by type --- .../api/service/ResourcesService.java | 7 ++++++- .../escheduler/dao/mapper/ResourceMapper.java | 18 +++++++++++++++++- .../dao/mapper/ResourceMapperProvider.java | 15 +++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java index 86fb322b1a..651d9603f4 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ResourcesService.java @@ -369,7 +369,12 @@ public class ResourcesService extends BaseService { public Map queryResourceList(User loginUser, ResourceType type) { Map result = new HashMap<>(5); - List resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); + List resourceList; + if(isAdmin(loginUser)){ + resourceList = resourcesMapper.listAllResourceByType(type.ordinal()); + }else{ + resourceList = resourcesMapper.queryResourceListAuthored(loginUser.getId(), type.ordinal()); + } result.put(Constants.DATA_LIST, resourceList); putMsg(result,Status.SUCCESS); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java index 9d2ab80f21..c57d15128d 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapper.java @@ -274,5 +274,21 @@ public interface ResourceMapper { @SelectProvider(type = ResourceMapperProvider.class, method = "queryTenantCodeByResourceName") String queryTenantCodeByResourceName(@Param("resName") String resName); - + /** + * query resource list that the appointed user has permission + * @param type + * @return + */ + @Results(value = {@Result(property = "id", column = "id", id = true, javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "alias", column = "alias", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "fileName", column = "file_name", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "desc", column = "desc", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "userId", column = "user_id", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "type", column = "type", typeHandler = EnumOrdinalTypeHandler.class, javaType = ResourceType.class, jdbcType = JdbcType.TINYINT), + @Result(property = "size", column = "size", javaType = Long.class, jdbcType = JdbcType.BIGINT), + @Result(property = "createTime", column = "create_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE), + @Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE) + }) + @SelectProvider(type = ResourceMapperProvider.class, method = "listAllResourceByType") + List listAllResourceByType(@Param("type") Integer type); } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java index 4314b8f584..a943bb6ba4 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ResourceMapperProvider.java @@ -295,4 +295,19 @@ public class ResourceMapperProvider { WHERE("type = #{type} and user_id = #{userId}"); }}.toString(); } + + /** + * list all resource by type + * + * @param parameter + * @return + */ + public String listAllResourceByType(Map parameter) { + return new SQL() {{ + SELECT("*"); + FROM(TABLE_NAME); + WHERE("type = #{type}"); + }}.toString(); + } + } From 570eeb030a48b3258f015c20824a8ef5ab4ef596 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 4 Jul 2019 20:30:39 +0800 Subject: [PATCH 05/12] update task queue format --- .../java/cn/escheduler/common/queue/TaskQueueZkImpl.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 88915b97cc..2c2e3ef4d9 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -210,7 +210,12 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { while(iterator.hasNext()){ if(j++ < tasksNum){ String task = iterator.next(); - taskslist.add(task); + String[] taskArray = task.split(Constants.UNDERLINE); + int processInstanceId = Integer.parseInt(taskArray[1]); + int taskId = Integer.parseInt(taskArray[3]); + String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE + + taskArray[2] + Constants.UNDERLINE + taskId; + taskslist.add(destTask); } } return taskslist; From cd2ad0c37975c881c52e1740d98a0c12b1075b83 Mon Sep 17 00:00:00 2001 From: lenboo Date: Thu, 4 Jul 2019 20:44:14 +0800 Subject: [PATCH 06/12] update pull task --- .../main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index 2c2e3ef4d9..e116cd3271 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -166,9 +166,8 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskHosts = taskDetailArrs[4]; //task can assign to any worker host if equals default ip value of worker server - if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){ + if(!taskHosts.equals(String.valueOf(Constants.DEFAULT_WORKER_ID))){ String[] taskHostsArr = taskHosts.split(Constants.COMMA); - if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){ continue; } From 2b157db3930f4a29209f0970953714452c876dd0 Mon Sep 17 00:00:00 2001 From: huyuanming Date: Fri, 5 Jul 2019 10:39:41 +0800 Subject: [PATCH 07/12] startup parameters --- .../src/js/conf/home/pages/dag/_source/startingParam/index.vue | 2 -- 1 file changed, 2 deletions(-) diff --git a/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue b/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue index ceb019ce90..b1494e063e 100644 --- a/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue +++ b/escheduler-ui/src/js/conf/home/pages/dag/_source/startingParam/index.vue @@ -85,8 +85,6 @@ deep: true, handler () { this.isActive = false - this.notifyGroupList = null - this.workerGroupList = null this.$nextTick(() => (this.isActive = true)) } } From c9120c340306e87b2b2c35a9f9c457375fa1d301 Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Fri, 5 Jul 2019 11:09:38 +0800 Subject: [PATCH 08/12] Remove restrictions on administrators --- .../projects/pages/definition/pages/list/_source/timing.vue | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue index 46cfaa0c86..a61f0634d9 100644 --- a/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue +++ b/escheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/timing.vue @@ -21,7 +21,7 @@
- 执行时间 + 执行时间
{{$t('Timing')}}
@@ -136,7 +136,7 @@
{{$t('Cancel')}} - {{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}} + {{spinnerLoading ? 'Loading...' : (item.crontab ? $t('Edit') : $t('Create'))}}
From 5c7515cf8c74450a885e39c28fb6853f60a8cb5b Mon Sep 17 00:00:00 2001 From: lenboo Date: Fri, 5 Jul 2019 11:11:43 +0800 Subject: [PATCH 09/12] fix sub process worker group id is 0 --- .../escheduler/api/service/ProcessInstanceService.java | 10 ++++++++-- .../src/main/java/cn/escheduler/dao/ProcessDao.java | 3 ++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java index 9ba29e0555..f5f05a74d6 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java @@ -115,12 +115,18 @@ public class ProcessInstanceService extends BaseDAGService { return checkResult; } ProcessInstance processInstance = processDao.findProcessInstanceDetailById(processId); + String workerGroupName = ""; if(processInstance.getWorkerGroupId() == -1){ - processInstance.setWorkerGroupName(DEFAULT); + workerGroupName = DEFAULT; }else{ WorkerGroup workerGroup = workerGroupMapper.queryById(processInstance.getWorkerGroupId()); - processInstance.setWorkerGroupName(workerGroup.getName()); + if(workerGroup != null){ + workerGroupName = DEFAULT; + }else{ + workerGroupName = workerGroup.getName(); + } } + processInstance.setWorkerGroupName(workerGroupName); ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); processInstance.setReceivers(processDefinition.getReceivers()); processInstance.setReceiversCc(processDefinition.getReceiversCc()); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 8ba0f47960..d393339a5e 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -493,7 +493,8 @@ public class ProcessDao extends AbstractBaseDao { processInstance.setProcessInstanceJson(processDefinition.getProcessDefinitionJson()); // set process instance priority processInstance.setProcessInstancePriority(command.getProcessInstancePriority()); - processInstance.setWorkerGroupId(command.getWorkerGroupId()); + int workerGroupId = command.getWorkerGroupId() == 0 ? -1 : command.getWorkerGroupId(); + processInstance.setWorkerGroupId(workerGroupId); processInstance.setTimeout(processDefinition.getTimeout()); processInstance.setTenantId(processDefinition.getTenantId()); return processInstance; From d0a0b67c8b55fd4c538f3fc3218a675b713dd38a Mon Sep 17 00:00:00 2001 From: lgcareer <18610854716@163.com> Date: Fri, 5 Jul 2019 11:46:11 +0800 Subject: [PATCH 10/12] change 1.0.4 to 1.1.0 --- sql/soft_version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/soft_version b/sql/soft_version index a6a3a43c3a..1cc5f657e0 100644 --- a/sql/soft_version +++ b/sql/soft_version @@ -1 +1 @@ -1.0.4 \ No newline at end of file +1.1.0 \ No newline at end of file From d720108e2fd406c39856289e4c1c9667b60a3fc8 Mon Sep 17 00:00:00 2001 From: lenboo Date: Fri, 5 Jul 2019 15:25:15 +0800 Subject: [PATCH 11/12] update worker get task from queue --- .../common/queue/TaskQueueZkImpl.java | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java index e116cd3271..df75a3d525 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/queue/TaskQueueZkImpl.java @@ -22,6 +22,7 @@ import cn.escheduler.common.utils.Bytes; import cn.escheduler.common.utils.IpUtils; import cn.escheduler.common.utils.OSUtils; import cn.escheduler.common.zk.AbstractZKClient; +import org.apache.commons.lang3.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; @@ -157,7 +158,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { String taskDetail = list.get(i); String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE); - //向前版本兼容 + //向前版本兼ProcessInstanceService容 if(taskDetailArrs.length >= 4){ //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} @@ -209,17 +210,33 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { while(iterator.hasNext()){ if(j++ < tasksNum){ String task = iterator.next(); - String[] taskArray = task.split(Constants.UNDERLINE); - int processInstanceId = Integer.parseInt(taskArray[1]); - int taskId = Integer.parseInt(taskArray[3]); - String destTask = taskArray[0]+Constants.UNDERLINE + processInstanceId + Constants.UNDERLINE - + taskArray[2] + Constants.UNDERLINE + taskId; - taskslist.add(destTask); + + taskslist.add(getOriginTaskFormat(task)); } } return taskslist; } + /** + * format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId} + * processInstanceId and task id need to be convert to int. + * @param formatTask + * @return + */ + private String getOriginTaskFormat(String formatTask){ + String[] taskArray = formatTask.split(Constants.UNDERLINE); + int processInstanceId = Integer.parseInt(taskArray[1]); + int taskId = Integer.parseInt(taskArray[3]); + String suffix = ""; + for(int index =4; index < taskArray.length; index++){ + suffix += taskArray[index] + Constants.UNDERLINE; + } + String destTask = String.format("%s_%s_%s_%s", taskArray[0], processInstanceId, taskArray[3], taskId); + if(StringUtils.isNotEmpty(suffix)){ + destTask += Constants.UNDERLINE + suffix; + } + return destTask; + } @Override public void removeNode(String key, String nodeValue){ From 67f82d71a8dc9a1d02aa311392af4ca512f17a79 Mon Sep 17 00:00:00 2001 From: lenboo Date: Fri, 5 Jul 2019 15:26:03 +0800 Subject: [PATCH 12/12] update worker get task from queue --- .../cn/escheduler/server/worker/runner/FetchTaskThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java index a960570ea5..2d88fdb843 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java @@ -153,7 +153,7 @@ public class FetchTaskThread implements Runnable{ } String[] taskStringArray = taskQueueStr.split(Constants.UNDERLINE); - String taskInstIdStr = taskStringArray[taskStringArray.length - 1]; + String taskInstIdStr = taskStringArray[3]; Date now = new Date(); Integer taskId = Integer.parseInt(taskInstIdStr);