From 516757cc7400b576ca0933dd5826005db06fa508 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 6 Jun 2022 10:49:23 +0800 Subject: [PATCH] [Fix] [Worker] Fix worker will hang if fails to start (#10342) * Fix worker will hang if fails to start * Add .run to ignore --- .gitignore | 1 + .../datasource/hive/HiveDataSourceClient.java | 4 +++- .../server/log/LoggerRequestProcessor.java | 16 ++++++++++------ .../remote/NettyRemotingServer.java | 4 ++-- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 8e55e20304..fbc74e41d1 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ .DS_Store .target .idea/ +.run/ target/ dist/ all-dependencies.txt diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java index 08b57631ac..dd4f7e89ca 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.datasource.hive; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.zaxxer.hikari.HikariDataSource; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; @@ -60,7 +61,8 @@ public class HiveDataSourceClient extends CommonDataSourceClient { @Override protected void preInit() { logger.info("PreInit in {}", getClass().getName()); - this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(); + this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build()); } @Override diff --git a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java index afc914ef05..1ab4aa13fe 100644 --- a/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java +++ b/dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.commons.lang3.StringUtils; @@ -54,6 +55,8 @@ import org.springframework.stereotype.Component; import io.netty.channel.Channel; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * logger request process logic */ @@ -65,7 +68,8 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { private final ExecutorService executor; public LoggerRequestProcessor() { - this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1); + this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1, + new NamedThreadFactory("Log-Request-Process-Thread")); } @Override @@ -80,7 +84,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { command.getBody(), GetLogBytesRequestCommand.class); String path = getLogRequest.getPath(); if (!checkPathSecurity(path)) { - throw new IllegalArgumentException("Illegal path"); + throw new IllegalArgumentException("Illegal path: " + path); } byte[] bytes = getFileContentBytes(path); GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); @@ -91,7 +95,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { command.getBody(), ViewLogRequestCommand.class); String viewLogPath = viewLogRequest.getPath(); if (!checkPathSecurity(viewLogPath)) { - throw new IllegalArgumentException("Illegal path"); + throw new IllegalArgumentException("Illegal path: " + viewLogPath); } String msg = LoggerUtils.readWholeFileContent(viewLogPath); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); @@ -103,7 +107,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { String rollViewLogPath = rollViewLogRequest.getPath(); if (!checkPathSecurity(rollViewLogPath)) { - throw new IllegalArgumentException("Illegal path"); + throw new IllegalArgumentException("Illegal path: " + rollViewLogPath); } List lines = readPartFileContent(rollViewLogPath, @@ -121,7 +125,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { String taskLogPath = removeTaskLogRequest.getPath(); if (!checkPathSecurity(taskLogPath)) { - throw new IllegalArgumentException("Illegal path"); + throw new IllegalArgumentException("Illegal path: " + taskLogPath); } File taskLogFile = new File(taskLogPath); boolean status = true; @@ -137,7 +141,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor { channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque())); break; default: - throw new IllegalArgumentException("unknown commandType"); + throw new IllegalArgumentException("unknown commandType: " + commandType); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java index 2154cb7173..098751ab65 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java @@ -103,8 +103,8 @@ public class NettyRemotingServer { */ public NettyRemotingServer(final NettyServerConfig serverConfig) { this.serverConfig = serverConfig; - ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build(); - ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build(); + ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build(); + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build(); if (Epoll.isAvailable()) { this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);