|
|
|
@ -34,8 +34,10 @@ import cn.escheduler.common.task.sql.SqlParameters;
|
|
|
|
|
import cn.escheduler.common.utils.*; |
|
|
|
|
import cn.escheduler.dao.ProcessDao; |
|
|
|
|
import cn.escheduler.dao.TaskRecordDao; |
|
|
|
|
import cn.escheduler.dao.model.ProcessDefinition; |
|
|
|
|
import cn.escheduler.dao.model.ProcessInstance; |
|
|
|
|
import cn.escheduler.dao.model.TaskInstance; |
|
|
|
|
import cn.escheduler.dao.model.Tenant; |
|
|
|
|
import cn.escheduler.server.utils.LoggerUtils; |
|
|
|
|
import cn.escheduler.server.utils.ParamUtils; |
|
|
|
|
import cn.escheduler.server.worker.log.TaskLogger; |
|
|
|
@ -160,20 +162,33 @@ public class TaskScheduleThread implements Callable<Boolean> {
|
|
|
|
|
// set task params
|
|
|
|
|
taskProps.setTaskParams(taskNode.getParams()); |
|
|
|
|
// set tenant code , execute task linux user
|
|
|
|
|
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); |
|
|
|
|
|
|
|
|
|
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskInstance.getId()); |
|
|
|
|
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); |
|
|
|
|
|
|
|
|
|
taskProps.setScheduleTime(processInstance.getScheduleTime()); |
|
|
|
|
taskProps.setNodeName(taskInstance.getName()); |
|
|
|
|
taskProps.setTaskInstId(taskInstance.getId()); |
|
|
|
|
taskProps.setEnvFile(CommonUtils.getSystemEnvPath()); |
|
|
|
|
|
|
|
|
|
ProcessDefinition processDefine = processDao.findProcessDefineById(processInstance.getProcessDefinitionId()); |
|
|
|
|
|
|
|
|
|
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(), |
|
|
|
|
processDefine.getUserId()); |
|
|
|
|
|
|
|
|
|
if(tenant == null){ |
|
|
|
|
processInstance.setTenantCode(tenant.getTenantCode()); |
|
|
|
|
logger.error("cannot find the tenant, process definition id:{}, tenant id:{}, user id:{}", |
|
|
|
|
processDefine.getId(), processDefine.getTenantId(), processDefine.getUserId() |
|
|
|
|
); |
|
|
|
|
status = ExecutionStatus.FAILURE; |
|
|
|
|
}else{ |
|
|
|
|
taskProps.setTenantCode(taskInstance.getProcessInstance().getTenantCode()); |
|
|
|
|
String queue = processDao.queryQueueByProcessInstanceId(processInstance.getId()); |
|
|
|
|
// set queue
|
|
|
|
|
if (StringUtils.isEmpty(queue)){ |
|
|
|
|
taskProps.setQueue(taskInstance.getProcessInstance().getQueue()); |
|
|
|
|
}else { |
|
|
|
|
taskProps.setQueue(queue); |
|
|
|
|
taskProps.setQueue(tenant.getQueueName()); |
|
|
|
|
} |
|
|
|
|
taskProps.setTaskStartTime(taskInstance.getStartTime()); |
|
|
|
|
taskProps.setDefinedParams(allParamMap); |
|
|
|
@ -201,8 +216,6 @@ public class TaskScheduleThread implements Callable<Boolean> {
|
|
|
|
|
|
|
|
|
|
// job handle
|
|
|
|
|
task.handle(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info("task : {} exit status code : {}", taskProps.getTaskAppId(),task.getExitStatusCode()); |
|
|
|
|
|
|
|
|
|
if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){ |
|
|
|
@ -237,6 +250,7 @@ public class TaskScheduleThread implements Callable<Boolean> {
|
|
|
|
|
}else { |
|
|
|
|
status = ExecutionStatus.FAILURE; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}catch (Exception e){ |
|
|
|
|
logger.error("task escheduler failure : " + e.getMessage(),e); |
|
|
|
|
status = ExecutionStatus.FAILURE ; |
|
|
|
|