Browse Source

[DS-6829][WorkerServer] skip create log dir and print log in dryRun model (#6852)

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
e6239e808b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  3. 38
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

1
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -766,4 +766,5 @@ public final class Constants {
* dry run flag
*/
public static final int DRY_RUN_FLAG_NO = 0;
public static final int DRY_RUN_FLAG_YES = 1;
}

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
@ -135,19 +136,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
}
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),

38
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -125,8 +125,16 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
responseCommand.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
return;
}
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// check if the OS user exists
@ -146,13 +154,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
int dryRun = taskExecutionContext.getDryRun();
// copy hdfs/minio file to local
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
logger);
}
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@ -177,31 +180,28 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskRequest.setTaskLogName(taskLogName);
task = taskChannel.createTask(taskRequest);
// task init
this.task.init();
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
// task handle
this.task.handle();
// task handle
this.task.handle();
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
} else {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo());
}
responseCommand.setStatus(this.task.getExitStatus().getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());

Loading…
Cancel
Save