Browse Source

Merge remote-tracking branch 'upstream/dev-1.1.0' into dev-1.1.0

pull/2/head
huyuanming 6 years ago
parent
commit
a6dbc1a1b5
  1. 2
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java
  2. 7
      escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java
  3. 6
      escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java
  4. 15
      escheduler-common/src/main/java/cn/escheduler/common/enums/ZKNodeType.java
  5. 47
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  6. 5
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java
  7. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java
  8. 11
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java
  9. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  10. 7
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java
  11. 17
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java
  12. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java
  13. 10
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java
  14. 13
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java
  15. 14
      escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java
  16. 125
      escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
  17. 6
      escheduler-dao/src/main/resources/dao/data_source.properties
  18. 51
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
  19. 5
      escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
  20. 12
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  21. 199
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java
  22. 42
      escheduler-server/src/test/java/cn/escheduler/server/master/MasterCommandTest.java
  23. 11
      escheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue
  24. 5
      escheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue
  25. 7
      escheduler-ui/src/js/conf/home/pages/dag/index.vue
  26. 7
      escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue
  27. 115
      escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/statistics.vue
  28. 24
      escheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue
  29. 8
      escheduler-ui/src/js/conf/home/router/index.js
  30. 6
      escheduler-ui/src/js/conf/home/store/dag/actions.js
  31. 7
      escheduler-ui/src/js/conf/home/store/dag/mutations.js
  32. 2
      escheduler-ui/src/js/conf/home/store/dag/state.js
  33. 8
      escheduler-ui/src/js/conf/home/store/security/actions.js
  34. 3
      escheduler-ui/src/js/conf/home/store/security/state.js
  35. 18
      escheduler-ui/src/js/module/components/secondaryMenu/_source/menu.js
  36. 8
      escheduler-ui/src/js/module/i18n/locale/en_US.js
  37. 7
      escheduler-ui/src/js/module/i18n/locale/zh_CN.js
  38. 41
      sql/upgrade/1.1.0_schema/mysql/escheduler_ddl.sql
  39. 1
      sql/upgrade/1.1.0_schema/mysql/escheduler_dml.sql

2
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessDefinitionService.java

