Browse Source

Merge pull request #160 from lenboo/dev-20190415

add time out
pull/2/head
bao liang 5 years ago committed by GitHub
parent
commit
bef3eef714
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java
  2. 19
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
  3. 5
      escheduler-common/src/main/java/cn/escheduler/common/Constants.java
  4. 35
      escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java
  5. 1
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  6. 5
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java
  7. 4
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java
  8. 11
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java
  9. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  10. 10
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java
  11. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java
  12. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  13. 6
      escheduler-dao/src/main/resources/dao/data_source.properties
  14. 32
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
  15. 58
      escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java
  16. 4
      escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java
  17. 44
      sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql

2
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java

@ -125,6 +125,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setDesc(desc);
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
@ -288,6 +289,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setDesc(desc);
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();

19
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@ -346,7 +346,8 @@ public class ProcessInstanceService extends BaseDAGService {
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR, "update");
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
processInstance.getName(), processInstance.getState().toString(), "update");
return result;
}
Date schedule = null;
@ -355,8 +356,12 @@ public class ProcessInstanceService extends BaseDAGService {
} else {
schedule = processInstance.getScheduleTime();
}
processInstance.setScheduleTime(schedule);
processInstance.setLocations(locations);
processInstance.setConnects(connects);
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
@ -370,9 +375,14 @@ public class ProcessInstanceService extends BaseDAGService {
Map<String, String> globalParamMap = globalParamList.stream().collect(Collectors.toMap(Property::getProp, Property::getValue));
globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList,
processInstance.getCmdTypeIfComplement(), schedule);
}
int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
globalParams, schedule, flag, locations, connects);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
// int update = processDao.updateProcessInstance(processInstanceId, processInstanceJson,
// globalParams, schedule, flag, locations, connects);
int update = processDao.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
@ -380,6 +390,7 @@ public class ProcessInstanceService extends BaseDAGService {
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);
processDefinition.setConnects(connects);
processDefinition.setTimeout(timeout);
updateDefine = processDefineMapper.update(processDefinition);
}
if (update > 0 && updateDefine > 0) {

5
escheduler-common/src/main/java/cn/escheduler/common/Constants.java

@ -331,6 +331,11 @@ public final class Constants {
*/
public static final int MAX_TASK_TIMEOUT = 24 * 3600;
/**
* max task timeout
*/
public static final int MAX_PROCESS_TIMEOUT = Integer.MAX_VALUE;
/**
* heartbeat threads number

35
escheduler-dao/src/main/java/cn/escheduler/dao/AlertDao.java

@ -23,6 +23,8 @@ import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.mapper.AlertMapper;
import cn.escheduler.dao.mapper.UserAlertGroupMapper;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.User;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -83,8 +85,9 @@ public class AlertDao extends AbstractBaseDao {
*/
public void sendServerStopedAlert(int alertgroupId,String host,String serverType){
Alert alert = new Alert();
String content = String.format("[{'type':'%s','host':'%s','event':'服务挂掉','警告级别':'严重'}]",serverType,host);
alert.setTitle("容错告警");
String content = String.format("[{'type':'%s','host':'%s','event':'server down','warning level':'serious'}]",
serverType, host);
alert.setTitle("Fault tolerance warning");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
@ -94,6 +97,34 @@ public class AlertDao extends AbstractBaseDao {
alertMapper.insert(alert);
}
/**
* process time out alert
* @param processInstance
* @param processDefinition
*/
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition){
int alertgroupId = processInstance.getWarningGroupId();
String receivers = processDefinition.getReceivers();
String receiversCc = processDefinition.getReceiversCc();
Alert alert = new Alert();
String content = String.format("[{'id':'%d','name':'%s','event':'timeout','warnLevel':'middle'}]",
processInstance.getId(), processInstance.getName());
alert.setTitle("Process Timeout Warn");
alert.setShowType(ShowType.TABLE);
alert.setContent(content);
alert.setAlertType(AlertType.EMAIL);
alert.setAlertGroupId(alertgroupId);
if (StringUtils.isNotEmpty(receivers)) {
alert.setReceivers(receivers);
}
if (StringUtils.isNotEmpty(receiversCc)) {
alert.setReceiversCc(receiversCc);
}
alert.setCreateTime(new Date());
alert.setUpdateTime(new Date());
alertMapper.insert(alert);
}
/**
* task timeout warn
*/

1
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -482,6 +482,7 @@ public class ProcessDao extends AbstractBaseDao {
// set process instance priority
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId());
processInstance.setTimeout(processDefinition.getTimeout());
return processInstance;
}

5
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java

@ -94,6 +94,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "receivers", column = "receivers", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "receiversCc", column = "receivers_cc", javaType = String.class, jdbcType = JdbcType.VARCHAR)
@ -121,6 +122,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryByDefineName")
@ -157,6 +159,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "updateTime", column = "update_time", javaType = Timestamp.class, jdbcType = JdbcType.DATE),
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryAllDefinitionList")
@ -183,6 +186,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "scheduleReleaseState", column = "schedule_release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefineListPaging")
@ -211,6 +215,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefinitionListByIdList")

