diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequest.java index 8eebb8a354..df0df0eb6f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/log/GetAppIdRequest.java @@ -29,9 +29,9 @@ import lombok.NoArgsConstructor; @NoArgsConstructor public class GetAppIdRequest implements RequestMessageBuilder { - private String logPath; + private int taskInstanceId; - private String appInfoPath; + private String logPath; @Override public MessageType getCommandType() { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/GetAppIdProcessor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/GetAppIdProcessor.java index 6c446aacb1..e5081dc191 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/GetAppIdProcessor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/processor/GetAppIdProcessor.java @@ -22,6 +22,8 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COL import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.remote.command.Message; import org.apache.dolphinscheduler.remote.command.MessageType; @@ -44,7 +46,9 @@ public class GetAppIdProcessor extends BaseLogProcessor implements NettyRequestP public void process(Channel channel, Message message) { GetAppIdRequest getAppIdRequest = JSONUtils.parseObject(message.getBody(), GetAppIdRequest.class); - String appInfoPath = getAppIdRequest.getAppInfoPath(); + TaskExecutionContext taskExecutionContext = + TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId()); + String appInfoPath = taskExecutionContext.getAppInfoPath(); String logPath = getAppIdRequest.getLogPath(); List appIds = LogUtils.getAppIds(logPath, appInfoPath, PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java index b407fe7023..fb71991469 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java @@ -210,8 +210,9 @@ public class LogClient implements AutoCloseable { } } - public @Nullable List getAppIds(@NonNull String host, int port, @NonNull String taskLogFilePath, - @NonNull String taskAppInfoPath) throws RemotingException, InterruptedException { + public @Nullable List getAppIds(@NonNull String host, int port, String taskLogFilePath, + String taskAppInfoPath, + int taskInstanceId) throws RemotingException, InterruptedException { log.info("Begin to get appIds from worker: {}:{} taskLogPath: {}, taskAppInfoPath: {}", host, port, taskLogFilePath, taskAppInfoPath); final Host workerAddress = new Host(host, port); @@ -220,7 +221,7 @@ public class LogClient implements AutoCloseable { appIds = LogUtils.getAppIds(taskLogFilePath, taskAppInfoPath, PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); } else { - final Message message = new GetAppIdRequest(taskLogFilePath, taskAppInfoPath).convert2Command(); + final Message message = new GetAppIdRequest(taskInstanceId, taskLogFilePath).convert2Command(); Message response = this.client.sendSync(workerAddress, message, LOG_REQUEST_TIMEOUT); if (response != null) { GetAppIdResponse responseCommand = diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java index f926d3ee31..b2c32a2ac3 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java @@ -171,7 +171,7 @@ public class ProcessUtils { Thread.sleep(Constants.SLEEP_TIME_MILLIS); Host host = Host.of(taskExecutionContext.getHost()); List appIds = logClient.getAppIds(host.getIp(), host.getPort(), taskExecutionContext.getLogPath(), - taskExecutionContext.getAppInfoPath()); + taskExecutionContext.getAppInfoPath(), taskExecutionContext.getTaskInstanceId()); if (CollectionUtils.isNotEmpty(appIds)) { taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, appIds)); if (StringUtils.isEmpty(taskExecutionContext.getExecutePath())) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index b2aee43325..ceea6f8302 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -71,7 +71,7 @@ public class LogUtils { * @param fetchWay fetch way * @return application id list. */ - public List getAppIds(@NonNull String logPath, @NonNull String appInfoPath, String fetchWay) { + public List getAppIds(String logPath, String appInfoPath, String fetchWay) { if (!StringUtils.isEmpty(fetchWay) && fetchWay.equals("aop")) { log.info("Start finding appId in {}, fetch way: {} ", appInfoPath, fetchWay); return getAppIdsFromAppInfoFile(appInfoPath); @@ -142,7 +142,11 @@ public class LogUtils { .orElse(null); } - public List getAppIdsFromAppInfoFile(@NonNull String appInfoPath) { + public List getAppIdsFromAppInfoFile(String appInfoPath) { + if (StringUtils.isEmpty(appInfoPath)) { + log.warn("appInfoPath is empty"); + return Collections.emptyList(); + } File appInfoFile = new File(appInfoPath); if (!appInfoFile.exists() || !appInfoFile.isFile()) { return Collections.emptyList();