diff --git a/dockerfile/conf/escheduler/conf/worker_logback.xml b/dockerfile/conf/escheduler/conf/worker_logback.xml index 32914ec84f..f630559da9 100644 --- a/dockerfile/conf/escheduler/conf/worker_logback.xml +++ b/dockerfile/conf/escheduler/conf/worker_logback.xml @@ -9,19 +9,26 @@ UTF-8 - + INFO - ${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true + + taskAppId + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + true + + diff --git a/escheduler-api/src/main/resources/combined_logback.xml b/escheduler-api/src/main/resources/combined_logback.xml index d9c5b80ecf..6a186b09b7 100644 --- a/escheduler-api/src/main/resources/combined_logback.xml +++ b/escheduler-api/src/main/resources/combined_logback.xml @@ -9,19 +9,26 @@ UTF-8 - + INFO - ${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true + + taskAppId + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + true + + diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java index 1e0bd196ff..ab141e8629 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java @@ -34,6 +34,11 @@ public class LoggerUtils { */ private static final Pattern APPLICATION_REGEX = Pattern.compile(Constants.APPLICATION_REGEX); + /** + * Task Logger's prefix + */ + public static final String TASK_LOGGER_INFO_PREFIX = "TaskLogInfo"; + /** * build job id * @param affix @@ -46,7 +51,7 @@ public class LoggerUtils { int processDefId, int processInstId, int taskId){ - return String.format("%s_%s_%s_%s",affix, + return String.format("%s-%s/%s/%s",affix, processDefId, processInstId, taskId); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java deleted file mode 100644 index 6ee8f6af9e..0000000000 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogAppender.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cn.escheduler.server.worker.log; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.FileAppender; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * task log appender - */ -public class TaskLogAppender extends FileAppender { - - private String currentlyActiveFile; - - @Override - protected void append(ILoggingEvent event) { - - if (currentlyActiveFile == null){ - currentlyActiveFile = getFile(); - } - String activeFile = currentlyActiveFile; - // thread nameļ¼š taskThreadName-processDefineId_processInstanceId_taskInstanceId - String threadName = event.getThreadName(); - String[] threadNameArr = threadName.split("-"); - // logId = processDefineId_processInstanceId_taskInstanceId - String logId = threadNameArr[1]; - // split logId - threadNameArr = logId.split("_"); - String processDefineId = threadNameArr[0]; - String processInstanceId = threadNameArr[1]; - String taskInstanceId = threadNameArr[2]; - - activeFile = activeFile.replace("{processDefinitionId}",processDefineId); - activeFile = activeFile.replace("{processInstanceId}",processInstanceId); - activeFile = activeFile.replace("{taskInstanceId}",taskInstanceId); - - setFile(activeFile); - start(); - super.subAppend(event); - } -} diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java new file mode 100644 index 0000000000..172b1b270b --- /dev/null +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cn.escheduler.server.worker.log; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.sift.AbstractDiscriminator; +import cn.escheduler.server.utils.LoggerUtils; + +public class TaskLogDiscriminator extends AbstractDiscriminator { + + private String key; + + /** + * logger name should be like: + * Task Logger name should be like: TaskLogInfo-{processDefinitionId}/{processInstanceId}/{taskInstanceId} + */ + public String getDiscriminatingValue(ILoggingEvent event) { + String loggerName = event.getLoggerName(); + String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-"; + if (loggerName.startsWith(prefix)) { + return loggerName.substring(prefix.length()); + } else { + return "unknown_task"; + } + } + + @Override + public void start() { + started = true; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } +} diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java index 7edc532f8e..26ef5e84a8 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java @@ -19,6 +19,7 @@ package cn.escheduler.server.worker.log; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.filter.Filter; import ch.qos.logback.core.spi.FilterReply; +import cn.escheduler.server.utils.LoggerUtils; /** * task log filter @@ -27,7 +28,7 @@ public class TaskLogFilter extends Filter { @Override public FilterReply decide(ILoggingEvent event) { - if (event.getThreadName().startsWith("TaskLogInfo-")){ + if (event.getLoggerName().startsWith(LoggerUtils.TASK_LOGGER_INFO_PREFIX)) { return FilterReply.ACCEPT; } return FilterReply.DENY; diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogger.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogger.java deleted file mode 100644 index a72f0620ce..0000000000 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogger.java +++ /dev/null @@ -1,345 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cn.escheduler.server.worker.log; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Marker; - -/** - * custom task logger - */ -public class TaskLogger implements Logger { - - private static Logger logger = LoggerFactory.getLogger(TaskLogger.class); - - private String taskAppId; - - public TaskLogger(String taskAppId) { - this.taskAppId = taskAppId; - } - - private String addJobId(String msg) { - return String.format("[taskAppId=%s] %s", taskAppId, msg); - } - - @Override - public String getName() { - return logger.getName(); - } - - @Override - public boolean isTraceEnabled() { - return logger.isTraceEnabled(); - } - - @Override - public void trace(String msg) { - logger.trace(addJobId(msg)); - } - - @Override - public void trace(String format, Object arg) { - logger.trace(addJobId(format), arg); - } - - @Override - public void trace(String format, Object arg1, Object arg2) { - logger.trace(addJobId(format), arg1, arg2); - } - - @Override - public void trace(String format, Object... arguments) { - logger.trace(addJobId(format), arguments); - } - - @Override - public void trace(String msg, Throwable t) { - logger.trace(addJobId(msg), t); - } - - @Override - public boolean isTraceEnabled(Marker marker) { - return logger.isTraceEnabled(marker); - } - - @Override - public void trace(Marker marker, String msg) { - logger.trace(marker, addJobId(msg)); - } - - @Override - public void trace(Marker marker, String format, Object arg) { - logger.trace(marker, addJobId(format), arg); - } - - @Override - public void trace(Marker marker, String format, Object arg1, Object arg2) { - logger.trace(marker, addJobId(format), arg1, arg2); - } - - @Override - public void trace(Marker marker, String format, Object... argArray) { - logger.trace(marker, addJobId(format), argArray); - } - - @Override - public void trace(Marker marker, String msg, Throwable t) { - logger.trace(marker, addJobId(msg), t); - } - - @Override - public boolean isDebugEnabled() { - return logger.isDebugEnabled(); - } - - @Override - public void debug(String msg) { - logger.debug(addJobId(msg)); - } - - @Override - public void debug(String format, Object arg) { - logger.debug(addJobId(format), arg); - } - - @Override - public void debug(String format, Object arg1, Object arg2) { - logger.debug(addJobId(format), arg1, arg2); - } - - @Override - public void debug(String format, Object... arguments) { - logger.debug(addJobId(format), arguments); - } - - @Override - public void debug(String msg, Throwable t) { - logger.debug(addJobId(msg), t); - } - - @Override - public boolean isDebugEnabled(Marker marker) { - return logger.isDebugEnabled(); - } - - @Override - public void debug(Marker marker, String msg) { - logger.debug(marker, addJobId(msg)); - } - - @Override - public void debug(Marker marker, String format, Object arg) { - logger.debug(marker, addJobId(format), arg); - } - - @Override - public void debug(Marker marker, String format, Object arg1, Object arg2) { - logger.debug(marker, addJobId(format), arg1, arg2); - } - - @Override - public void debug(Marker marker, String format, Object... arguments) { - logger.debug(marker, addJobId(format), arguments); - } - - @Override - public void debug(Marker marker, String msg, Throwable t) { - logger.debug(marker, addJobId(msg), t); - } - - @Override - public boolean isInfoEnabled() { - return logger.isInfoEnabled(); - } - - @Override - public void info(String msg) { - logger.info(addJobId(msg)); - } - - @Override - public void info(String format, Object arg) { - logger.info(addJobId(format), arg); - } - - @Override - public void info(String format, Object arg1, Object arg2) { - logger.info(addJobId(format), arg1, arg2); - } - - @Override - public void info(String format, Object... arguments) { - logger.info(addJobId(format), arguments); - } - - @Override - public void info(String msg, Throwable t) { - logger.info(addJobId(msg), t); - } - - @Override - public boolean isInfoEnabled(Marker marker) { - return logger.isInfoEnabled(); - } - - @Override - public void info(Marker marker, String msg) { - logger.info(marker, addJobId(msg)); - } - - @Override - public void info(Marker marker, String format, Object arg) { - logger.info(marker, addJobId(format), arg); - } - - @Override - public void info(Marker marker, String format, Object arg1, Object arg2) { - logger.info(marker, addJobId(format), arg1, arg2); - } - - @Override - public void info(Marker marker, String format, Object... arguments) { - logger.info(marker, addJobId(format), arguments); - } - - @Override - public void info(Marker marker, String msg, Throwable t) { - logger.info(marker, addJobId(msg), t); - } - - @Override - public boolean isWarnEnabled() { - return logger.isWarnEnabled(); - } - - @Override - public void warn(String msg) { - logger.warn(addJobId(msg)); - } - - @Override - public void warn(String format, Object arg) { - logger.warn(addJobId(format), arg); - } - - @Override - public void warn(String format, Object arg1, Object arg2) { - logger.warn(addJobId(format), arg1, arg2); - } - - @Override - public void warn(String format, Object... arguments) { - logger.warn(addJobId(format), arguments); - } - - @Override - public void warn(String msg, Throwable t) { - logger.warn(addJobId(msg), t); - } - - @Override - public boolean isWarnEnabled(Marker marker) { - return logger.isWarnEnabled(); - } - - @Override - public void warn(Marker marker, String msg) { - logger.warn(marker, addJobId(msg)); - } - - @Override - public void warn(Marker marker, String format, Object arg) { - logger.warn(marker, addJobId(format), arg); - } - - @Override - public void warn(Marker marker, String format, Object arg1, Object arg2) { - logger.warn(marker, addJobId(format), arg1, arg2); - } - - @Override - public void warn(Marker marker, String format, Object... arguments) { - logger.warn(marker, addJobId(format), arguments); - } - - @Override - public void warn(Marker marker, String msg, Throwable t) { - logger.warn(marker, addJobId(msg), t); - } - - @Override - public boolean isErrorEnabled() { - return logger.isErrorEnabled(); - } - - @Override - public void error(String msg) { - logger.error(addJobId(msg)); - } - - @Override - public void error(String format, Object arg) { - logger.error(addJobId(format), arg); - } - - @Override - public void error(String format, Object arg1, Object arg2) { - logger.error(addJobId(format), arg1, arg2); - } - - @Override - public void error(String format, Object... arguments) { - logger.error(addJobId(format), arguments); - } - - @Override - public void error(String msg, Throwable t) { - logger.error(addJobId(msg), t); - } - - @Override - public boolean isErrorEnabled(Marker marker) { - return logger.isErrorEnabled(); - } - - @Override - public void error(Marker marker, String msg) { - logger.error(marker, addJobId(msg)); - } - - @Override - public void error(Marker marker, String format, Object arg) { - logger.error(marker, addJobId(format), arg); - } - - @Override - public void error(Marker marker, String format, Object arg1, Object arg2) { - logger.error(marker, addJobId(format), arg1, arg2); - } - - @Override - public void error(Marker marker, String format, Object... arguments) { - logger.error(marker, addJobId(format), arguments); - } - - @Override - public void error(Marker marker, String msg, Throwable t) { - logger.error(marker, addJobId(msg), t); - } -} diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java index c045b0713a..a839e3f706 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java @@ -40,7 +40,6 @@ import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.dao.model.Tenant; import cn.escheduler.server.utils.LoggerUtils; import cn.escheduler.server.utils.ParamUtils; -import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskProps; @@ -66,11 +65,6 @@ public class TaskScheduleThread implements Runnable { */ private final Logger logger = LoggerFactory.getLogger(TaskScheduleThread.class); - /** - * task prefix - */ - private static final String TASK_PREFIX = "TASK"; - /** * task instance */ @@ -147,7 +141,7 @@ public class TaskScheduleThread implements Runnable { taskInstance.getId())); // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskInstance.getProcessDefine().getId(), taskInstance.getProcessInstance().getId(), taskInstance.getId())); diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java index e0aef326fd..9a9123a415 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java @@ -33,6 +33,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -347,7 +348,8 @@ public abstract class AbstractCommandExecutor { */ private void parseProcessOutput(Process process) { String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskAppId); - ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName).submit(new Runnable(){ + ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName); + parseProcessOutputExecutorService.submit(new Runnable(){ @Override public void run() { BufferedReader inReader = null; @@ -373,7 +375,7 @@ public abstract class AbstractCommandExecutor { } } }); - + parseProcessOutputExecutorService.shutdown(); } public int getPid() { diff --git a/escheduler-server/src/main/resources/worker_logback.xml b/escheduler-server/src/main/resources/worker_logback.xml index 32914ec84f..f630559da9 100644 --- a/escheduler-server/src/main/resources/worker_logback.xml +++ b/escheduler-server/src/main/resources/worker_logback.xml @@ -9,19 +9,26 @@ UTF-8 - + INFO - ${log.base}/{processDefinitionId}/{processInstanceId}/{taskInstanceId}.log - - - [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n - - UTF-8 - - true + + taskAppId + + + + ${log.base}/${taskAppId}.log + + + [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n + + UTF-8 + + true + + diff --git a/escheduler-server/src/test/java/cn/escheduler/server/worker/shell/ShellCommandExecutorTest.java b/escheduler-server/src/test/java/cn/escheduler/server/worker/shell/ShellCommandExecutorTest.java index 6bee6752ba..a80167ddf7 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/worker/shell/ShellCommandExecutorTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/worker/shell/ShellCommandExecutorTest.java @@ -23,7 +23,6 @@ import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; -import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskProps; @@ -43,7 +42,6 @@ import java.util.Date; public class ShellCommandExecutorTest { private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class); - private static final String TASK_PREFIX = "TASK"; private ProcessDao processDao = null; @@ -75,7 +73,7 @@ public class ShellCommandExecutorTest { // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId())); diff --git a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java index 6bacce113b..38d5fc02d0 100644 --- a/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java +++ b/escheduler-server/src/test/java/cn/escheduler/server/worker/sql/SqlExecutorTest.java @@ -23,7 +23,6 @@ import cn.escheduler.dao.DaoFactory; import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.utils.LoggerUtils; -import cn.escheduler.server.worker.log.TaskLogger; import cn.escheduler.server.worker.task.AbstractTask; import cn.escheduler.server.worker.task.TaskManager; import cn.escheduler.server.worker.task.TaskProps; @@ -43,7 +42,6 @@ import java.util.Date; public class SqlExecutorTest { private static final Logger logger = LoggerFactory.getLogger(SqlExecutorTest.class); - private static final String TASK_PREFIX = "TASK"; private ProcessDao processDao = null; @@ -118,7 +116,7 @@ public class SqlExecutorTest { // custom logger - TaskLogger taskLogger = new TaskLogger(LoggerUtils.buildTaskId(TASK_PREFIX, + Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId()));