Browse Source

[Fix-#3618][server] resolve task executed finished but not release the file handle (#3975)

* [Fix-#3618][server] resolve task executed finished but not release the file handle

* [Fix-#3618][server] resolve task executed finished but not release the file handle
pull/3/MERGE
lgcareer 4 years ago committed by GitHub
parent
commit
075d1638d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogAppender.java
  2. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  3. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
  4. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  5. 2
      dolphinscheduler-server/src/main/resources/logback-worker.xml

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogAppender.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;
import org.slf4j.Marker;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
/**
* Task log appender
*/
public class TaskLogAppender extends FileAppender<ILoggingEvent>{
@Override
protected void append(ILoggingEvent event) {
Marker marker = event.getMarker();
if (marker !=null) {
if (marker.equals(FINALIZE_SESSION_MARKER)) {
stop();
}
}
super.subAppend(event);
}
}

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -45,6 +45,8 @@ import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
/** /**
* task scheduler thread * task scheduler thread
@ -132,7 +134,6 @@ public class TaskExecuteThread implements Runnable {
// task result process // task result process
task.after(); task.after();
responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId()); responseCommand.setProcessId(task.getProcessId());

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java

@ -319,12 +319,16 @@ public abstract class AbstractCommandExecutor {
* clear * clear
*/ */
private void clear() { private void clear() {
List<String> markerList = new ArrayList<>();
markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
if (!logBuffer.isEmpty()) { if (!logBuffer.isEmpty()) {
// log handle // log handle
logHandler.accept(logBuffer); logHandler.accept(logBuffer);
logBuffer.clear(); logBuffer.clear();
} }
logHandler.accept(markerList);
} }
/** /**

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -16,8 +16,12 @@
*/ */
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters; import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
@ -34,14 +38,13 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
/** /**
* executive task * executive task
*/ */
@ -118,7 +121,11 @@ public abstract class AbstractTask {
*/ */
public void logHandle(List<String> logs) { public void logHandle(List<String> logs) {
// note that the "new line" is added here to facilitate log parsing // note that the "new line" is added here to facilitate log parsing
logger.info(" -> {}", String.join("\n\t", logs)); if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
logger.info(" -> {}", String.join("\n\t", logs));
}
} }
/** /**

2
dolphinscheduler-server/src/main/resources/logback-worker.xml

@ -42,7 +42,7 @@
<logBase>${log.base}</logBase> <logBase>${log.base}</logBase>
</Discriminator> </Discriminator>
<sift> <sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender"> <appender name="FILE-${taskAppId}" class="org.apache.dolphinscheduler.server.log.TaskLogAppender">
<file>${log.base}/${taskAppId}.log</file> <file>${log.base}/${taskAppId}.log</file>
<encoder> <encoder>
<pattern> <pattern>

Loading…
Cancel
Save