@ -128,6 +128,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
//custom global params
List<Property> globalParamsList = processData.getGlobalParams();
@ -292,6 +293,7 @@ public class ProcessDefinitionService extends BaseDAGService {
processDefine.setLocations(locations);
processDefine.setConnects(connects);
processDefine.setTimeout(processData.getTimeout());
processDefine.setTenantId(processData.getTenantId());
//custom global params
List<Property> globalParamsList = new ArrayList<>();

7
escheduler-api/src/main/java/cn/escheduler/api/service/ProcessInstanceService.java

@ -364,6 +364,7 @@ public class ProcessInstanceService extends BaseDAGService {
String globalParams = null;
String originDefParams = null;
int timeout = processInstance.getTimeout();
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
if (StringUtils.isNotEmpty(processInstanceJson)) {
ProcessData processData = JSONUtils.parseObject(processInstanceJson, ProcessData.class);
//check workflow json is valid
@ -379,6 +380,11 @@ public class ProcessInstanceService extends BaseDAGService {
processInstance.getCmdTypeIfComplement(), schedule);
timeout = processData.getTimeout();
processInstance.setTimeout(timeout);
Tenant tenant = processDao.getTenantForProcess(processData.getTenantId(),
processDefinition.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
}
processInstance.setProcessInstanceJson(processInstanceJson);
processInstance.setGlobalParams(globalParams);
}
@ -387,7 +393,6 @@ public class ProcessInstanceService extends BaseDAGService {
int update = processDao.updateProcessInstance(processInstance);
int updateDefine = 1;
if (syncDefine && StringUtils.isNotEmpty(processInstanceJson)) {
ProcessDefinition processDefinition = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
processDefinition.setProcessDefinitionJson(processInstanceJson);
processDefinition.setGlobalParams(originDefParams);
processDefinition.setLocations(locations);

6
escheduler-api/src/main/java/cn/escheduler/api/service/TenantService.java

@ -125,9 +125,9 @@ public class TenantService extends BaseService{
public Map<String,Object> queryTenantList(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
Map<String, Object> result = new HashMap<>(5);
if (checkAdmin(loginUser, result)) {
return result;
}
// if (checkAdmin(loginUser, result)) {
// return result;
// }
Integer count = tenantMapper.countTenantPaging(searchVal);

15
escheduler-common/src/main/java/cn/escheduler/common/enums/ZKNodeType.java

@ -0,0 +1,15 @@
package cn.escheduler.common.enums;
/**
* zk node type
*/
public enum ZKNodeType {
/**
* 0 do not send warning;
* 1 send if process success;
* 2 send if process failed;
* 3 send if process ending;
*/
MASTER, WORKER, DEAD_SERVER, TASK_QUEUE;
}

47
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -58,6 +58,7 @@ public class ProcessDao extends AbstractBaseDao {
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
// ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
@ -96,6 +97,12 @@ public class ProcessDao extends AbstractBaseDao {
@Autowired
private ErrorCommandMapper errorCommandMapper;
@Autowired
private WorkerServerMapper workerServerMapper;
@Autowired
private TenantMapper tenantMapper;
/**
* task queue impl
*/
@ -121,7 +128,9 @@ public class ProcessDao extends AbstractBaseDao {
udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
workerServerMapper = getMapper(WorkerServerMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance();
tenantMapper = getMapper(TenantMapper.class);
}
@ -485,9 +494,30 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setProcessInstancePriority(command.getProcessInstancePriority());
processInstance.setWorkerGroupId(command.getWorkerGroupId());
processInstance.setTimeout(processDefinition.getTimeout());
processInstance.setTenantId(processDefinition.getTenantId());
return processInstance;
}
/**
* get process tenant
* there is tenant id in definition, use the tenant of the definition.
* if there is not tenant id in the definiton or the tenant not exist
* use definition creator's tenant.
* @param tenantId
* @param userId
* @return
*/
public Tenant getTenantForProcess(int tenantId, int userId){
Tenant tenant = null;
if(tenantId >= 0){
tenant = tenantMapper.queryById(tenantId);
}
if(tenant == null){
User user = userMapper.queryById(userId);
tenant = tenantMapper.queryById(user.getTenantId());
}
return tenant;
}
/**
* check command parameters is valid
@ -581,6 +611,8 @@ public class ProcessDao extends AbstractBaseDao {
processInstance.setScheduleTime(command.getScheduleTime());
}
processInstance.setHost(host);
ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXEUTION;
int runTime = processInstance.getRunTimes();
switch (commandType){
case START_PROCESS:
@ -621,6 +653,7 @@ public class ProcessDao extends AbstractBaseDao {
case RECOVER_TOLERANCE_FAULT_PROCESS:
// recover tolerance fault process
processInstance.setRecovery(Flag.YES);
runStatus = processInstance.getState();
break;
case COMPLEMENT_DATA:
// delete all the valid tasks when complement data
@ -652,7 +685,7 @@ public class ProcessDao extends AbstractBaseDao {
default:
break;
}
processInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
processInstance.setState(runStatus);
return processInstance;
}
@ -1566,7 +1599,6 @@ public class ProcessDao extends AbstractBaseDao {
for (ProcessInstance processInstance:processInstanceList){
processNeedFailoverProcessInstances(processInstance);
}
}
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)
@ -1633,6 +1665,17 @@ public class ProcessDao extends AbstractBaseDao {
return workerGroupMapper.queryById(workerGroupId);
}
/**
* query worker server by host
* @param host
* @return
*/
public List<WorkerServer> queryWorkerServerByHost(String host){
return workerServerMapper.queryWorkerByHost(host);
}
}

5
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapper.java

@ -95,6 +95,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "receivers", column = "receivers", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "receiversCc", column = "receivers_cc", javaType = String.class, jdbcType = JdbcType.VARCHAR)
@ -123,6 +124,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "locations", column = "locations", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryByDefineName")
@ -160,6 +162,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "flag", column = "flag", typeHandler = EnumOrdinalTypeHandler.class, javaType = Flag.class, jdbcType = JdbcType.TINYINT),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryAllDefinitionList")
@ -187,6 +190,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "scheduleReleaseState", column = "schedule_release_state", typeHandler = EnumOrdinalTypeHandler.class, javaType = ReleaseState.class, jdbcType = JdbcType.TINYINT),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefineListPaging")
@ -216,6 +220,7 @@ public interface ProcessDefinitionMapper {
@Result(property = "connects", column = "connects", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "userName", column = "user_name", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "projectName", column = "project_name", javaType = String.class, jdbcType = JdbcType.VARCHAR)
})
@SelectProvider(type = ProcessDefinitionMapperProvider.class, method = "queryDefinitionListByIdList")

2
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessDefinitionMapperProvider.java

@ -56,6 +56,7 @@ public class ProcessDefinitionMapperProvider {
VALUES("`create_time`", "#{processDefinition.createTime}");
VALUES("`update_time`", "#{processDefinition.updateTime}");
VALUES("`timeout`", "#{processDefinition.timeout}");
VALUES("`tenant_id`", "#{processDefinition.tenantId}");
VALUES("`flag`", EnumFieldUtil.genFieldStr("processDefinition.flag", ReleaseState.class));
VALUES("`user_id`", "#{processDefinition.userId}");
@ -102,6 +103,7 @@ public class ProcessDefinitionMapperProvider {
SET("`create_time`=#{processDefinition.createTime}");
SET("`update_time`=#{processDefinition.updateTime}");
SET("`timeout`=#{processDefinition.timeout}");
SET("`tenant_id`=#{processDefinition.tenantId}");
SET("`flag`="+EnumFieldUtil.genFieldStr("processDefinition.flag", Flag.class));
SET("`user_id`=#{processDefinition.userId}");

11
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapper.java

@ -97,6 +97,7 @@ public interface ProcessInstanceMapper {
@Result(property = "queue", column = "queue", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryDetailById")
@ -136,6 +137,7 @@ public interface ProcessInstanceMapper {
@Result(property = "dependenceScheduleTimes", column = "dependence_schedule_times", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryById")
@ -175,6 +177,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -214,6 +217,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -262,6 +266,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -359,6 +364,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -452,6 +458,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -497,6 +504,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -542,6 +550,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@ -585,6 +594,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastRunningProcess")
@ -628,6 +638,7 @@ public interface ProcessInstanceMapper {
@Result(property = "processInstanceJson", column = "process_instance_json", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "workerGroupId", column = "worker_group_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "timeout", column = "timeout", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "tenantId", column = "tenant_id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "processInstancePriority", column = "process_instance_priority", javaType = Priority.class, typeHandler = EnumOrdinalTypeHandler.class, jdbcType = JdbcType.TINYINT)
})
@SelectProvider(type = ProcessInstanceMapperProvider.class, method = "queryLastManualProcess")

15
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -69,6 +69,7 @@ public class ProcessInstanceMapperProvider {
VALUES("`executor_id`", "#{processInstance.executorId}");
VALUES("`worker_group_id`", "#{processInstance.workerGroupId}");
VALUES("`timeout`", "#{processInstance.timeout}");
VALUES("`tenant_id`", "#{processInstance.tenantId}");
VALUES("`process_instance_priority`", EnumFieldUtil.genFieldStr("processInstance.processInstancePriority", Priority.class));
}
}.toString();
@ -141,6 +142,7 @@ public class ProcessInstanceMapperProvider {
SET("`dependence_schedule_times`=#{processInstance.dependenceScheduleTimes}");
SET("`is_sub_process`="+EnumFieldUtil.genFieldStr("processInstance.isSubProcess", Flag.class));
SET("`executor_id`=#{processInstance.executorId}");
SET("`tenant_id`=#{processInstance.tenantId}");
SET("`worker_group_id`=#{processInstance.workerGroupId}");
SET("`timeout`=#{processInstance.timeout}");
@ -220,11 +222,11 @@ public class ProcessInstanceMapperProvider {
public String queryDetailById(Map<String, Object> parameter) {
return new SQL() {
{
SELECT("inst.*,q.queue_name as queue,t.tenant_code,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_tenant t,t_escheduler_queue q");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q");
WHERE("inst.executor_id = u.id AND u.tenant_id = t.id AND t.queue_id = q.id AND inst.id = #{processId}");
WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}");
}
}.toString();
}
@ -402,7 +404,12 @@ public class ProcessInstanceMapperProvider {
FROM(TABLE_NAME);
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
Object host = parameter.get("host");
if(host != null && StringUtils.isNotEmpty(host.toString())){
WHERE("`host` = #{host} ");
}
WHERE("`state` in (" + strStates.toString() +")");
ORDER_BY("`id` asc");

7
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java

@ -228,7 +228,12 @@ public class TaskInstanceMapperProvider {
SELECT("*, UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time) as duration");
FROM(TABLE_NAME);
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
Object host = parameter.get("host");
if(host != null && StringUtils.isNotEmpty(host.toString())){
WHERE("`host` = #{host} ");
}
WHERE("`state` in (" + strStates.toString() +")");
ORDER_BY("`id` asc");
}
}.toString();

17
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java

@ -42,6 +42,23 @@ public interface WorkerServerMapper {
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryAllWorker")
List<WorkerServer> queryAllWorker();
/**
* query worker list
*
* @return
*/
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "host", column = "host", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "port", column = "port", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "zkDirectory", column = "zk_directory", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "resInfo", column = "res_info", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "lastHeartbeatTime", column = "last_heartbeat_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP)
})
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryWorkerByHost")
List<WorkerServer> queryWorkerByHost(@Param("host") String host);
/**
* insert worker server
*

15
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java

@ -37,6 +37,21 @@ public class WorkerServerMapperProvider {
}}.toString();
}
/**
* query worker list
* @return
*/
public String queryWorkerByHost(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("host = #{host}");
}}.toString();
}
/**
* insert worker server
* @param parameter

10
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessData.java

@ -39,6 +39,8 @@ public class ProcessData {
private int timeout;
private int tenantId;
public ProcessData() {
}
@ -92,4 +94,12 @@ public class ProcessData {
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getTenantId() {
return tenantId;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
}

13
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessDefinition.java

@ -141,6 +141,11 @@ public class ProcessDefinition {
*/
private int timeout;
/**
* tenant id
*/
private int tenantId;
public String getName() {
return name;
@ -354,7 +359,15 @@ public class ProcessDefinition {
", receiversCc='" + receiversCc + '\'' +
", scheduleReleaseState=" + scheduleReleaseState +
", timeout=" + timeout +
", tenantId=" + tenantId +
'}';
}
public int getTenantId() {
return tenantId;
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
}

14
escheduler-dao/src/main/java/cn/escheduler/dao/model/ProcessInstance.java

@ -188,6 +188,12 @@ public class ProcessInstance {
*/
private int timeout;
/**
* tenant id
*/
private int tenantId;
public ProcessInstance(){
}
@ -534,6 +540,7 @@ public class ProcessInstance {
", processInstanceJson='" + processInstanceJson + '\'' +
", executorId=" + executorId +
", tenantCode='" + tenantCode + '\'' +
", tenantId='" + tenantId + '\'' +
", queue='" + queue + '\'' +
", isSubProcess=" + isSubProcess +
", locations='" + locations + '\'' +
@ -546,4 +553,11 @@ public class ProcessInstance {
'}';
}
public void setTenantId(int tenantId) {
this.tenantId = tenantId;
}
public int getTenantId() {
return this.tenantId ;
}
}

125
escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java

@ -18,16 +18,20 @@ package cn.escheduler.dao.utils;
import cn.escheduler.common.enums.TaskDependType;
import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.ProcessDag;
import cn.escheduler.common.utils.JSONUtils;
import cn.escheduler.dao.model.ProcessData;
import cn.escheduler.dao.model.TaskInstance;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* dag tools
@ -105,8 +109,7 @@ public class DagHelper {
}
for (TaskNode taskNode : tmpTaskNodeList) {
if ( !taskNode.isForbidden()
&& null == findNodeByName(destTaskNodeList, taskNode.getName())) {
if (null == findNodeByName(destTaskNodeList, taskNode.getName())) {
destTaskNodeList.add(taskNode);
}
}
@ -193,6 +196,24 @@ public class DagHelper {
return processDag;
}
/**
* parse the forbidden task nodes in process definition.
* @param processDefinitionJson
* @return
*/
public static Map<String, TaskNode> getForbiddenTaskNodeMaps(String processDefinitionJson){
Map<String, TaskNode> forbidTaskNodeMap = new ConcurrentHashMap<>();
ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class);
List<TaskNode> taskNodeList = processData.getTasks();
for(TaskNode node : taskNodeList){
if(node.isForbidden()){
forbidTaskNodeMap.putIfAbsent(node.getName(), node);
}
}
return forbidTaskNodeMap;
}
/**
* find node by node name
@ -210,4 +231,100 @@ public class DagHelper {
}
return null;
}
/**
* get start vertex in one dag
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName the previous node
* @param dag
* @param completeTaskList
* @return
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
if(completeTaskList == null){
completeTaskList = new HashMap<>();
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
}else{
startVertexs = dag.getBeginNode();
}
List<String> tmpStartVertexs = new ArrayList<>();
if(startVertexs!= null){
tmpStartVertexs.addAll(startVertexs);
}
for(String start : startVertexs){
TaskNode startNode = dag.getNode(start);
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
continue;
}
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
if(checkForbiddenPostCanSubmit(post, dag)){
tmpStartVertexs.add(post);
}
}
tmpStartVertexs.remove(start);
}
return tmpStartVertexs;
}
/**
*
* @param postNodeName
* @param dag
* @return
*/
private static boolean checkForbiddenPostCanSubmit(String postNodeName, DAG<String, TaskNode, TaskNodeRelation> dag){
TaskNode postNode = dag.getNode(postNodeName);
List<String> dependList = postNode.getDepList();
for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName);
if(!dependNode.isForbidden()){
return false;
}
}
return true;
}
/***
* generate dag graph
* @param processDag
* @return
*/
public static DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/**
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node);
}
}
/**
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode());
}
}
return dag;
}
}

6
escheduler-dao/src/main/resources/dao/data_source.properties

@ -1,9 +1,9 @@
# base spring data source configuration
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://192.168.xx.xx:3306/escheduler?characterEncoding=UTF-8
spring.datasource.username=xx
spring.datasource.password=xx
spring.datasource.url=jdbc:mysql://192.168.220.188:3306/escheduler_new?characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=root@123
# connection configuration
spring.datasource.initialSize=5

51
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -79,6 +79,7 @@ public class MasterExecThread implements Runnable {
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
private Map<String, TaskInstance> dependFailedTask = new ConcurrentHashMap<>();
private Map<String, TaskNode> forbiddenTaskList = new ConcurrentHashMap<>();
private List<TaskInstance> recoverToleranceFaultTaskList = new ArrayList<>();
private AlertManager alertManager = new AlertManager();
@ -269,6 +270,7 @@ public class MasterExecThread implements Runnable {
private void buildFlowDag() throws Exception {
recoverNodeIdList = getStartTaskInstanceList(processInstance.getCommandParam());
forbiddenTaskList = DagHelper.getForbiddenTaskNodeMaps(processInstance.getProcessInstanceJson());
// generate process to get DAG info
List<String> recoveryNameList = getRecoveryNodeNameList();
List<String> startNodeNameList = parseStartNodeName(processInstance.getCommandParam());
@ -279,7 +281,8 @@ public class MasterExecThread implements Runnable {
return;
}
// generate process dag
dag = buildDagGraph(processDag);
dag = DagHelper.buildDagGraph(processDag);
}
private void initTaskQueue(){
@ -411,6 +414,8 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
/**
* get post task instance by node
*
@ -421,14 +426,12 @@ public class MasterExecThread implements Runnable {
private List<TaskInstance> getPostTaskInstanceByNode(DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName){
List<TaskInstance> postTaskList = new ArrayList<>();
Collection<String> startVertex = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertex = dag.getSubsequentNodes(parentNodeName);
}else{
startVertex = dag.getBeginNode();
Collection<String> startVertex = DagHelper.getStartVertex(parentNodeName, dag, completeTaskList);
if(startVertex == null){
return postTaskList;
}
for (String nodeName : startVertex){
for (String nodeName : startVertex){
// encapsulation task instance
TaskInstance taskInstance = createTaskInstance(processInstance, nodeName ,
dag.getNode(nodeName),parentNodeName);
@ -517,7 +520,10 @@ public class MasterExecThread implements Runnable {
List<String> depsNameList = taskNode.getDepList();
for(String depsNode : depsNameList ){
// dependencies must be all complete
if(forbiddenTaskList.containsKey(depsNode)){
continue;
}
// dependencies must be fully completed
if(!completeTaskList.containsKey(depsNode)){
return DependResult.WAITING;
}
@ -904,35 +910,6 @@ public class MasterExecThread implements Runnable {
}
}
/***
* generate dag graph
* @param processDag
* @return
*/
public DAG<String, TaskNode, TaskNodeRelation> buildDagGraph(ProcessDag processDag) {
DAG<String,TaskNode,TaskNodeRelation> dag = new DAG<>();
/**
* add vertex
*/
if (CollectionUtils.isNotEmpty(processDag.getNodes())){
for (TaskNode node : processDag.getNodes()){
dag.addNode(node.getName(),node);
}
}
/**
* add edge
*/
if (CollectionUtils.isNotEmpty(processDag.getEdges())){
for (TaskNodeRelation edge : processDag.getEdges()){
dag.addEdge(edge.getStartNode(),edge.getEndNode());
}
}
return dag;
}
/**
* whether the retry interval is timed out
* @param taskInstance

5
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java

@ -294,9 +294,8 @@ public class ProcessUtils {
/**
* find logs and kill yarn tasks
* @param taskInstance
* @throws IOException
*/
public static void killYarnJob(TaskInstance taskInstance) throws Exception {
public static void killYarnJob(TaskInstance taskInstance) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
@ -316,7 +315,7 @@ public class ProcessUtils {
} catch (Exception e) {
logger.error("kill yarn job failed : " + e.getMessage(),e);
throw new RuntimeException("kill yarn job fail");
// throw new RuntimeException("kill yarn job fail");
}
}
}

12
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -23,10 +23,7 @@ import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.FileUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerGroup;
import cn.escheduler.dao.model.*;
import cn.escheduler.server.zk.ZKWorkerClient;
import com.cronutils.utils.StringUtils;
import org.apache.commons.configuration.Configuration;
@ -194,9 +191,16 @@ public class FetchTaskThread implements Runnable{
// get process instance
ProcessInstance processInstance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
}
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine);

199
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -18,6 +18,7 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
@ -28,10 +29,11 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerServer;
import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -134,7 +136,9 @@ public class ZKMasterClient extends AbstractZKClient {
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
processDao.masterStartupFaultTolerant();
failoverWorker(null, true);
// processDao.masterStartupFaultTolerant();
failoverMaster(null);
}
}catch (Exception e){
@ -190,31 +194,20 @@ public class ZKMasterClient extends AbstractZKClient {
Date now = new Date();
createTime = now ;
try {
String osHost = OSUtils.getHost();
// encapsulation master znnode
masterZNode = masterZNodeParentPath + "/" + OSUtils.getHost() + "_";
List<String> masterZNodeList = zkClient.getChildren().forPath(masterZNodeParentPath);
if (CollectionUtils.isNotEmpty(masterZNodeList)){
boolean flag = false;
for (String masterZNode : masterZNodeList){
if (masterZNode.startsWith(OSUtils.getHost())){
flag = true;
break;
}
}
if (flag){
logger.error("register failure , master already started on host : {}" , OSUtils.getHost());
// exit system
System.exit(-1);
}
// zookeeper node exists, cannot start a new one.
if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
logger.error("register failure , master already started on host : {}" , osHost);
// exit system
System.exit(-1);
}
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = getOsInfo(now);
// create temporary sequence nodes for master znode
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(masterZNode, heartbeatZKInfo.getBytes());
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
logger.info("register master node {} success" , masterZNode);
@ -238,6 +231,46 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* check the zookeeper node already exists
* @param host
* @param zkNodeType
* @return
* @throws Exception
*/
private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
String path = null;
switch (zkNodeType){
case MASTER:
path = masterZNodeParentPath;
break;
case WORKER:
path = workerZNodeParentPath;
break;
case DEAD_SERVER:
path = deadServerZNodeParentPath;
break;
default:
break;
}
if(StringUtils.isEmpty(path)){
logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
return false;
}
List<String> masterZNodeList = null;
masterZNodeList = zkClient.getChildren().forPath(path);
if (CollectionUtils.isNotEmpty(masterZNodeList)){
for (String masterZNode : masterZNodeList){
if (masterZNode.startsWith(host)){
return true;
}
}
}
return false;
}
/**
* monitor master
*/
@ -279,17 +312,9 @@ public class ZKMasterClient extends AbstractZKClient {
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
}
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
processDao.processNeedFailoverProcessInstances(processInstance);
if(StringUtils.isNotEmpty(masterHost)){
failoverMaster(masterHost);
}
logger.info("master failover end");
}catch (Exception e){
logger.error("master failover failed : " + e.getMessage(),e);
}finally {
@ -331,6 +356,8 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* monitor worker
*/
@ -369,23 +396,9 @@ public class ZKMasterClient extends AbstractZKClient {
alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
}
logger.info("start worker failover ...");
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
}
//updateProcessInstance state value is NEED_FAULT_TOLERANCE
processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("worker failover end");
if(StringUtils.isNotEmpty(workerHost)){
failoverWorker(workerHost, true);
}
}catch (Exception e){
logger.error("worker failover failed : " + e.getMessage(),e);
}
@ -476,6 +489,95 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* task needs failover if task start before worker starts
*
* @param taskInstance
* @return
*/
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
boolean taskNeedFailover = true;
// if the worker node exists in zookeeper, we must check the task starts after the worker
if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){
//if task start after worker starts, there is no need to failover the task.
if(checkTaskAfterWorkerStart(taskInstance)){
taskNeedFailover = false;
}
}
return taskNeedFailover;
}
/**
* check task start after the worker server starts.
* @param taskInstance
* @return
*/
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
Date workerServerStartDate = null;
List<WorkerServer> workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost());
if(workerServers.size() > 0){
workerServerStartDate = workerServers.get(0).getCreateTime();
}
if(workerServerStartDate != null){
return taskInstance.getStartTime().after(workerServerStartDate);
}else{
return false;
}
}
/**
* failover worker tasks
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* @param workerHost
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
if(needCheckWorkerAlive){
if(!checkTaskInstanceNeedFailover(taskInstance)){
continue;
}
}
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processDao.saveTaskInstance(taskInstance);
}
//update task Instance state value is NEED_FAULT_TOLERANCE
// processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("end worker[{}] failover ...", workerHost);
}
/**
* failover master tasks
* @param masterHost
*/
private void failoverMaster(String masterHost) {
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
processDao.processNeedFailoverProcessInstances(processInstance);
}
logger.info("master failover end");
}
/**
* get host ip
@ -488,6 +590,7 @@ public class ZKMasterClient extends AbstractZKClient {
if(startIndex >= endIndex){
logger.error("parse ip error");
return "";
}
return path.substring(startIndex, endIndex);
}

42
escheduler-server/src/test/java/cn/escheduler/server/master/MasterCommandTest.java

@ -18,15 +18,27 @@ package cn.escheduler.server.master;
import cn.escheduler.common.enums.CommandType;
import cn.escheduler.common.enums.FailureStrategy;
import cn.escheduler.common.enums.TaskDependType;
import cn.escheduler.common.enums.WarningType;
import cn.escheduler.common.graph.DAG;
import cn.escheduler.common.model.TaskNode;
import cn.escheduler.common.model.TaskNodeRelation;
import cn.escheduler.common.process.ProcessDag;
import cn.escheduler.dao.datasource.ConnectionFactory;
import cn.escheduler.dao.mapper.CommandMapper;
import cn.escheduler.dao.mapper.ProcessDefinitionMapper;
import cn.escheduler.dao.model.Command;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.utils.DagHelper;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
/**
* master test
*/
@ -36,9 +48,14 @@ public class MasterCommandTest {
private CommandMapper commandMapper;
private ProcessDefinitionMapper processDefinitionMapper;
@Before
public void before(){
commandMapper = ConnectionFactory.getSqlSession().getMapper(CommandMapper.class);
processDefinitionMapper = ConnectionFactory.getSqlSession().getMapper(ProcessDefinitionMapper.class);
}
@ -104,4 +121,29 @@ public class MasterCommandTest {
}
@Test
public void testDagHelper(){
ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(19);
try {
ProcessDag processDag = DagHelper.generateFlowDag(processDefinition.getProcessDefinitionJson(),
new ArrayList<>(), new ArrayList<>(), TaskDependType.TASK_POST);
DAG<String,TaskNode,TaskNodeRelation> dag = DagHelper.buildDagGraph(processDag);
Collection<String> start = DagHelper.getStartVertex("1", dag, null);
System.out.println(start.toString());
Map<String, TaskNode> forbidden = DagHelper.getForbiddenTaskNodeMaps(processDefinition.getProcessDefinitionJson());
System.out.println(forbidden);
} catch (Exception e) {
e.printStackTrace();
}
}
}

