Browse Source

[Bug] [dolphinscheduler-server] memory leak of logger #7661 (#7665)

* [Feature][dolphinscheduler-api] parse traceId in http header for Cross system delivery to #7237 (#7238)

* to #7237

* rerun test

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 05aef27 and handle conflicts

* to #7065: fix ExecutorService and schedulerService (#7072)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081  (#7082)

* to #7081

* fix #7081

* to #7081

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* chery-pick 8ebe060 and handle conflicts

* cherry-pick 1f18444 and handle conflicts

* fix #6807: dolphinscheduler.zookeeper.env_vars - > dolphinscheduler.registry.env_vars (#6808)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>

* add default constructor (#6780)

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>

* to #7108 (#7109)

* add conf && pick #7562 to worker

* to #7661

* to #7661

* to #7661:fix ut

Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Co-authored-by: Kirs <acm_master@163.com>
2.0.7-release
zwZjut 3 years ago committed by GitHub
parent
commit
32fb3f84df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docker/build/conf/dolphinscheduler/logback/logback-master.xml
  2. 4
      docker/build/conf/dolphinscheduler/logback/logback-worker.xml
  3. 10
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
  4. 37
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java
  5. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java
  6. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  7. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  8. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  9. 32
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  10. 17
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java
  11. 18
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java
  12. 8
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java
  13. 24
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java
  14. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  15. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java
  16. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  17. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
  18. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  19. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

4
docker/build/conf/dolphinscheduler/logback/logback-master.xml

@ -45,7 +45,7 @@
<file>${log.base}/${taskAppId}.log</file> <file>${log.base}/${taskAppId}.log</file>
<encoder> <encoder>
<pattern> <pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] - %messsage%n
</pattern> </pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
@ -66,7 +66,7 @@
</rollingPolicy> </rollingPolicy>
<encoder> <encoder>
<pattern> <pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %msg%n [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %msg%n
</pattern> </pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>

4
docker/build/conf/dolphinscheduler/logback/logback-worker.xml

@ -46,7 +46,7 @@
<file>${log.base}/${taskAppId}.log</file> <file>${log.base}/${taskAppId}.log</file>
<encoder> <encoder>
<pattern> <pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] - %messsage%n
</pattern> </pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>
@ -66,7 +66,7 @@
</rollingPolicy> </rollingPolicy>
<encoder> <encoder>
<pattern> <pattern>
[%level] %date{yyyy-MM-dd HH:mm:ss.SSS} %logger{96}:[%line] - %messsage%n [%level] %date{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{96}:[%line] - %messsage%n
</pattern> </pattern>
<charset>UTF-8</charset> <charset>UTF-8</charset>
</encoder> </encoder>

10
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java

@ -18,8 +18,6 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -30,6 +28,9 @@ import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* logger utils * logger utils
*/ */
@ -51,11 +52,6 @@ public class LoggerUtils {
*/ */
public static final String TASK_LOGGER_INFO_PREFIX = "TASK"; public static final String TASK_LOGGER_INFO_PREFIX = "TASK";
/**
* Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/** /**
* Task Logger Thread's name * Task Logger Thread's name
*/ */

37
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminator.java

@ -16,16 +16,23 @@
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;
/** /**
* Task Log Discriminator * Task Log Discriminator
*/ */
public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> { public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
private static Logger logger = LoggerFactory.getLogger(TaskLogDiscriminator.class);
/** /**
* key * key
*/ */
@ -42,15 +49,25 @@ public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
*/ */
@Override @Override
public String getDiscriminatingValue(ILoggingEvent event) { public String getDiscriminatingValue(ILoggingEvent event) {
String loggerName = event.getLoggerName() String key = "unknown_task";
.split(Constants.EQUAL_SIGN)[1];
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-"; logger.debug("task log discriminator start, key is:{}, thread name:{},loggerName:{}", key, event.getThreadName(), event.getLoggerName());
if (loggerName.startsWith(prefix)) {
return loggerName.substring(prefix.length(), if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) {
loggerName.length() - 1).replace("-","/"); String threadName = event.getThreadName();
} else { if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) {
return "unknown_task"; threadName = threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
}
String part1 = threadName
.split(Constants.EQUAL_SIGN)[1];
String prefix = LoggerUtils.TASK_LOGGER_INFO_PREFIX + "-";
if (part1.startsWith(prefix)) {
key = part1.substring(prefix.length(),
part1.length() - 1).replace("-", "/");
}
} }
logger.debug("task log discriminator end, key is:{}, thread name:{},loggerName:{}", key, event.getThreadName(), event.getLoggerName());
return key;
} }
@Override @Override

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/TaskLogFilter.java

@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import static org.apache.dolphinscheduler.common.utils.LoggerUtils.TASK_APPID_LOG_FORMAT; import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
@ -30,6 +32,7 @@ import ch.qos.logback.core.spi.FilterReply;
*/ */
public class TaskLogFilter extends Filter<ILoggingEvent> { public class TaskLogFilter extends Filter<ILoggingEvent> {
private static Logger logger = LoggerFactory.getLogger(TaskLogFilter.class);
/** /**
* level * level
*/ */
@ -46,11 +49,13 @@ public class TaskLogFilter extends Filter<ILoggingEvent> {
*/ */
@Override @Override
public FilterReply decide(ILoggingEvent event) { public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith(LoggerUtils.TASK_LOGGER_THREAD_NAME) FilterReply filterReply = FilterReply.DENY;
|| event.getLoggerName().startsWith(" - " + TASK_APPID_LOG_FORMAT) if ((event.getThreadName().startsWith(TaskConstants.TASK_LOGGER_THREAD_NAME)
&& event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME))
|| event.getLevel().isGreaterOrEqual(level)) { || event.getLevel().isGreaterOrEqual(level)) {
return FilterReply.ACCEPT; filterReply = FilterReply.ACCEPT;
} }
return FilterReply.DENY; logger.debug("task log filter, thread name:{},loggerName:{},filterReply:{}", event.getThreadName(), event.getLoggerName(), filterReply.name());
return filterReply;
} }
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -41,8 +41,6 @@ import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.quartz.SchedulerException; import org.quartz.SchedulerException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -69,7 +67,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
}) })
@EnableTransactionManagement @EnableTransactionManagement
public class MasterServer implements IStoppable { public class MasterServer implements IStoppable {
/** /**
* logger of MasterServer * logger of MasterServer
*/ */

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext; import org.apache.dolphinscheduler.spi.task.request.SQLTaskExecutionContext;
@ -67,7 +68,7 @@ import com.google.common.base.Strings;
public abstract class BaseTaskProcessor implements ITaskProcessor { public abstract class BaseTaskProcessor implements ITaskProcessor {
protected Logger logger = LoggerFactory.getLogger(getClass()); protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME, getClass()));
protected boolean killed = false; protected boolean killed = false;

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -41,6 +42,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
@ -48,6 +50,8 @@ import org.slf4j.LoggerFactory;
*/ */
public class ConditionTaskProcessor extends BaseTaskProcessor { public class ConditionTaskProcessor extends BaseTaskProcessor {
protected static final Logger logger = LoggerFactory.getLogger(TaskConstants.TASK_LOG_LOGGER_NAME);
/** /**
* dependent parameters * dependent parameters
*/ */
@ -81,13 +85,12 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion() taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
); );
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, String threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(), taskInstance.getProcessInstanceId(),
taskInstance.getId())); taskInstance.getId());
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance)); Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,threadLoggerInfoName));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters(); initTaskParameters();
logger.info("dependent task start"); logger.info("dependent task start");
return true; return true;

