Browse Source

Merge pull request #504 from lenboo/dev-1.1.0

update tenant id
pull/2/head
bao liang 5 years ago committed by GitHub
parent
commit
660b325f6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  2. 6
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  3. 6
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java
  4. 144
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

18
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -642,6 +642,9 @@ public class ProcessDao extends AbstractBaseDao {
// find pause tasks and init task's state // find pause tasks and init task's state
cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING); cmdParam.remove(Constants.CMDPARAM_RECOVERY_START_NODE_STRING);
List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE); List<Integer> suspendedNodeList = this.findTaskIdByInstanceState(processInstance.getId(), ExecutionStatus.PAUSE);
List<Integer> stopNodeList = findTaskIdByInstanceState(processInstance.getId(),
ExecutionStatus.KILL);
suspendedNodeList.addAll(stopNodeList);
for(Integer taskId : suspendedNodeList){ for(Integer taskId : suspendedNodeList){
// 把暂停状态初始化 // 把暂停状态初始化
initTaskInstance(this.findTaskInstanceById(taskId)); initTaskInstance(this.findTaskInstanceById(taskId));
@ -789,13 +792,16 @@ public class ProcessDao extends AbstractBaseDao {
* @param taskInstance * @param taskInstance
*/ */
private void initTaskInstance(TaskInstance taskInstance){ private void initTaskInstance(TaskInstance taskInstance){
if(taskInstance.getState().typeIsFailure() && !taskInstance.isSubProcess()){
taskInstance.setFlag(Flag.NO); if(!taskInstance.isSubProcess()){
updateTaskInstance(taskInstance); if(taskInstance.getState().typeIsCancel() || taskInstance.getState().typeIsFailure()){
}else{ taskInstance.setFlag(Flag.NO);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS); updateTaskInstance(taskInstance);
updateTaskInstance(taskInstance); return;
}
} }
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateTaskInstance(taskInstance);
} }
/** /**

6
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -222,11 +222,11 @@ public class ProcessInstanceMapperProvider {
public String queryDetailById(Map<String, Object> parameter) { public String queryDetailById(Map<String, Object> parameter) {
return new SQL() { return new SQL() {
{ {
SELECT("inst.*,q.queue_name as queue,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration"); SELECT("inst.*,UNIX_TIMESTAMP(inst.end_time)-UNIX_TIMESTAMP(inst.start_time) as duration");
FROM(TABLE_NAME + " inst, t_escheduler_user u,t_escheduler_queue q"); FROM(TABLE_NAME + " inst");
WHERE("inst.executor_id = u.id AND t.queue_id = q.id AND inst.id = #{processId}"); WHERE("inst.id = #{processId}");
} }
}.toString(); }.toString();
} }

6
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/FetchTaskThread.java

@ -195,12 +195,6 @@ public class FetchTaskThread implements Runnable{
// get process define // get process define
ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId()); ProcessDefinition processDefine = processDao.findProcessDefineById(taskInstance.getProcessDefinitionId());
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant != null){
processInstance.setTenantCode(tenant.getTenantCode());
}
taskInstance.setProcessInstance(processInstance); taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processDefine); taskInstance.setProcessDefine(processDefine);

144
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters;
import cn.escheduler.common.utils.*; import cn.escheduler.common.utils.*;
import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.TaskRecordDao; import cn.escheduler.dao.TaskRecordDao;
import cn.escheduler.dao.model.ProcessDefinition;
import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils; import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.log.TaskLogger;
@ -160,82 +162,94 @@ public class TaskScheduleThread implements Callable<Boolean> {
// set task params // set task params
taskProps.setTaskParams(taskNode.getParams()); taskProps.setTaskParams(taskNode.getParams());
// set tenant code , execute task linux user // set tenant code , execute task linux user
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode());
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId());
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskProps.setScheduleTime(processInstance.getScheduleTime()); taskProps.setScheduleTime(processInstance.getScheduleTime());
taskProps.setNodeName(taskInstance.getName()); taskProps.setNodeName(taskInstance.getName());
taskProps.setTaskInstId(taskInstance.getId()); taskProps.setTaskInstId(taskInstance.getId());
taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); taskProps.setEnvFile(CommonUtils.getSystemEnvPath());
// set queue
if (StringUtils.isEmpty(queue)){ ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId());
taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else { Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
taskProps.setQueue(queue); processDefine.getUserId());
}
taskProps.setTaskStartTime(taskInstance.getStartTime()); if(tenant == null){
taskProps.setDefinedParams(allParamMap); processInstance.setTenantCode(tenant.getTenantCode());
logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}",
// set task timeout processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId()
setTaskTimeout(taskProps, taskNode); );
status = ExecutionStatus.FAILURE;
taskProps.setDependence(taskInstance.getDependency()); }else{
taskProps.setTenantCode(tenant.getTenantCode());
taskProps.setTaskAppId(String.format("%s_%s_%s", String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId());
taskInstance.getProcessDefine().getId(), // set queue
taskInstance.getProcessInstance().getId(), if (StringUtils.isEmpty(queue)){
taskInstance.getId())); taskProps.setQueue(taskInstance.getProcessInstance().getQueue());
}else {
// custom logger taskProps.setQueue(tenant.getQueueName());
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, }
taskInstance.getProcessDefine().getId(), taskProps.setTaskStartTime(taskInstance.getStartTime());
taskInstance.getProcessInstance().getId(), taskProps.setDefinedParams(allParamMap);
taskInstance.getId()));
// set task timeout
task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger); setTaskTimeout(taskProps, taskNode);
// job init taskProps.setDependence(taskInstance.getDependency());
task.init();
taskProps.setTaskAppId(String.format("%s_%s_%s",
// job handle taskInstance.getProcessDefine().getId(),
task.handle(); taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); // custom logger
TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX,
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ taskInstance.getProcessDefine().getId(),
status = ExecutionStatus.SUCCESS; taskInstance.getProcessInstance().getId(),
// task recor flat : if true , start up qianfan taskInstance.getId()));
if (TaskRecordDao.getTaskRecordFlag()
&& TaskType.typeIsNormalTask(taskInstance.getTaskType())){ task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass()); // job init
task.init();
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), // job handle
taskProps.getDefinedParams(), task.handle();
params.getLocalParametersMap(), logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode());
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime()); if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
if (paramsMap != null && !paramsMap.isEmpty() status = ExecutionStatus.SUCCESS;
&& paramsMap.containsKey("v_proc_date")){ // task recor flat : if true , start up qianfan
String vProcDate = paramsMap.get("v_proc_date").getValue(); if (TaskRecordDao.getTaskRecordFlag()
if (!StringUtils.isEmpty(vProcDate)){ && TaskType.typeIsNormalTask(taskInstance.getTaskType())){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState); AbstractParameters params = (AbstractParameters) JSONUtils.parseObject(taskProps.getTaskParams(), getCurTaskParamsClass());
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE; // replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(),
params.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
if (paramsMap != null && !paramsMap.isEmpty()
&& paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskInstance.getName(), vProcDate);
logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){
status = ExecutionStatus.FAILURE;
}
} }
} }
} }
}
}else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){ }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
status = ExecutionStatus.KILL; status = ExecutionStatus.KILL;
}else { }else {
status = ExecutionStatus.FAILURE; status = ExecutionStatus.FAILURE;
}
} }
}catch (Exception e){ }catch (Exception e){
logger.error("task escheduler failure : " + e.getMessage(),e); logger.error("task escheduler failure : " + e.getMessage(),e);

Loading…
Cancel
Save