Browse Source

Merge remote-tracking branch 'upstream/dev' into dev

pull/3/MERGE
lidongdai 5 years ago
parent
commit
56fc32e568
  1. 1
      escheduler-api/src/main/resources/combined_logback.xml
  2. 4
      escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java
  3. 29
      escheduler-server/src/main/java/cn/escheduler/server/rpc/LoggerServer.java
  4. 7
      escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java
  5. 21
      escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java
  6. 2
      escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java
  7. 15
      escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java
  8. 7
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java
  9. 5
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java
  10. 5
      escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java
  11. 1
      escheduler-server/src/main/resources/worker_logback.xml

1
escheduler-api/src/main/resources/combined_logback.xml

@ -16,6 +16,7 @@
<filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
<Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">

4
escheduler-common/src/main/java/cn/escheduler/common/utils/HadoopUtils.java

@ -66,6 +66,10 @@ public class HadoopUtils implements Closeable {
}
public static HadoopUtils getInstance(){
// if kerberos startup , renew HadoopUtils
if (CommonUtils.getKerberosStartupState()){
return new HadoopUtils();
}
return instance;
}

29
escheduler-server/src/main/java/cn/escheduler/server/rpc/LoggerServer.java

@ -100,14 +100,8 @@ public class LoggerServer {
StringBuilder sb = new StringBuilder();
boolean errorLineFlag = false;
for (String line : list){
if (line.contains("TaskLogger")){
errorLineFlag = filterLine(request.getPath(),line);
}
if (!errorLineFlag || !line.contains("TaskLogger")){
sb.append(line + "\r\n");
}
}
RetStrInfo retInfoBuild = RetStrInfo.newBuilder().setMsg(sb.toString()).build();
responseObserver.onNext(retInfoBuild);
responseObserver.onCompleted();
@ -204,14 +198,8 @@ public class LoggerServer {
br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
boolean errorLineFlag = false;
while ((line = br.readLine()) != null){
if (line.contains("TaskLogger")){
errorLineFlag = filterLine(path,line);
}
if (!errorLineFlag || !line.contains("TaskLogger")){
sb.append(line + "\r\n");
}
}
return sb.toString();
}catch (IOException e){
@ -228,21 +216,4 @@ public class LoggerServer {
return null;
}
/**
*
* @param path
* @param line
* @return
*/
private static boolean filterLine(String path,String line){
String removeSuffix = path.substring(0, path.length() - 4);
String[] strArrs = removeSuffix.split("/");
String taskAppId = String.format("%s_%s_%s",
strArrs[strArrs.length - 3],
strArrs[strArrs.length-2],
strArrs[strArrs.length - 1]);
return !line.contains(taskAppId);
}
}

7
escheduler-server/src/main/java/cn/escheduler/server/utils/LoggerUtils.java

@ -37,7 +37,9 @@ public class LoggerUtils {
/**
* Task Logger's prefix
*/
public static final String TASK_LOGGER_INFO_PREFIX = "TaskLogInfo";
public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* build job id
@ -51,7 +53,8 @@ public class LoggerUtils {
int processDefId,
int processInstId,
int taskId){
return String.format("%s-%s/%s/%s",affix,
// - [taskAppId=TASK_79_4084_15210]
return String.format(" - [taskAppId=%s-%s-%s-%s]",affix,
processDefId,
processInstId,
taskId);

21
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogDiscriminator.java

@ -18,21 +18,27 @@ package cn.escheduler.server.worker.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;
import cn.escheduler.common.Constants;
import cn.escheduler.server.utils.LoggerUtils;
public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
private String key;
private String logBase;
/**
* logger name should be like:
* Task Logger name should be like: TaskLogInfo-{processDefinitionId}/{processInstanceId}/{taskInstanceId}
* Task Logger name should be like: Task-{processDefinitionId}-{processInstanceId}-{taskInstanceId}
*/
@Override
public String getDiscriminatingValue(ILoggingEvent event) {
String loggerName = event.getLoggerName();
String loggerName = event.getLoggerName()
.split(Constants.EQUAL_SIGN)[1];
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
if (loggerName.startsWith(prefix)) {
return loggerName.substring(prefix.length());
return loggerName.substring(prefix.length(),
loggerName.length() - 1).replace("-","/");
} else {
return "unknown_task";
}
@ -43,6 +49,7 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
started = true;
}
@Override
public String getKey() {
return key;
}
@ -50,4 +57,12 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
public void setKey(String key) {
this.key = key;
}
public String getLogBase() {
return logBase;
}
public void setLogBase(String logBase) {
this.logBase = logBase;
}
}

2
escheduler-server/src/main/java/cn/escheduler/server/worker/log/TaskLogFilter.java

@ -28,7 +28,7 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getLoggerName().startsWith(LoggerUtils.TASK_LOGGER_INFO_PREFIX)) {
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME)) {
return FilterReply.ACCEPT;
}
return FilterReply.DENY;

15
escheduler-server/src/main/java/cn/escheduler/server/worker/runner/TaskScheduleThread.java

@ -17,6 +17,8 @@
package cn.escheduler.server.worker.runner;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.TaskRecordStatus;
@ -40,6 +42,7 @@ import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.Tenant;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ParamUtils;
import cn.escheduler.server.worker.log.TaskLogDiscriminator;
import cn.escheduler.server.worker.task.AbstractTask;
import cn.escheduler.server.worker.task.TaskManager;
import cn.escheduler.server.worker.task.TaskProps;
@ -220,8 +223,18 @@ public class TaskScheduleThread implements Runnable {
* @return
*/
private String getTaskLogPath() {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
"logs" + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskInstance.getProcessDefinitionId() + Constants.SINGLE_SLASH +
taskInstance.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskInstance.getId() + ".log";

7
escheduler-server/src/main/java/cn/escheduler/server/worker/task/AbstractCommandExecutor.java

@ -22,6 +22,7 @@ import cn.escheduler.common.thread.ThreadUtils;
import cn.escheduler.common.utils.HadoopUtils;
import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.utils.LoggerUtils;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -347,7 +348,7 @@ public abstract class AbstractCommandExecutor {
* get the standard output of the process
*/
private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskAppId);
String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskAppId);
ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
parseProcessOutputExecutorService.submit(new Runnable(){
@Override
@ -361,10 +362,7 @@ public abstract class AbstractCommandExecutor {
long lastFlushTime = System.currentTimeMillis();
while ((line = inReader.readLine()) != null) {
if(checkShowLog(line)){
logBuffer.add(line);
}
lastFlushTime = flush(lastFlushTime);
}
} catch (Exception e) {
@ -566,7 +564,6 @@ public abstract class AbstractCommandExecutor {
protected abstract String buildCommandFilePath();
protected abstract String commandType();
protected abstract boolean checkShowLog(String line);
protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
}

5
escheduler-server/src/main/java/cn/escheduler/server/worker/task/PythonCommandExecutor.java

@ -100,11 +100,6 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
return pythonHome;
}
@Override
protected boolean checkShowLog(String line) {
return true;
}
@Override
protected boolean checkFindApp(String line) {
return true;

5
escheduler-server/src/main/java/cn/escheduler/server/worker/task/ShellCommandExecutor.java

@ -60,11 +60,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
return SH;
}
@Override
protected boolean checkShowLog(String line) {
return line.contains(taskAppId) || !line.contains("cn.escheduler.server.worker.log.TaskLogger");
}
@Override
protected boolean checkFindApp(String line) {
return line.contains(taskAppId);

1
escheduler-server/src/main/resources/worker_logback.xml

@ -16,6 +16,7 @@
<filter class="cn.escheduler.server.worker.log.TaskLogFilter"></filter>
<Discriminator class="cn.escheduler.server.worker.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">

Loading…
Cancel
Save