Browse Source

[Improvement][server] Refactor code for getTaskLogPath to reduce NPE log during UT (#3629)

* init

* update

* update

* update

* fix

* fix

* enable test

* fix

* fix
pull/3/MERGE
Hsu Pu 4 years ago committed by GitHub
parent
commit
68bc58892b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java
  2. 20
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java
  3. 62
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java
  4. 76
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java
  5. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  6. 7
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java
  7. 66
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java
  8. 1
      pom.xml

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/ConditionsTaskExecThread.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
@ -123,7 +125,7 @@ public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {
}
private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
this.taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());

20
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/DependentTaskExecThread.java

@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import com.fasterxml.jackson.annotation.JsonFormat;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -24,16 +26,22 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonFormat;
public class DependentTaskExecThread extends MasterBaseTaskExecThread {
@ -172,7 +180,7 @@ public class DependentTaskExecThread extends MasterBaseTaskExecThread {
}
private void initTaskParameters() {
taskInstance.setLogPath(getTaskLogPath(taskInstance));
taskInstance.setLogPath(LogUtils.getTaskLogPath(taskInstance));
taskInstance.setHost(NetUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());

62
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterBaseTaskExecThread.java

@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -36,10 +35,6 @@ import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
/**
* master task exec base class
*/
@ -85,11 +80,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* taskUpdateQueue
*/
private TaskPriorityQueue taskUpdateQueue;
/**
* constructor of MasterBaseTaskExecThread
* @param taskInstance task instance
*
* @param taskInstance task instance
*/
public MasterBaseTaskExecThread(TaskInstance taskInstance){
public MasterBaseTaskExecThread(TaskInstance taskInstance) {
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
this.cancel = false;
@ -100,24 +97,26 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* get task instance
*
* @return TaskInstance
*/
public TaskInstance getTaskInstance(){
public TaskInstance getTaskInstance() {
return this.taskInstance;
}
/**
* kill master base task exec thread
*/
public void kill(){
public void kill() {
this.cancel = true;
}
/**
* submit master base task exec thread
*
* @return TaskInstance
*/
protected TaskInstance submit(){
protected TaskInstance submit() {
Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();
@ -156,14 +155,13 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
/**
* dispatcht task
*
* @param taskInstance taskInstance
* @return whether submit task success
*/
public Boolean dispatchTask(TaskInstance taskInstance) {
try{
if(taskInstance.isConditionsTask()
|| taskInstance.isDependTask()
@ -202,7 +200,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* buildTaskPriorityInfo
* buildTaskPriorityInfo
*
* @param processInstancePriority processInstancePriority
* @param processInstanceId processInstanceId
@ -215,7 +213,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
int processInstanceId,
int taskInstancePriority,
int taskInstanceId,
String workerGroup){
String workerGroup) {
return processInstancePriority +
UNDERLINE +
processInstanceId +
@ -229,14 +227,16 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
/**
* submit wait complete
*
* @return true
*/
protected Boolean submitWaitComplete(){
protected Boolean submitWaitComplete() {
return true;
}
/**
* call
*
* @return boolean
* @throws Exception exception
*/
@ -246,34 +246,4 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
return submitWaitComplete();
}
/**
* get task log path
* @return log path
*/
public String getTaskLogPath(TaskInstance task) {
String logPath;
try{
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
logPath = baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}else{
logPath = System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
task.getProcessDefinitionId() + Constants.SINGLE_SLASH +
task.getProcessInstanceId() + Constants.SINGLE_SLASH +
task.getId() + ".log";
}
}catch (Exception e){
logger.error("logger", e);
logPath = "";
}
return logPath;
}
}

76
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/LogUtils.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import javax.transaction.NotSupportedException;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.sift.SiftingAppender;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.spi.AppenderAttachable;
public class LogUtils {
private LogUtils() throws NotSupportedException {
throw new NotSupportedException();
}
/**
* get task log path
*/
@SuppressWarnings("unchecked")
private static String getTaskLogPath(int processDefinitionId, int processInstanceId, int taskInstanceId) {
// Optional.map will be skipped if null
return Optional.of(LoggerFactory.getILoggerFactory())
.map(e -> (AppenderAttachable<ILoggingEvent>) (e.getLogger("ROOT")))
.map(e -> (SiftingAppender) (e.getAppender("TASKLOGFILE")))
.map(e -> ((TaskLogDiscriminator) (e.getDiscriminator())))
.map(TaskLogDiscriminator::getLogBase)
.map(e -> Paths.get(e)
.toAbsolutePath()
.resolve(String.valueOf(processDefinitionId))
.resolve(String.valueOf(processInstanceId))
.resolve(taskInstanceId + ".log"))
.map(Path::toString)
.orElse("");
}
/**
* get task log path by TaskInstance
*/
public static String getTaskLogPath(TaskInstance taskInstance) {
return getTaskLogPath(taskInstance.getProcessDefinitionId(), taskInstance.getProcessInstanceId(), taskInstance.getId());
}
/**
* get task log path by TaskExecutionContext
*/
public static String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
return getTaskLogPath(taskExecutionContext.getProcessId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
}
}

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -36,7 +34,7 @@ import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@ -51,8 +49,6 @@ import org.slf4j.LoggerFactory;
import com.github.rholder.retry.RetryException;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.sift.SiftingAppender;
import io.netty.channel.Channel;
/**
@ -154,28 +150,6 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
}
/**
* get task log path
* @return log path
*/
private String getTaskLogPath(TaskExecutionContext taskExecutionContext) {
String baseLog = ((TaskLogDiscriminator) ((SiftingAppender) ((LoggerContext) LoggerFactory.getILoggerFactory())
.getLogger("ROOT")
.getAppender("TASKLOGFILE"))
.getDiscriminator()).getLogBase();
if (baseLog.startsWith(Constants.SINGLE_SLASH)){
return baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
return System.getProperty("user.dir") + Constants.SINGLE_SLASH +
baseLog + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessDefineId() + Constants.SINGLE_SLASH +
taskExecutionContext.getProcessInstanceId() + Constants.SINGLE_SLASH +
taskExecutionContext.getTaskInstanceId() + ".log";
}
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
@ -185,7 +159,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){

7
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java

@ -17,14 +17,15 @@
package org.apache.dolphinscheduler.server.master.runner;
import java.util.HashSet;
import java.util.Set;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.HashSet;
import java.util.Set;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

66
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/LogUtilsTest.java

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.log.TaskLogDiscriminator;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.sift.SiftingAppender;
@RunWith(MockitoJUnitRunner.class)
public class LogUtilsTest {
@Test
public void testGetTaskLogPath() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setProcessDefinitionId(1);
taskInstance.setProcessInstanceId(100);
taskInstance.setId(1000);
Logger rootLogger = (Logger) LoggerFactory.getILoggerFactory().getLogger("ROOT");
Assert.assertNotNull(rootLogger);
SiftingAppender appender = Mockito.mock(SiftingAppender.class);
// it's a trick to mock logger.getAppend("TASKLOGFILE")
Mockito.when(appender.getName()).thenReturn("TASKLOGFILE");
rootLogger.addAppender(appender);
Path logBase = Paths.get("path").resolve("to").resolve("test");
TaskLogDiscriminator taskLogDiscriminator = Mockito.mock(TaskLogDiscriminator.class);
Mockito.when(taskLogDiscriminator.getLogBase()).thenReturn(logBase.toString());
Mockito.when(appender.getDiscriminator()).thenReturn(taskLogDiscriminator);
Path logPath = Paths.get(".").toAbsolutePath().getParent()
.resolve(logBase)
.resolve("1").resolve("100").resolve("1000.log");
Assert.assertEquals(logPath.toString(), LogUtils.getTaskLogPath(taskInstance));
}
}

1
pom.xml

@ -833,6 +833,7 @@
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include>
<include>**/server/utils/ProcessUtilsTest.java</include>
<include>**/server/utils/SparkArgsUtilsTest.java</include>

Loading…
Cancel
Save