32
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -17,16 +17,18 @@
package org.apache.dolphinscheduler.server.worker.runner; package org.apache.dolphinscheduler.server.worker.runner;
import com.github.rholder.retry.RetryException;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@ -39,20 +41,29 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskAlertInfo; import org.apache.dolphinscheduler.spi.task.TaskAlertInfo;
import org.apache.dolphinscheduler.spi.task.TaskChannel; import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.util.Calendar.DAY_OF_MONTH; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
/** /**
* task scheduler thread * task scheduler thread
@ -162,6 +173,9 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.getTaskInstanceId()); taskExecutionContext.getTaskInstanceId());
taskRequest.setTaskLogName(taskLogName); taskRequest.setTaskLogName(taskLogName);
// set the name of the current thread
Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,taskLogName));
task = taskChannel.createTask(taskRequest); task = taskChannel.createTask(taskRequest);
// task init // task init

17
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogDiscriminatorTest.java

@ -16,16 +16,19 @@
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import ch.qos.logback.classic.Level; import org.apache.dolphinscheduler.spi.task.TaskConstants;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy; import java.util.Map;
import ch.qos.logback.classic.spi.LoggerContextVO;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Marker; import org.slf4j.Marker;
import java.util.Map; import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO;
public class TaskLogDiscriminatorTest { public class TaskLogDiscriminatorTest {
@ -48,7 +51,7 @@ public class TaskLogDiscriminatorTest {
String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() { String result = taskLogDiscriminator.getDiscriminatingValue(new ILoggingEvent() {
@Override @Override
public String getThreadName() { public String getThreadName() {
return null; return String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,"-[taskAppId=TASK-1-1-1");
} }
@Override @Override
@ -73,7 +76,7 @@ public class TaskLogDiscriminatorTest {
@Override @Override
public String getLoggerName() { public String getLoggerName() {
return "[taskAppId=TASK-1-1-1"; return TaskConstants.TASK_LOG_LOGGER_NAME;
} }
@Override @Override

18
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/TaskLogFilterTest.java

@ -16,17 +16,19 @@
*/ */
package org.apache.dolphinscheduler.server.log; package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Marker;
import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.classic.spi.IThrowableProxy; import ch.qos.logback.classic.spi.IThrowableProxy;
import ch.qos.logback.classic.spi.LoggerContextVO; import ch.qos.logback.classic.spi.LoggerContextVO;
import ch.qos.logback.core.spi.FilterReply; import ch.qos.logback.core.spi.FilterReply;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Marker;
import java.util.Map;
public class TaskLogFilterTest { public class TaskLogFilterTest {
@ -39,7 +41,7 @@ public class TaskLogFilterTest {
FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() { FilterReply filterReply = taskLogFilter.decide(new ILoggingEvent() {
@Override @Override
public String getThreadName() { public String getThreadName() {
return LoggerUtils.TASK_LOGGER_THREAD_NAME; return TaskConstants.TASK_LOGGER_THREAD_NAME;
} }
@Override @Override
@ -64,7 +66,7 @@ public class TaskLogFilterTest {
@Override @Override
public String getLoggerName() { public String getLoggerName() {
return null; return TaskConstants.TASK_LOG_LOGGER_NAME;
} }
@Override @Override

8
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessorTest.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
@ -49,7 +48,6 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* test task execute processor * test task execute processor
@ -107,12 +105,6 @@ public class TaskExecuteProcessorTest {
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)) PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class))
.thenReturn(workerConfig); .thenReturn(workerConfig);
Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
workerManager = PowerMockito.mock(WorkerManagerThread.class); workerManager = PowerMockito.mock(WorkerManagerThread.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE); PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService))).thenReturn(Boolean.TRUE);

24
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskConstants.java

@ -124,9 +124,24 @@ public class TaskConstants {
public static final String RWXR_XR_X = "rwxr-xr-x"; public static final String RWXR_XR_X = "rwxr-xr-x";
/** /**
* task log info format * Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/**
* task logger thread name format
*/
public static final String TASK_LOGGER_THREAD_NAME_FORMAT = TASK_LOGGER_THREAD_NAME + "-%s";
/**
* task log logger name
*/ */
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s"; public static final String TASK_LOG_LOGGER_NAME = "TaskLogLogger";
/**
* task log logger name format
*/
public static final String TASK_LOG_LOGGER_NAME_FORMAT = TASK_LOG_LOGGER_NAME + "-%s";
/** /**
* date format of yyyyMMdd * date format of yyyyMMdd
@ -320,10 +335,7 @@ public class TaskConstants {
*/ */
public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state"; public static final String HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE = "hadoop.security.authentication.startup.state";
/** public static final String GET_OUTPUT_LOG_SERVICE = "-getOutputLogService";
* Task Logger Thread's name
*/
public static final String TASK_LOGGER_THREAD_NAME = "TaskLogInfo";
/** /**
* hdfs/s3 configuration * hdfs/s3 configuration

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -307,8 +307,9 @@ public abstract class AbstractCommandExecutor {
* @param process process * @param process process
*/ */
private void parseProcessOutput(Process process) { private void parseProcessOutput(Process process) {
String threadLoggerInfoName = String.format(TaskConstants.TASK_LOGGER_THREAD_NAME + "-%s", taskRequest.getTaskAppId()); String threadLoggerInfoName = String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); taskRequest.getTaskLogName() + TaskConstants.GET_OUTPUT_LOG_SERVICE);
ExecutorService getOutputLogService = newDaemonSingleThreadExecutor(threadLoggerInfoName);
getOutputLogService.submit(() -> { getOutputLogService.submit(() -> {
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line; String line;

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskExecutor.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.task.api; package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -32,7 +33,7 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION"); public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
protected Logger logger; protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
/** /**
* constructor * constructor
@ -41,7 +42,6 @@ public abstract class AbstractTaskExecutor extends AbstractTask {
*/ */
protected AbstractTaskExecutor(TaskRequest taskRequest) { protected AbstractTaskExecutor(TaskRequest taskRequest) {
super(taskRequest); super(taskRequest);
logger = LoggerFactory.getLogger(taskRequest.getTaskLogName());
} }
/** /**

3
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -145,9 +145,6 @@ public class DataxTask extends AbstractTaskExecutor {
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
// replace placeholder,and combine local and global parameters // replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters()); Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());

4
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.task.http; package org.apache.dolphinscheduler.plugin.task.http;
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON; import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils; import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
@ -92,9 +91,6 @@ public class HttpTask extends AbstractTaskExecutor {
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime); String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null; String statusCode = null;

4
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS; import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.TASK_LOG_INFO_FORMAT;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DatasourceUtil; import org.apache.dolphinscheduler.plugin.datasource.api.utils.DatasourceUtil;
@ -85,9 +84,6 @@ public class ProcedureTask extends AbstractTaskExecutor {
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}", logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(), procedureParameters.getType(),

3
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -114,9 +114,6 @@ public class SqlTask extends AbstractTaskExecutor {
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
// set the name of the current thread
String threadLoggerInfoName = String.format(TaskConstants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("Full sql parameters: {}", sqlParameters); logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}", logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",

Loading…
Cancel
Save