11
escheduler-ui/src/js/conf/home/pages/dag/_source/udp/udp.vue

@ -26,6 +26,10 @@
</div>
</template>
<div class="title" style="padding-top: 6px;">
<span class="text-b">{{$t('select tenant')}}</span>
<form-tenant v-model="tenantId"></form-tenant>
</div>
<div class="title" style="padding-top: 6px;">
<span>超时告警</span>
<span style="padding-left: 6px;">
@ -73,6 +77,7 @@
import mLocalParams from '../formModel/tasks/_source/localParams'
import disabledState from '@/module/mixin/disabledState'
import Affirm from '../jumpAffirm'
import FormTenant from "./_source/selectTenant";
export default {
name: 'udp',
@ -90,6 +95,8 @@
syncDefine: true,
// Timeout alarm
timeout: 0,
tenantId: -1,
// checked Timeout alarm
checkedTimeout: true
}
@ -116,6 +123,7 @@
this.store.commit('dag/setGlobalParams', _.cloneDeep(this.udpList))
this.store.commit('dag/setName', _.cloneDeep(this.name))
this.store.commit('dag/setTimeout', _.cloneDeep(this.timeout))
this.store.commit('dag/setTenantId', _.cloneDeep(this.tenantId))
this.store.commit('dag/setDesc', _.cloneDeep(this.desc))
this.store.commit('dag/setSyncDefine', this.syncDefine)
},
@ -181,9 +189,10 @@
this.syncDefine = dag.syncDefine
this.timeout = dag.timeout || 0
this.checkedTimeout = this.timeout !== 0
this.tenantId = dag.tenantId || -1
},
mounted () {},
components: { mLocalParams }
components: {FormTenant, mLocalParams }
}
</script>

