Browse Source

Change remove task instance log to async request (#13399)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
01f4fb26f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
  2. 37
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java
  3. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  4. 11
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java

@ -379,13 +379,7 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl implements TaskInst
for (TaskInstance taskInstance : needToDeleteTaskInstances) {
// delete log
if (StringUtils.isNotEmpty(taskInstance.getLogPath())) {
try {
logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskInstance.getLogPath());
} catch (Exception e) {
logger.error(
"Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
}
}
}

37
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClient.java

@ -177,29 +177,38 @@ public class LogClient implements AutoCloseable {
*
* @param host host
* @param path path
* @return remove task status
*/
public Boolean removeTaskLog(@NonNull Host host, String path) {
logger.info("Remove task log from host: {} logPath {}", host, path);
public void removeTaskLog(@NonNull Host host, String path) {
logger.info("Begin remove task log from host: {} logPath {}", host, path);
RemoveTaskLogRequestCommand request = new RemoveTaskLogRequestCommand(path);
try {
Command command = request.convert2Command();
Command response = this.client.sendSync(host, command, LOG_REQUEST_TIMEOUT);
if (response != null) {
RemoveTaskLogResponseCommand taskLogResponse =
client.sendAsync(host, command, LOG_REQUEST_TIMEOUT, responseFuture -> {
if (responseFuture.getCause() != null) {
logger.error("Remove task log from host: {} logPath {} error, meet an unknown exception", host,
path, responseFuture.getCause());
return;
}
Command response = responseFuture.getResponseCommand();
if (response == null) {
logger.error("Remove task log from host: {} logPath {} error, response is null", host, path);
return;
}
RemoveTaskLogResponseCommand removeTaskLogResponse =
JSONUtils.parseObject(response.getBody(), RemoveTaskLogResponseCommand.class);
return taskLogResponse.getStatus();
if (removeTaskLogResponse.getStatus()) {
logger.info("Success remove task log from host: {} logPath {}", host, path);
} else {
logger.error("Remove task log from host: {} logPath {} error", host, path);
}
return false;
} catch (InterruptedException ex) {
});
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
logger.error(
"Remove task log from host: {}, logPath: {} error, the current thread has been interrupted",
host, path, ex);
return false;
logger.error("Remove task log from host: {} logPath {} error, the current thread has been interrupted",
host,
path, interruptedException);
} catch (Exception e) {
logger.error("Remove task log from host: {}, logPath: {} error", host, path, e);
return false;
}
}

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -516,17 +516,10 @@ public class ProcessServiceImpl implements ProcessService {
}
for (TaskInstance taskInstance : taskInstanceList) {
String taskLogPath = taskInstance.getLogPath();
if (Strings.isNullOrEmpty(taskInstance.getHost())) {
if (StringUtils.isEmpty(taskInstance.getHost()) || StringUtils.isEmpty(taskLogPath)) {
continue;
}
try {
Host host = Host.of(taskInstance.getHost());
logClient.removeTaskLog(host, taskLogPath);
} catch (Exception e) {
logger.error(
"Remove task log error, meet an unknown exception, taskInstanceId: {}, host: {}, logPath: {}",
taskInstance.getId(), taskInstance.getHost(), taskInstance.getLogPath(), e);
}
logClient.removeTaskLog(Host.of(taskInstance.getHost()), taskLogPath);
}
}
@ -2429,8 +2422,6 @@ public class ProcessServiceImpl implements ProcessService {
/**
* the first time (when submit the task ) get the resource of the task group
*
* @param taskId task id
*/
@Override
public boolean acquireTaskGroup(int taskInstanceId,

11
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientTest.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.factory.NettyRemotingClientFactory;
@ -136,7 +135,7 @@ public class LogClientTest {
}
@Test
public void testRemoveTaskLog() throws Exception {
public void testRemoveTaskLog() {
try (
MockedStatic<NettyRemotingClientFactory> mockedNettyRemotingClientFactory =
@ -144,15 +143,9 @@ public class LogClientTest {
NettyRemotingClient remotingClient = Mockito.mock(NettyRemotingClient.class);
mockedNettyRemotingClientFactory.when(NettyRemotingClientFactory::buildNettyRemotingClient)
.thenReturn(remotingClient);
Command command = new Command();
command.setBody(JSONUtils.toJsonByteArray(new RemoveTaskLogResponseCommand(true)));
Mockito.when(
remotingClient.sendSync(Mockito.any(Host.class), Mockito.any(Command.class), Mockito.anyLong()))
.thenReturn(command);
LogClient logClient = new LogClient();
Boolean status = logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path");
Assertions.assertTrue(status);
Assertions.assertDoesNotThrow(() -> logClient.removeTaskLog(Host.of("localhost:1234"), "/log/path"));
}
}
}

Loading…
Cancel
Save