Browse Source

fix the bug when worker execute task using queue. and remove checking Tenant user anymore in TaskScheduleThread (#1045)

pull/2/head
Tboy 5 years ago committed by dailidong
parent
commit
8eceb685c9
  1. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java
  2. 97
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/FetchTaskThread.java

@ -192,6 +192,7 @@ public class FetchTaskThread implements Runnable{
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
logger.info("worker fetch taskId : {} from queue ", taskInstId);

97
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskScheduleThread.java

@ -98,61 +98,48 @@ public class TaskScheduleThread implements Runnable {
// get process instance according to tak instance
ProcessInstance processInstance = taskInstance.getProcessInstance();
// get process define according to tak instance
ProcessDefinition processDefine = taskInstance.getProcessDefine();
// get tenant info
Tenant tenant = processDao.getTenantForProcess(processInstance.getTenantId(),
processDefine.getUserId());
if(tenant == null){
logger.error("cannot find the tenant, process definition id:{}, user id:{}",
processDefine.getId(),
processDefine.getUserId());
task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);
}else{
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInstance.getExecutePath(),
processInstance.getScheduleTime(),
taskInstance.getName(),
taskInstance.getTaskType(),
taskInstance.getId(),
CommonUtils.getSystemEnvPath(),
tenant.getTenantCode(),
tenant.getQueue(),
taskInstance.getStartTime(),
getGlobalParamsMap(),
taskInstance.getDependency(),
processInstance.getCmdTypeIfComplement());
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps,
taskLogger);
// task init
task.init();
// task handle
task.handle();
// task result process
task.after();
}
// set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskInstance.getExecutePath(),
processInstance.getScheduleTime(),
taskInstance.getName(),
taskInstance.getTaskType(),
taskInstance.getId(),
CommonUtils.getSystemEnvPath(),
processInstance.getTenantCode(),
processInstance.getQueue(),
taskInstance.getStartTime(),
getGlobalParamsMap(),
taskInstance.getDependency(),
processInstance.getCmdTypeIfComplement());
// set task timeout
setTaskTimeout(taskProps, taskNode);
taskProps.setTaskAppId(String.format("%s_%s_%s",
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
// custom logger
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId()));
task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps,
taskLogger);
// task init
task.init();
// task handle
task.handle();
// task result process
task.after();
}catch (Exception e){
logger.error("task scheduler failure", e);
task.setExitStatusCode(Constants.EXIT_CODE_FAILURE);

Loading…
Cancel
Save