5
escheduler-ui/src/js/conf/home/pages/dag/definitionDetails.vue

@ -26,7 +26,7 @@
methods: {
...mapMutations('dag', ['resetParams', 'setIsDetails']),
...mapActions('dag', ['getProcessList', 'getResourcesList', 'getProcessDetails']),
...mapActions('security', ['getWorkerGroupsAll']),
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
/**
* init
*/
@ -43,7 +43,8 @@
// get resource
this.getResourcesList(),
// get worker group list
this.getWorkerGroupsAll()
this.getWorkerGroupsAll(),
this.getTenantList()
]).then((data) => {
let item = data[0]
this.setIsDetails(item.releaseState === 'ONLINE')

7
escheduler-ui/src/js/conf/home/pages/dag/index.vue

@ -25,7 +25,7 @@
methods: {
...mapMutations('dag', ['resetParams']),
...mapActions('dag', ['getProcessList', 'getResourcesList']),
...mapActions('security', ['getWorkerGroupsAll']),
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
/**
* init
*/
@ -40,7 +40,8 @@
// get resource
this.getResourcesList(),
// get worker group list
this.getWorkerGroupsAll()
this.getWorkerGroupsAll(),
this.getTenantList()
]).then((data) => {
this.isLoading = false
// Whether to pop up the box?
@ -65,4 +66,4 @@
},
components: { mDag, mSpin }
}
</script>
</script>

7
escheduler-ui/src/js/conf/home/pages/dag/instanceDetails.vue

@ -26,7 +26,7 @@
methods: {
...mapMutations('dag', ['setIsDetails', 'resetParams']),
...mapActions('dag', ['getProcessList', 'getResourcesList', 'getInstancedetail']),
...mapActions('security', ['getWorkerGroupsAll']),
...mapActions('security', ['getTenantList','getWorkerGroupsAll']),
/**
* init
*/
@ -43,7 +43,8 @@
// get resources
this.getResourcesList(),
// get worker group list
this.getWorkerGroupsAll()
this.getWorkerGroupsAll(),
this.getTenantList()
]).then((data) => {
let item = data[0]
let flag = false
@ -92,4 +93,4 @@
},
components: { mDag, mSpin, mVariable }
}
</script>
</script>

