Browse Source

[Fix-#4716][mater,worker] The task execution path should be calculated by the worker, not the master (#4717)

* [fix][api,mater,worker] 修复master 计算实际执行路径,worker上报实际执行路径

* remove unused import
pull/3/MERGE
guohaozhang 3 years ago committed by GitHub
parent
commit
7a6b905597
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java
  2. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 1
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java

@ -44,7 +44,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType()); taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setHost(taskInstance.getHost());

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -35,7 +35,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParamete
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
@ -222,7 +221,6 @@ public class TaskPriorityQueueConsumer extends Thread {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance));
taskInstance.setResources(getResourceFullNames(taskNode)); taskInstance.setResources(getResourceFullNames(taskNode));
SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext();
@ -363,18 +361,6 @@ public class TaskPriorityQueueConsumer extends Thread {
} }
} }
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance) {
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/** /**
* whehter tenant is null * whehter tenant is null
* *

1
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -147,6 +147,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// local execute path // local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext); String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {} ", execLocalPath); logger.info("task instance local execute path : {} ", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
FileUtils.taskLoggerThreadLocal.set(taskLogger); FileUtils.taskLoggerThreadLocal.set(taskLogger);
try { try {

Loading…
Cancel
Save