Browse Source

Fix compile error

3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
c488a9f828
  1. 134
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  2. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

134
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -42,8 +42,6 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang.SystemUtils;
import java.util.Date; import java.util.Date;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -133,84 +131,74 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext)); taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) { if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
boolean osUserExistFlag; if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
//if Using distributed is true and Currently supported systems are linux,Should not let it automatically
//create tenants,so TenantAutoCreate has no effect
if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
//use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
} else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode()); OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
} else {
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
}
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
} }
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), // check if the OS user exists
new NettyRemoteChannel(channel, command.getOpaque())); if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
// delay task process taskExecutionContext.getTenantCode(),
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.getDelayTime() * 60L); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
if (remainTime > 0) { taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.setEndTime(new Date());
taskExecutionContext.getTaskInstanceId(), taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
remainTime); return;
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
} }
// submit task to manager // local execute path
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, String execLocalPath = getExecLocalPath(taskExecutionContext);
taskCallbackService, logger.info("task instance local execute path : {}", execLocalPath);
alertClientService, taskExecutionContext.setExecutePath(execLocalPath);
taskPluginManager));
if (!offer) { try {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", FileUtils.createWorkDirIfAbsent(execLocalPath);
workerManager.getDelayQueueSize(), } catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE); taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext); taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
} }
} }
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
}
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
taskCallbackService,
alertClientService,
taskPluginManager,
storageOperate));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
/** /**
* get execute local path * get execute local path
* *
@ -219,9 +207,9 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
*/ */
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) { private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(), return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
} }
} }

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -143,7 +143,6 @@ public class WorkerManagerThread implements Runnable {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
try { try {
taskExecuteThread = workerExecuteQueue.take(); taskExecuteThread = workerExecuteQueue.take();
taskExecuteThread.setStorageOperate(storageOperate);
workerExecService.submit(taskExecuteThread); workerExecService.submit(taskExecuteThread);
} catch (Exception e) { } catch (Exception e) {
logger.error("An unexpected interrupt is happened, " logger.error("An unexpected interrupt is happened, "

Loading…
Cancel
Save