115
escheduler-ui/src/js/conf/home/pages/monitor/pages/servers/statistics.vue

@ -0,0 +1,115 @@
<template>
<m-list-construction :title="$t('statistics') + $t('Manage')">
<template slot="content">
<div class="servers-wrapper mysql-model" v-show="2">
<div class="row">
<div class="col-md-3">
<div class="text-num-model text">
<div class="title">
<span >{{$t('process number of waiting for running')}}</span>
</div>
<div class="value-p">
<b :style="{color:color[0]}"> {{commandCountData.normalCount}}</b>
</div>
</div>
</div>
<div class="col-md-3">
<div class="text-num-model text">
<div class="title">
<span >{{$t('failure command number')}}}</span>
</div>
<div class="value-p">
<b :style="{color:color[1]}"> {{commandCountData.errorCount}}</b>
</div>
</div>
</div>
<div class="col-md-3">
<div class="text-num-model text">
<div class="title">
<span >{{$t('tasks number of waiting running')}}</span>
</div>
<div class="value-p">
<b :style="{color:color[0]}"> {{queueCount.taskQueue}}</b>
</div>
</div>
</div>
<div class="col-md-3">
<div class="text-num-model text">
<div class="title">
<span >{{$t('task number of ready to kill')}}</span>
</div>
<div class="value-p">
<b :style="{color:color[1]}">{{queueCount.taskKill}}</b>
</div>
</div>
</div>
</div>
</div>
<m-spin :is-spin="isLoading" ></m-spin>
</template>
</m-list-construction>
</template>
<script>
import { mapActions } from 'vuex'
import mSpin from '@/module/components/spin/spin'
import mNoData from '@/module/components/noData/noData'
import themeData from '@/module/echarts/themeData.json'
import mListConstruction from '@/module/components/listConstruction/listConstruction'
export default {
name: 'statistics',
data () {
return {
isLoading: false,
queueCount: {},
commandCountData: {},
color: themeData.color
}
},
props:{},
methods: {
//...mapActions('monitor', ['getDatabaseData'])
// ...mapActions('projects', ['getCommandStateCount']),
...mapActions('projects', ['getQueueCount']),
...mapActions('projects', ['getCommandStateCount']),
},
watch: {},
created () {
this.isLoading = true
this.getQueueCount().then(res => {
this.queueCount = res.data
this.isLoading = false
}).catch(() => {
this.isLoading = false
})
this.getCommandStateCount().then(res => {
let normal = 0
let error = 0
_.forEach(res.data, (v, i) => {
let key = _.keys(v)
if(key[0] == 'errorCount') {
error = error + v.errorCount
}
if(key[1] == 'normalCount'){
normal = normal + v.normalCount
}
}
)
this.commandCountData = {
'normalCount': normal,
'errorCount' : error
}
}).catch( () => {
})
},
mounted () {
},
components: { mListConstruction, mSpin, mNoData }
}
</script>
<style lang="scss" rel="stylesheet/scss">
@import "./servers";
</style>

