diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java index fca7fcc35a..add5767c26 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.spi.task.TaskConstants; import java.io.BufferedReader; import java.io.FileInputStream; @@ -48,38 +49,18 @@ public class LoggerUtils { */ private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); - /** - * Task Logger's prefix - */ - public static final String TASK_LOGGER_INFO_PREFIX = "TASK"; - - /** - * Task Logger Thread's name - */ - public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; - - /** - * Task Logger Thread's name - */ - public static final String TASK_APPID_LOG_FORMAT = "[taskAppId="; - /** * build job id - * - * @param affix Task Logger's prefix - * @param processInstId process instance id - * @param taskId task id * @return task id format */ - public static String buildTaskId(String affix, - Date firstSubmitTime, + public static String buildTaskId(Date firstSubmitTime, Long processDefineCode, int processDefineVersion, int processInstId, int taskId) { - // - [taskAppId=TASK-20211107-798_1-4084-15210] + // like TaskAppId=TASK-20211107-798_1-4084-15210 String firstSubmitTimeStr = DateUtils.format(firstSubmitTime, Constants.YYYYMMDD); - return String.format(" - %s%s-%s-%s_%s-%s-%s]", TASK_APPID_LOG_FORMAT, affix, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId); + return String.format("%s=%s-%s-%s_%s-%s-%s", TaskConstants.TASK_APPID_LOG_FORMAT, TaskConstants.TASK_LOGGER_INFO_PREFIX, firstSubmitTimeStr, processDefineCode, processDefineVersion, processInstId, taskId); } /** diff --git a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java b/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java index d8abb48d72..8905315bd3 100644 --- a/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java +++ b/dolphinscheduler-log-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java @@ -22,6 +22,8 @@ import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.LoggerContextVO; import ch.qos.logback.core.spi.FilterReply; import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.spi.task.TaskConstants; + import org.junit.Assert; import org.junit.Test; import org.slf4j.Marker; @@ -39,7 +41,7 @@ public class TaskLogFilterTest { FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() { @Override public String getThreadName() { - return LoggerUtils.TASK_LOGGER_THREAD_NAME; + return TaskConstants.TASK_APPID_LOG_FORMAT; } @Override @@ -64,7 +66,7 @@ public class TaskLogFilterTest { @Override public String getLoggerName() { - return null; + return TaskConstants.TASK_LOG_LOGGER_NAME; } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index e1366bcfb2..f7a8dddd03 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -39,7 +39,6 @@ public class MasterConfig { private int stateWheelInterval; private double maxCpuLoadAvg; private double reservedMemory; - private boolean taskLogger; private int failoverInterval; private boolean killYarnJobWhenTaskFailover; @@ -139,14 +138,6 @@ public class MasterConfig { this.reservedMemory = reservedMemory; } - public boolean isTaskLogger() { - return taskLogger; - } - - public void setTaskLogger(boolean taskLogger) { - this.taskLogger = taskLogger; - } - public int getFailoverInterval() { return failoverInterval; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 8d62f1fda0..6e67d487d6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.enums.ResourceType; +import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; @@ -69,7 +70,7 @@ import com.google.common.base.Strings; public abstract class BaseTaskProcessor implements ITaskProcessor { - protected Logger logger = LoggerFactory.getLogger(getClass()); + protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); protected boolean killed = false; @@ -85,12 +86,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected int commitInterval; - protected boolean isTaskLogger; - protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class); + protected String threadLoggerInfoName; + @Override public void init(TaskInstance taskInstance, ProcessInstance processInstance) { if (processService == null) { @@ -103,7 +104,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { this.processInstance = processInstance; this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes(); this.commitInterval = masterConfig.getTaskCommitInterval(); - this.isTaskLogger = masterConfig.isTaskLogger(); } /** @@ -133,13 +133,15 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { /** * dispatch task - * @return */ protected abstract boolean dispatchTask(); @Override public boolean action(TaskAction taskAction) { - + String threadName = Thread.currentThread().getName(); + if (StringUtils.isNotEmpty(threadLoggerInfoName)) { + Thread.currentThread().setName(threadLoggerInfoName); + } switch (taskAction) { case STOP: return stop(); @@ -155,8 +157,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return dispatch(); default: logger.error("unknown task action: {}", taskAction); - } + // reset thread name + Thread.currentThread().setName(threadName); return false; } @@ -205,6 +208,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { return this.taskInstance.getState(); } + /** + * set master task running logger. + */ + public void setTaskExecutionLogger() { + threadLoggerInfoName = LoggerUtils.buildTaskId(taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId()); + Thread.currentThread().setName(threadLoggerInfoName); + } + /** * get TaskExecutionContext * @@ -267,21 +282,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { .create(); } - /** - * set master task running logger. - */ - public void setTaskExecutionLogger(boolean isTaskLogger) { - if (!isTaskLogger) { - return; - } - logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskInstance.getFirstSubmitTime(), - processInstance.getProcessDefinitionCode(), - processInstance.getProcessDefinitionVersion(), - taskInstance.getProcessInstanceId(), - taskInstance.getId())); - } - /** * set procedure task relation * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index db6e7e202e..9d8a717743 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -48,13 +48,13 @@ public class CommonTaskProcessor extends BaseTaskProcessor { private NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); @Override - public boolean submitTask() { + protected boolean submitTask() { this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); if (this.taskInstance == null) { return false; } - setTaskExecutionLogger(isTaskLogger); + this.setTaskExecutionLogger(); int taskGroupId = taskInstance.getTaskGroupId(); if (taskGroupId > 0) { boolean acquireTaskGroup = processService.acquireTaskGroup(taskInstance.getId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 3d96d2e14d..9646285a82 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -62,14 +62,10 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { @Override public boolean submitTask() { this.taskInstance = processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval); - if (this.taskInstance == null) { return false; } - - setTaskExecutionLogger(isTaskLogger); - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); - Thread.currentThread().setName(threadLoggerInfoName); + this.setTaskExecutionLogger(); initTaskParameters(); logger.info("dependent task start"); return true; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index 4b20b681de..0f89a44124 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -75,13 +75,12 @@ public class DependentTaskProcessor extends BaseTaskProcessor { if (this.taskInstance == null) { return false; } - + this.setTaskExecutionLogger(); taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - setTaskExecutionLogger(isTaskLogger); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index 85bce57a61..5a6355696c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand; import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService; +import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.Date; @@ -53,7 +54,13 @@ public class SubTaskProcessor extends BaseTaskProcessor { if (this.taskInstance == null) { return false; } - setTaskExecutionLogger(isTaskLogger); + this.setTaskExecutionLogger(); + taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), + processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion(), + taskInstance.getProcessInstanceId(), + taskInstance.getId())); + return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index ce2706f02f..912822b173 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -62,11 +62,11 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { if (this.taskInstance == null) { return false; } + this.setTaskExecutionLogger(); taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance.getFirstSubmitTime(), processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion(), taskInstance.getProcessInstanceId(), taskInstance.getId())); - setTaskExecutionLogger(isTaskLogger); taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort())); taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(new Date()); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index cfbe857760..55e7ea6c25 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -104,8 +104,6 @@ master: max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 - # use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file - task-logger: true # failover interval, the unit is minute failover-interval: 10 # kill yarn jon when failover taskInstance, default true diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index 35bd657395..b806cf357c 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -40,7 +40,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n UTF-8 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java index 6db851853d..b775d7461e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java @@ -16,16 +16,22 @@ */ package org.apache.dolphinscheduler.server.log; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.spi.task.TaskConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; /** * Task Log Discriminator */ public class TaskLogDiscriminator extends AbstractDiscriminator { + private static Logger logger = LoggerFactory.getLogger(TaskLogDiscriminator.class); + /** * key */ @@ -38,19 +44,24 @@ public class TaskLogDiscriminator extends AbstractDiscriminator { /** * logger name should be like: - * Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId} + * Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId} */ @Override public String getDiscriminatingValue(ILoggingEvent event) { - String loggerName = event.getLoggerName() - .split(Constants.EQUAL_SIGN)[1]; - String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-"; - if (loggerName.startsWith(prefix)) { - return loggerName.substring(prefix.length(), - loggerName.length() - 1).replaceFirst("-","/"); - } else { - return "unknown_task"; + String key = "unknown_task"; + if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) { + String threadName = event.getThreadName(); + if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) { + threadName = threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length()); + } + String part1 = threadName.split(Constants.EQUAL_SIGN)[1]; + String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-"; + if (part1.startsWith(prefix)) { + key = part1.substring(prefix.length()).replaceFirst("-", "/"); + } } + logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(), event.getLoggerName()); + return key; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java index ec93b85c76..b256fc8bb4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java @@ -14,9 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.log; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.spi.task.TaskConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -24,10 +28,12 @@ import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; /** - * task log filter + * task log filter */ public class TaskLogFilter extends Filter { + private static Logger logger = LoggerFactory.getLogger(TaskLogFilter.class); + /** * level */ @@ -39,16 +45,19 @@ public class TaskLogFilter extends Filter { /** * Accept or reject based on thread name + * * @param event event * @return FilterReply */ @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) - || event.getLoggerName().startsWith(" - " + LoggerUtils.TASK_APPID_LOG_FORMAT) + FilterReply filterReply = FilterReply.DENY; + if ((event.getThreadName().startsWith(TaskConstants.TASK_APPID_LOG_FORMAT) + && event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) || event.getLevel().isGreaterOrEqual(level)) { - return FilterReply.ACCEPT; + filterReply = FilterReply.ACCEPT; } - return FilterReply.DENY; + logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(), event.getLoggerName(), filterReply.name(), level); + return filterReply; } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java index 65b92e6379..a783c84f8b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.server.log; +import org.apache.dolphinscheduler.spi.task.TaskConstants; + import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.IThrowableProxy; @@ -48,7 +50,7 @@ public class TaskLogDiscriminatorTest { String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() { @Override public String getThreadName() { - return null; + return "taskAppId=TASK-20220105-101-1-1001"; } @Override @@ -73,7 +75,7 @@ public class TaskLogDiscriminatorTest { @Override public String getLoggerName() { - return "[taskAppId=TASK-1-1-1"; + return TaskConstants.TASK_LOG_LOGGER_NAME; } @Override @@ -121,7 +123,7 @@ public class TaskLogDiscriminatorTest { } }); - Assert.assertEquals("1/1-", result); + Assert.assertEquals("20220105/101-1-1001", result); } @Test diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java index 5ec6342e2d..2b410e3f40 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java @@ -126,7 +126,27 @@ public class TaskConstants { /** * task log info format */ - public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s"; + public static final String TASK_LOG_LOGGER_NAME = "TaskLogLogger"; + + /** + * task log logger name format + */ + public static final String TASK_LOG_LOGGER_NAME_FORMAT = TASK_LOG_LOGGER_NAME + "-%s"; + + /** + * Task Logger's prefix + */ + public static final String TASK_LOGGER_INFO_PREFIX = "TASK"; + + /** + * Task Logger Thread's name + */ + public static final String TASK_APPID_LOG_FORMAT = "taskAppId"; + + /** + * get output log service + */ + public static final String GET_OUTPUT_LOG_SERVICE = "-getOutputLogService"; /** * date format of yyyyMMdd @@ -320,11 +340,6 @@ public class TaskConstants { */ public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state"; - /** - * Task Logger Thread's name - */ - public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo"; - /** * hdfs/s3 configuration * resource.upload.path diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 02d2b9f7ad..c2400bc05d 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -108,8 +108,6 @@ master: max-cpu-load-avg: -1 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G reserved-memory: 0.3 - # use task logger, default true; if true, it will create log for every task; if false, the task log will append to master log file - task-logger: true # failover interval, the unit is minute failover-interval: 10 # kill yarn jon when failover taskInstance, default true diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 8e135e443b..e23239ee56 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -63,7 +63,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n UTF-8 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 418bf1aa9f..df18ef99b9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.apache.hadoop.hive.common.LogUtils; + import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; @@ -307,8 +309,8 @@ public abstract class AbstractCommandExecutor { * @param process process */ private void parseProcessOutput(Process process) { - String threadLoggerInfoName = String.format(TaskConstants.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId()); - ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); + String threadLoggerInfoName = taskRequest.getTaskLogName(); + ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName); getOutputLogService.submit(() -> { try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java index 9f1ea933d6..28c6bad39d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.task.api; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.Property; +import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import java.util.Map; @@ -36,7 +37,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask { public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION"); - protected Logger logger; + protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass())); public String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*"; /** @@ -46,7 +47,6 @@ public abstract class AbstractTaskExecutor extends AbstractTask { */ protected AbstractTaskExecutor(TaskRequest taskRequest) { super(taskRequest); - logger = LoggerFactory.getLogger(taskRequest.getTaskLogName()); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 3fe93ff1e4..073552d15d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -145,10 +145,6 @@ public class DataxTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { try { - // set the name of the current thread - String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); - // replace placeholder,and combine local and global parameters Map paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); if (MapUtils.isEmpty(paramsMap)) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index f57238dcc2..22b8700a8f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.plugin.task.http; import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON; -import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.util.MapUtils; @@ -91,10 +90,6 @@ public class HttpTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { - - String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); - long startTime = System.currentTimeMillis(); String formatTimeStamp = DateUtils.formatTimeStamp(startTime); String statusCode = null; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index c2d3238c3b..b33a00cb70 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -43,7 +43,6 @@ import java.util.Map; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS; -import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT; /** * procedure task @@ -82,10 +81,6 @@ public class ProcedureTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { - // set the name of the current thread - String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); - logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}", procedureParameters.getType(), procedureParameters.getDatasource(), diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 41b8915267..45e6c39d56 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -114,10 +114,6 @@ public class SqlTask extends AbstractTaskExecutor { @Override public void handle() throws Exception { - // set the name of the current thread - String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); - Thread.currentThread().setName(threadLoggerInfoName); - logger.info("Full sql parameters: {}", sqlParameters); logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", sqlParameters.getType(), diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 9f306b8b47..c02c04ccfb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; import org.apache.dolphinscheduler.spi.task.TaskChannel; +import org.apache.dolphinscheduler.spi.task.TaskConstants; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; @@ -171,14 +172,16 @@ public class TaskExecuteThread implements Runnable, Delayed { throw new RuntimeException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType())); } TaskRequest taskRequest = JSONUtils.parseObject(JSONUtils.toJsonString(taskExecutionContext), TaskRequest.class); - String taskLogName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskExecutionContext.getFirstSubmitTime(), + String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getProcessDefineCode(), taskExecutionContext.getProcessDefineVersion(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId()); taskRequest.setTaskLogName(taskLogName); + // set the name of the current thread + Thread.currentThread().setName(taskLogName); + task = taskChannel.createTask(taskRequest); // task init diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 773fd4365a..3ed530b5bd 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -41,7 +41,7 @@ ${log.base}/${taskAppId}.log - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n UTF-8 diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java index ab5d6c36a8..5a345abf64 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/worker/processor/TaskExecuteProcessorTest.java @@ -107,13 +107,6 @@ public class TaskExecuteProcessorTest { PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) .thenReturn(workerConfig); - Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, - taskExecutionContext.getFirstSubmitTime(), - taskExecutionContext.getProcessDefineCode(), - taskExecutionContext.getProcessDefineVersion(), - taskExecutionContext.getProcessInstanceId(), - taskExecutionContext.getTaskInstanceId())); - workerManager = PowerMockito.mock(WorkerManagerThread.class); PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);