Browse Source

Merge pull request #132 from lgcareer/dev-20190415

Dev 20190415:add receivers and receiversCc in SqlParameter and SqlTask
pull/2/head
lgcareer 6 years ago committed by GitHub
parent
commit
4720cd4852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java
  2. 19
      escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java
  3. 20
      escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java
  4. 27
      escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java
  5. 54
      escheduler-dao/readme.txt
  6. 7
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java
  7. 162
      sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql

5
escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java

@ -149,10 +149,11 @@ public class ExecutorController extends BaseController {
@GetMapping(value = "/get-receiver-cc") @GetMapping(value = "/get-receiver-cc")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "processDefinitionId") int processDefinitionId){ @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId,
@RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) {
logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName());
try { try {
Map<String, Object> result = execService.getReceiverCc(processDefinitionId); Map<String, Object> result = execService.getReceiverCc(processDefinitionId,processInstanceId);
return returnDataList(result); return returnDataList(result);
} catch (Exception e) { } catch (Exception e) {
logger.error(QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR.getMsg(),e); logger.error(QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR.getMsg(),e);

19
escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java

@ -361,18 +361,29 @@ public class ExecutorService extends BaseService{
} }
/** /**
* query recipients and copyers by process definition id * query recipients and copyers by process definition id or processInstanceId
* *
* @param processDefineId * @param processDefineId
* @return * @return
*/ */
public Map<String, Object> getReceiverCc(int processDefineId) { public Map<String, Object> getReceiverCc(Integer processDefineId,Integer processInstanceId) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
logger.info("processInstanceId {}",processInstanceId);
if(processDefineId == null && processInstanceId == null){
throw new RuntimeException("You must set values for parameters processDefineId or processInstanceId");
}
if(processDefineId == null && processInstanceId != null) {
ProcessInstance processInstance = processInstanceMapper.queryById(processInstanceId);
if (processInstance == null) {
throw new RuntimeException("processInstanceId is not exists");
}
processDefineId = processInstance.getProcessDefinitionId();
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefineId); ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefineId);
if (processDefinition == null){ if (processDefinition == null){
throw new RuntimeException("processDefineId is not exists"); throw new RuntimeException(String.format("processDefineId %d is not exists",processDefineId));
} }
String receivers = processDefinition.getReceivers(); String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc(); String receiversCc = processDefinition.getReceiversCc();
Map<String,String> dataMap = new HashMap<>(); Map<String,String> dataMap = new HashMap<>();

20
escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java

@ -32,8 +32,11 @@ import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.context.WebApplicationContext; import org.springframework.web.context.WebApplicationContext;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@ -66,4 +69,21 @@ public class ExecutorControllerTest {
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString()); logger.info(mvcResult.getResponse().getContentAsString());
} }
@Test
public void getReceiverCc() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
//paramsMap.add("processDefinitionId","4");
paramsMap.add("processInstanceId","13");
//paramsMap.add("processInstanceId","13");
MvcResult mvcResult = mockMvc.perform(get("/projects/{projectName}/executors/get-receiver-cc","li_sql_test")
.header("sessionId", "e79b3353-e227-4680-88c0-544194e64025")
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue());
logger.info(mvcResult.getResponse().getContentAsString());
}
} }

27
escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java

@ -65,6 +65,16 @@ public class SqlParameters extends AbstractParameters {
*/ */
private String connParams; private String connParams;
/**
* receivers
*/
private String receivers;
/**
* receivers cc
*/
private String receiversCc;
public String getType() { public String getType() {
return type; return type;
} }
@ -121,6 +131,21 @@ public class SqlParameters extends AbstractParameters {
this.connParams = connParams; this.connParams = connParams;
} }
public String getReceivers() {
return receivers;
}
public void setReceivers(String receivers) {
this.receivers = receivers;
}
public String getReceiversCc() {
return receiversCc;
}
public void setReceiversCc(String receiversCc) {
this.receiversCc = receiversCc;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
@ -142,6 +167,8 @@ public class SqlParameters extends AbstractParameters {
", udfs='" + udfs + '\'' + ", udfs='" + udfs + '\'' +
", showType='" + showType + '\'' + ", showType='" + showType + '\'' +
", connParams='" + connParams + '\'' + ", connParams='" + connParams + '\'' +
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
'}'; '}';
} }
} }

54
escheduler-dao/readme.txt

@ -1,54 +0,0 @@
-- 用户指定队列
alter table t_escheduler_user add queue varchar(64);
-- 访问token
CREATE TABLE `t_escheduler_access_token` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` int(11) DEFAULT NULL COMMENT '用户id',
`token` varchar(64) DEFAULT NULL COMMENT 'token令牌',
`expire_time` datetime DEFAULT NULL COMMENT 'token有效结束时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
CREATE TABLE `t_escheduler_error_command` (
`id` int(11) NOT NULL COMMENT '主键',
`command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程',
`executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者',
`process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id',
`command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)',
`task_depend_type` tinyint(4) NULL DEFAULT NULL COMMENT '节点依赖类型',
`failure_strategy` tinyint(4) NULL DEFAULT 0 COMMENT '失败策略:0结束,1继续',
`warning_type` tinyint(4) NULL DEFAULT 0 COMMENT '告警类型',
`warning_group_id` int(11) NULL DEFAULT NULL COMMENT '告警组',
`schedule_time` datetime(0) NULL DEFAULT NULL COMMENT '预期运行时间',
`start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`dependence` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '依赖字段',
`process_instance_priority` int(11) NULL DEFAULT NULL COMMENT '流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
CREATE TABLE `t_escheduler_worker_group` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称',
`ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
ALTER TABLE `t_escheduler_task_instance`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`;
ALTER TABLE `t_escheduler_command`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_error_command`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;
ALTER TABLE `t_escheduler_schedules`
ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`;