4
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java

@ -55,6 +55,7 @@ public class ProcessDefinitionMapperProvider {
VALUES("`connects`", "#{processDefinition.connects}");
VALUES("`create_time`", "#{processDefinition.createTime}");
VALUES("`update_time`", "#{processDefinition.updateTime}");
VALUES("`timeout`", "#{processDefinition.timeout}");
VALUES("`flag`", EnumFieldUtil.genFieldStr("processDefinition.flag", ReleaseState.class));
VALUES("`user_id`", "#{processDefinition.userId}");
@ -100,6 +101,7 @@ public class ProcessDefinitionMapperProvider {
SET("`global_params`=#{processDefinition.globalParams}");
SET("`create_time`=#{processDefinition.createTime}");
SET("`update_time`=#{processDefinition.updateTime}");
SET("`timeout`=#{processDefinition.timeout}");
SET("`flag`="+EnumFieldUtil.genFieldStr("processDefinition.flag", Flag.class));
SET("`user_id`=#{processDefinition.userId}");
@ -173,7 +175,7 @@ public class ProcessDefinitionMapperProvider {
*/
public String queryDefineListPaging(Map<String, Object> parameter) {
return new SQL() {{
SELECT("td.id,td.name,td.version,td.release_state,td.project_id,td.user_id,td.`desc`,td.create_time,td.update_time,td.flag,td.global_params,td.receivers,td.receivers_cc,sc.schedule_release_state");
SELECT("td.*,sc.schedule_release_state");
FROM(TABLE_NAME + " td");
LEFT_OUTER_JOIN(" (select process_definition_id,release_state as schedule_release_state from `t_escheduler_schedules` " +
"group by `process_definition_id`,`release_state`) sc on sc.process_definition_id = td.id");

11
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java

@ -95,6 +95,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "tenantCode", column = "tenant_code", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById")
@ -133,6 +134,7 @@ public interface ProcessInstanceMapper {
@Result(property = "historyCmd", column = "history_cmd", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById")
@ -171,6 +173,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -209,6 +212,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -256,6 +260,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -352,6 +357,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -444,6 +450,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -488,6 +495,7 @@ public interface ProcessInstanceMapper {
@Result(property = "duration", column = "duration", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -532,6 +540,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -574,6 +583,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess")
@ -616,6 +626,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -68,6 +68,7 @@ public class ProcessInstanceMapperProvider {
VALUES("`is_sub_process`", EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
VALUES("`executor_id`", "#{processInstance.executorId}");
VALUES("`worker_group_id`", "#{processInstance.workerGroupId}");
VALUES("`timeout`", "#{processInstance.timeout}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class));
}
}.toString();
@ -141,6 +142,7 @@ public class ProcessInstanceMapperProvider {
SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
SET("`executor_id`=#{processInstance.executorId}");
SET("`worker_group_id`=#{processInstance.workerGroupId}");
SET("`timeout`=#{processInstance.timeout}");
WHERE("`id`=#{processInstance.id}");

10
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java

@ -37,6 +37,9 @@ public class ProcessData {
private List<Property> globalParams;
private int timeout;
public ProcessData() {
}
@ -82,4 +85,11 @@ public class ProcessData {
this.globalParams = globalParams;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
}

15
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java

@ -136,6 +136,11 @@ public class ProcessDefinition {
*/
private ReleaseState scheduleReleaseState;
/**
* process warning time out. unit: minute
*/
private int timeout;
public String getName() {
return name;
@ -316,6 +321,14 @@ public class ProcessDefinition {
this.scheduleReleaseState = scheduleReleaseState;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public String toString() {
return "ProcessDefinition{" +
@ -340,6 +353,8 @@ public class ProcessDefinition {
", receivers='" + receivers + '\'' +
", receiversCc='" + receiversCc + '\'' +
", scheduleReleaseState=" + scheduleReleaseState +
", timeout=" + timeout +
'}';
}
}

15
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -183,6 +183,11 @@ public class ProcessInstance {
*/
private int workerGroupId;
/**
* process timeout for warning
*/
private int timeout;
public ProcessInstance(){
}
@ -495,6 +500,14 @@ public class ProcessInstance {
this.workerGroupId = workerGroupId;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public String toString() {
return "ProcessInstance{" +
@ -528,7 +541,9 @@ public class ProcessInstance {
", historyCmd='" + historyCmd + '\'' +
", dependenceScheduleTimes='" + dependenceScheduleTimes + '\'' +
", duration=" + duration +
", timeout=" + timeout +
", processInstancePriority=" + processInstancePriority +
'}';
}
}

6
escheduler-dao/src/main/resources/dao/data_source.properties

@ -1,9 +1,9 @@
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root@123
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8
spring.datasource.username=xx
spring.datasource.password=xx
# connection configuration
spring.datasource.initialSize=5

32
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -258,7 +258,7 @@ public class MasterExecThread implements Runnable {
processDao.createRecoveryWaitingThreadCommand(null, processInstance);
}
List<TaskInstance> taskInstances = processDao.findValidTaskListByProcessId(processInstance.getId());
alertManager.sendWarnningOfProcessInstance(processInstance, taskInstances);
alertManager.sendAlertProcessInstance(processInstance, taskInstances);
}
@ -775,8 +775,15 @@ public class MasterExecThread implements Runnable {
private void runProcess(){
// submit start node
submitPostNode(null);
// submitStandByTask();
boolean sendTimeWarning = false;
while(!processInstance.IsProcessInstanceStop()){
// send warning email if process time out.
if( !sendTimeWarning && checkProcessTimeOut(processInstance) ){
alertManager.sendProcessTimeoutAlert(processInstance,
processDao.findProcessDefineById(processInstance.getProcessDefinitionId()));
sendTimeWarning = true;
}
Set<MasterBaseTaskExecThread> keys = activeTaskNode.keySet();
for (MasterBaseTaskExecThread taskExecThread : keys) {
Future<Boolean> future = activeTaskNode.get(taskExecThread);
@ -821,7 +828,7 @@ public class MasterExecThread implements Runnable {
}
// send alert
if(this.recoverToleranceFaultTaskList.size() > 0){
alertManager.sendWarnningWorkerleranceFault(processInstance, recoverToleranceFaultTaskList);
alertManager.sendAlertWorkerToleranceFault(processInstance, recoverToleranceFaultTaskList);
this.recoverToleranceFaultTaskList.clear();
}
// updateProcessInstance completed task status
@ -851,6 +858,25 @@ public class MasterExecThread implements Runnable {
logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}
/**
* check process time out
* @param processInstance
* @return
*/
private boolean checkProcessTimeOut(ProcessInstance processInstance) {
if(processInstance.getTimeout() == 0 ){
return false;
}
Date now = new Date();
long runningTime = DateUtils.differMs(now, processInstance.getStartTime());
if(runningTime > processInstance.getTimeout()){
return true;
}
return false;
}
private boolean canSubmitTaskToQueue() {
return OSUtils.checkResource(conf, true);
}

58
escheduler-server/src/main/java/cn/escheduler/server/utils/AlertManager.java

@ -26,6 +26,7 @@ import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.AlertDao;
import cn.escheduler.dao.DaoFactory;
import cn.escheduler.dao.model.Alert;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import org.slf4j.Logger;
@ -54,27 +55,27 @@ public class AlertManager {
private String getCommandCnName(CommandType commandType) {
switch (commandType) {
case RECOVER_TOLERANCE_FAULT_PROCESS:
return "恢复容错";
return "recover tolerance fault process";
case RECOVER_SUSPENDED_PROCESS:
return "恢复暂停流程";
return "recover suspended process";
case START_CURRENT_TASK_PROCESS:
return "从当前节点开始执行";
return "start current task process";
case START_FAILURE_TASK_PROCESS:
return "从失败节点开始执行";
return "start failure task process";
case START_PROCESS:
return "启动工作流";
return "start process";
case REPEAT_RUNNING:
return "重跑";
return "repeat running";
case SCHEDULER:
return "定时执行";
return "scheduler";
case COMPLEMENT_DATA:
return "补数";
return "complement data";
case PAUSE:
return "暂停工作流";
return "pause";
case STOP:
return "停止工作流";
return "stop";
default:
return "未知的命令类型";
return "unknown type";
}
}
@ -124,14 +125,14 @@ public class AlertManager {
continue;
}
LinkedHashMap<String, String> failedTaskMap = new LinkedHashMap();
failedTaskMap.put("任务id", String.valueOf(task.getId()));
failedTaskMap.put("任务名称", task.getName());
failedTaskMap.put("任务类型", task.getTaskType());
failedTaskMap.put("任务状态", task.getState().toString());
failedTaskMap.put("任务开始时间", DateUtils.dateToString(task.getStartTime()));
failedTaskMap.put("任务结束时间", DateUtils.dateToString(task.getEndTime()));
failedTaskMap.put("task id", String.valueOf(task.getId()));
failedTaskMap.put("task name", task.getName());
failedTaskMap.put("task type", task.getTaskType());
failedTaskMap.put("task state", task.getState().toString());
failedTaskMap.put("task start time", DateUtils.dateToString(task.getStartTime()));
failedTaskMap.put("task end time", DateUtils.dateToString(task.getEndTime()));
failedTaskMap.put("host", task.getHost());
failedTaskMap.put("日志路径", task.getLogPath());
failedTaskMap.put("log path", task.getLogPath());
failedTaskList.add(failedTaskMap);
}
res = JSONUtils.toJson(failedTaskList);
@ -152,10 +153,10 @@ public class AlertManager {
for(TaskInstance taskInstance: toleranceTaskList){
LinkedHashMap<String, String> toleranceWorkerContentMap = new LinkedHashMap();
toleranceWorkerContentMap.put("工作流程名称", processInstance.getName());
toleranceWorkerContentMap.put("容错任务名称", taskInstance.getName());
toleranceWorkerContentMap.put("容错机器IP", taskInstance.getHost());
toleranceWorkerContentMap.put("任务失败次数", String.valueOf(taskInstance.getRetryTimes()));
toleranceWorkerContentMap.put("process name", processInstance.getName());
toleranceWorkerContentMap.put("task name", taskInstance.getName());
toleranceWorkerContentMap.put("host", taskInstance.getHost());
toleranceWorkerContentMap.put("task retry times", String.valueOf(taskInstance.getRetryTimes()));
toleranceTaskInstanceList.add(toleranceWorkerContentMap);
}
return JSONUtils.toJson(toleranceTaskInstanceList);
@ -166,9 +167,9 @@ public class AlertManager {
* @param processInstance
* @param toleranceTaskList
*/
public void sendWarnningWorkerleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
public void sendAlertWorkerToleranceFault(ProcessInstance processInstance, List<TaskInstance> toleranceTaskList){
Alert alert = new Alert();
alert.setTitle("worker容错报警");
alert.setTitle("worker fault tolerance");
alert.setShowType(ShowType.TABLE);
String content = getWorkerToleranceContent(processInstance, toleranceTaskList);
alert.setContent(content);
@ -187,8 +188,8 @@ public class AlertManager {
* send process instance alert
* @param processInstance
*/
public void sendWarnningOfProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
public void sendAlertProcessInstance(ProcessInstance processInstance,
List<TaskInstance> taskInstances){
boolean sendWarnning = false;
WarningType warningType = processInstance.getWarningType();
@ -217,7 +218,7 @@ public class AlertManager {
String cmdName = getCommandCnName(processInstance.getCommandType());
String success = processInstance.getState().typeIsSuccess() ? "成功" :"失败";
String success = processInstance.getState().typeIsSuccess() ? "success" :"failed";
alert.setTitle(cmdName + success);
ShowType showType = processInstance.getState().typeIsSuccess() ? ShowType.TEXT : ShowType.TABLE;
alert.setShowType(showType);
@ -233,4 +234,7 @@ public class AlertManager {
logger.info("add alert to db , alert: {}", alert.toString());
}
public void sendProcessTimeoutAlert(ProcessInstance processInstance, ProcessDefinition processDefinition) {
alertDao.sendProcessTimeoutAlert(processInstance, processDefinition);
}
}

4
escheduler-server/src/test/java/cn/escheduler/server/master/AlertManagerTest.java

@ -76,7 +76,7 @@ public class AlertManagerTest {
toleranceTaskList.add(toleranceTask1);
toleranceTaskList.add(toleranceTask2);
alertManager.sendWarnningWorkerleranceFault(processInstance, toleranceTaskList);
alertManager.sendAlertWorkerToleranceFault(processInstance, toleranceTaskList);
}
@ -103,7 +103,7 @@ public class AlertManagerTest {
toleranceTaskList.add(toleranceTask1);
toleranceTaskList.add(toleranceTask2);
alertManager.sendWarnningOfProcessInstance(processInstance, toleranceTaskList);
alertManager.sendAlertProcessInstance(processInstance, toleranceTaskList);
}
}

44
sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql

@ -200,4 +200,46 @@ d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_worker_group_id;
-- ac_escheduler_T_t_escheduler_process_instance_C_timeout
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_instance_C_timeout;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='timeout')
THEN
ALTER TABLE `t_escheduler_process_instance` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `worker_group_id`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_instance_C_timeout;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_timeout;
-- ac_escheduler_T_t_escheduler_process_definition_C_timeout
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_definition_C_timeout;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='timeout')
THEN
ALTER TABLE `t_escheduler_process_definition` ADD COLUMN `timeout` int(11) NULL DEFAULT 0 COMMENT '超时时间' AFTER `create_time`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_definition_C_timeout;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_timeout;
Loading…
Cancel
Save