Browse Source

[Bug #6306][Worker] task log path generated error. (#6311)

* [Bug #6306][Worker] task log path generated error.

* [Bug #6306][Worker] task log path generated error.
2.0.7-release
Kirs 3 years ago committed by GitHub
parent
commit
163a0696a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
  2. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  3. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
  4. 123
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/LoggerUtils.java

5
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java

@ -320,5 +320,8 @@ public class TaskConstants {
*/ */
public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state"; public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state";
/**
* Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
} }

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.plugin.task.api;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_KILL;
import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.util.OSUtils; import org.apache.dolphinscheduler.plugin.task.util.OSUtils;
import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
@ -308,7 +307,7 @@ public abstract class AbstractCommandExecutor {
* @param process process * @param process process
*/ */
private void parseProcessOutput(Process process) { private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId()); String threadLoggerInfoName = String.format(TaskConstants.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId());
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
getOutputLogService.submit(() -> { getOutputLogService.submit(() -> {
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {

9
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java

@ -17,13 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.api; package org.apache.dolphinscheduler.plugin.task.api;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
import org.apache.dolphinscheduler.plugin.task.util.LoggerUtils;
import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.List;
import java.util.StringJoiner; import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -45,10 +41,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
*/ */
protected AbstractTaskExecutor(TaskRequest taskRequest) { protected AbstractTaskExecutor(TaskRequest taskRequest) {
super(taskRequest); super(taskRequest);
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, logger = LoggerFactory.getLogger(taskRequest.getLogPath());
taskRequest.getProcessDefineId(),
taskRequest.getProcessInstanceId(),
taskRequest.getTaskInstanceId()));
} }
/** /**

123
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/util/LoggerUtils.java

@ -1,123 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
/**
* logger utils
*/
public class LoggerUtils {
private static final String APPLICATION_REGEX_NAME = "application_\\d+_\\d+";
private LoggerUtils() {
throw new UnsupportedOperationException("Construct LoggerUtils");
}
/**
* rules for extracting application ID
*/
private static final Pattern APPLICATION_REGEX = Pattern.compile(APPLICATION_REGEX_NAME);
/**
* Task Logger's prefix
*/
public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
/**
* Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* Task Logger Thread's name
*/
public static final String TASK_APPID_LOG_FORMAT = "[taskAppId=";
/**
* build job id
*
* @param affix Task Logger's prefix
* @param processDefId process define id
* @param processInstId process instance id
* @param taskId task id
* @return task id format
*/
public static String buildTaskId(String affix,
int processDefId,
int processInstId,
int taskId) {
// - [taskAppId=TASK_79_4084_15210]
return String.format(" - %s%s-%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix,
processDefId,
processInstId,
taskId);
}
/**
* processing log
* get yarn application id list
*
* @param log log content
* @param logger logger
* @return app id list
*/
public static List<String> getAppIds(String log, Logger logger) {
List<String> appIds = new ArrayList<>();
Matcher matcher = APPLICATION_REGEX.matcher(log);
// analyse logs to get all submit yarn application id
while (matcher.find()) {
String appId = matcher.group();
if (!appIds.contains(appId)) {
logger.info("find app id: {}", appId);
appIds.add(appId);
}
}
return appIds;
}
public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
}
public static void logError(Optional<Logger> optionalLogger
, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
}
public static void logError(Optional<Logger> optionalLogger
, String error, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
}
public static void logInfo(Optional<Logger> optionalLogger
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}
Loading…
Cancel
Save