24
escheduler-ui/src/js/conf/home/pages/projects/pages/index/index.vue

@ -33,30 +33,6 @@
</div>
</div>
</div>
<div class="row" style="padding-top: 20px;">
<div class="col-md-6">
</div>
<div class="col-md-6">
<div class="chart-title">
<span>{{$t('Queue statistics')}}</span>
</div>
<div class="row">
<m-queue-count :search-params="searchParams">
</m-queue-count>
</div>
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="chart-title" style="margin-bottom: 20px;margin-top: 30px">
<span>{{$t('Command status statistics')}}</span>
</div>
<div>
<m-command-state-count :search-params="searchParams">
</m-command-state-count>
</div>
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="chart-title" style="margin-bottom: -20px;margin-top: 30px">

8
escheduler-ui/src/js/conf/home/router/index.js

@ -439,6 +439,14 @@ const router = new Router({
meta: {
title: `Mysql`
}
},
{
path: '/monitor/servers/statistics',
name: 'statistics',
component: resolve => require(['../pages/monitor/pages/servers/statistics'], resolve),
meta: {
title: `statistics`
}
}
]
}

6
escheduler-ui/src/js/conf/home/store/dag/actions.js

@ -115,6 +115,7 @@ export default {
// timeout
state.timeout = processDefinitionJson.timeout
state.tenantId = processDefinitionJson.tenantId
resolve(res.data)
}).catch(res => {
reject(res)
@ -146,6 +147,8 @@ export default {
// timeout
state.timeout = processInstanceJson.timeout
state.tenantId = processInstanceJson.tenantId
resolve(res.data)
}).catch(res => {
reject(res)
@ -160,6 +163,7 @@ export default {
let data = {
globalParams: state.globalParams,
tasks: state.tasks,
tenantId: state.tenantId,
timeout: state.timeout
}
io.post(`projects/${state.projectName}/process/save`, {
@ -183,6 +187,7 @@ export default {
let data = {
globalParams: state.globalParams,
tasks: state.tasks,
tenantId: state.tenantId,
timeout: state.timeout
}
io.post(`projects/${state.projectName}/process/update`, {
@ -207,6 +212,7 @@ export default {
let data = {
globalParams: state.globalParams,
tasks: state.tasks,
tenantId: state.tenantId,
timeout: state.timeout
}
io.post(`projects/${state.projectName}/instance/update`, {

7
escheduler-ui/src/js/conf/home/store/dag/mutations.js

@ -58,6 +58,12 @@ export default {
setTimeout (state, payload) {
state.timeout = payload
},
/**
* set tenantId
*/
setTenantId (state, payload) {
state.tenantId = payload
},
/**
* set global params
*/
@ -100,6 +106,7 @@ export default {
state.name = payload && payload.name || ''
state.desc = payload && payload.desc || ''
state.timeout = payload && payload.timeout || 0
state.tenantId = payload && payload.tenantId || -1
state.processListS = payload && payload.processListS || []
state.resourcesListS = payload && payload.resourcesListS || []
state.isDetails = payload && payload.isDetails || false

2
escheduler-ui/src/js/conf/home/store/dag/state.js

@ -31,6 +31,8 @@ export default {
tasks: [],
// Timeout alarm
timeout: 0,
// tenant id
tenantId:-1,
// Node location information
locations: {},
// Node-to-node connection

8
escheduler-ui/src/js/conf/home/store/security/actions.js

@ -240,7 +240,13 @@ export default {
getTenantList ({ state }, payload) {
return new Promise((resolve, reject) => {
io.get(`tenant/list`, payload, res => {
resolve(res.data)
let list=res.data
list.unshift({
id: -1,
tenantName: 'Default'
})
state.tenantAllList = list
resolve(list)
}).catch(e => {
reject(e)
})

3
escheduler-ui/src/js/conf/home/store/security/state.js

@ -15,5 +15,6 @@
* limitations under the License.
*/
export default {
workerGroupsListAll: []
workerGroupsListAll: [],
tenantAllList : []
}

18
escheduler-ui/src/js/module/components/secondaryMenu/_source/menu.js

@ -193,7 +193,7 @@ let menu = {
monitor: [
{
name: `${i18n.$t('Servers manage')}`,
id: 0,
id: 1,
path: '',
isOpen: true,
disabled: true,
@ -242,6 +242,22 @@ let menu = {
disabled: true
}
]
},
{
name: `${i18n.$t('Statistics manage')}`,
id: 0,
path: '',
isOpen: true,
disabled: true,
icon: 'fa-server',
children: [
{
name: "Statistics",
path: 'statistics',
id: 0,
disabled: true
}
]
}
]
}

8
escheduler-ui/src/js/module/i18n/locale/en_US.js

@ -459,5 +459,13 @@ export default {
'Statement cannot be empty': 'Statement cannot be empty',
'Process Define Count': 'Process Define Count',
'Process Instance Running Count': 'Process Instance Running Count',
'process number of waiting for running': 'process number of waiting for running',
'failure command number': 'failure command number',
'tasks number of waiting running': 'tasks number of waiting running',
'task number of ready to kill': '待杀死任务数',
'Statistics manage': 'Statistics manage',
'statistics': 'statistics',
'select tenant':'select tenant',
'Process Instance Running Count': 'Process Instance Running Count',
'Please enter Principal':'Please enter Principal'
}

7
escheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -460,5 +460,12 @@ export default {
'Process Define Count': '流程定义个数',
'Process Instance Running Count': '运行流程实例个数',
'Please select a queue': '请选择队列',
'process number of waiting for running': '待执行的流程数',
'failure command number': '执行失败的命令数',
'tasks number of waiting running': '待运行任务数',
'task number of ready to kill': '待杀死任务数',
'Statistics manage': '统计管理',
'statistics': '统计',
'select tenant':'选择租户',
'Please enter Principal':'请输入Principal'
}

41
sql/upgrade/1.1.0_schema/mysql/escheduler_ddl.sql

@ -0,0 +1,41 @@
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- ac_escheduler_T_t_escheduler_process_definition_C_tenant_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_definition_C_tenant_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_tenant_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_definition'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='tenant_id')
THEN
ALTER TABLE `t_escheduler_process_definition` ADD COLUMN `tenant_id` int(11) NOT NULL DEFAULT -1 COMMENT '租户id' AFTER `timeout`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_definition_C_tenant_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_definition_C_tenant_id;
-- ac_escheduler_T_t_escheduler_process_instance_C_tenant_id
drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_process_instance_C_tenant_id;
delimiter d//
CREATE PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_tenant_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_escheduler_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME='tenant_id')
THEN
ALTER TABLE `t_escheduler_process_instance` ADD COLUMN `tenant_id` int(11) NOT NULL DEFAULT -1 COMMENT '租户id' AFTER `timeout`;
END IF;
END;
d//
delimiter ;
CALL ac_escheduler_T_t_escheduler_process_instance_C_tenant_id;
DROP PROCEDURE ac_escheduler_T_t_escheduler_process_instance_C_tenant_id;

1
sql/upgrade/1.1.0_schema/mysql/escheduler_dml.sql

@ -0,0 +1 @@
INSERT INTO `t_escheduler_version` (`version`) VALUES ('1.1.0');
Loading…
Cancel
Save