7
escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java

@ -305,7 +305,7 @@ public class SqlTask extends AbstractTask {
receviersList.add(user.getEmail()); receviersList.add(user.getEmail());
} }
// custom receiver // custom receiver
String receivers = processDefine.getReceivers(); String receivers = sqlParameters.getReceivers();
if (StringUtils.isNotEmpty(receivers)){ if (StringUtils.isNotEmpty(receivers)){
String[] splits = receivers.split(Constants.COMMA); String[] splits = receivers.split(Constants.COMMA);
for (String receiver : splits){ for (String receiver : splits){
@ -315,11 +315,8 @@ public class SqlTask extends AbstractTask {
// copy list // copy list
List<String> receviersCcList = new ArrayList<String>(); List<String> receviersCcList = new ArrayList<String>();
// Custom Copier // Custom Copier
String receiversCc = processDefine.getReceiversCc(); String receiversCc = sqlParameters.getReceiversCc();
if (StringUtils.isNotEmpty(receiversCc)){ if (StringUtils.isNotEmpty(receiversCc)){
String[] splits = receiversCc.split(Constants.COMMA); String[] splits = receiversCc.split(Constants.COMMA);
for (String receiverCc : splits){ for (String receiverCc : splits){

162
sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql

@ -19,3 +19,165 @@ d//
delimiter ; delimiter ;
CALL ac_escheduler_T_t_escheduler_version; CALL ac_escheduler_T_t_escheduler_version;
DROP PROCEDURE ac_escheduler_T_t_escheduler_version; DROP PROCEDURE ac_escheduler_T_t_escheduler_version;
-- ac_escheduler_T_t_escheduler_user_C_queue
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_user_C_queue;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_user_C_queue()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_user'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='queue')
THEN
ALTER TABLE t_escheduler_user ADD COLUMN queue varchar(64) COMMENT '队列' AFTER update_time;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_user_C_queue;
DROP PROCEDURE ac_escheduler_T_t_escheduler_user_C_queue;
-- ac_escheduler_T_t_escheduler_access_token
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_access_token;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_access_token()
BEGIN
drop table if exists t_escheduler_access_token;
CREATE TABLE IF NOT EXISTS `t_escheduler_access_token` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` int(11) DEFAULT NULL COMMENT '用户id',
`token` varchar(64) DEFAULT NULL COMMENT 'token令牌',
`expire_time` datetime DEFAULT NULL COMMENT 'token有效结束时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_access_token;
DROP PROCEDURE ac_escheduler_T_t_escheduler_access_token;
-- ac_escheduler_T_t_escheduler_error_command
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_error_command;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_error_command()
BEGIN
drop table if exists t_escheduler_error_command;
CREATE TABLE IF NOT EXISTS `t_escheduler_error_command` (
`id` int(11) NOT NULL COMMENT '主键',
`command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程',
`executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者',
`process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id',
`command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)',
`task_depend_type` tinyint(4) NULL DEFAULT NULL COMMENT '节点依赖类型',
`failure_strategy` tinyint(4) NULL DEFAULT 0 COMMENT '失败策略:0结束,1继续',
`warning_type` tinyint(4) NULL DEFAULT 0 COMMENT '告警类型',
`warning_group_id` int(11) NULL DEFAULT NULL COMMENT '告警组',
`schedule_time` datetime(0) NULL DEFAULT NULL COMMENT '预期运行时间',
`start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`dependence` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '依赖字段',
`process_instance_priority` int(11) NULL DEFAULT NULL COMMENT '流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest',
`worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组',
`message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT=1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_error_command;
DROP PROCEDURE ac_escheduler_T_t_escheduler_error_command;
-- ac_escheduler_T_t_escheduler_worker_group
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_worker_group;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_worker_group()
BEGIN
drop table if exists t_escheduler_worker_group;
CREATE TABLE IF NOT EXISTS `t_escheduler_worker_group` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称',
`ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT=1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_worker_group;
DROP PROCEDURE ac_escheduler_T_t_escheduler_worker_group;
-- ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='worker_group_id')
THEN
ALTER TABLE t_escheduler_task_instance ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id;
-- ac_escheduler_T_t_escheduler_command_C_worker_group_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_command_C_worker_group_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_command_C_worker_group_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='worker_group_id')
THEN
ALTER TABLE t_escheduler_command ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `process_instance_priority`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_command_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_command_C_worker_group_id;
-- ac_escheduler_T_t_escheduler_schedules_C_worker_group_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_schedules'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='worker_group_id')
THEN
ALTER TABLE t_escheduler_schedules ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `process_instance_priority`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id;
Loading…
Cancel
Save