Browse Source

[Fix] [Worker] Fix worker will hang if fails to start (#10501)

* [Fix] [Worker] Fix worker will hang if fails to start (#10342)

* Fix worker will hang if fails to start

* Add .run to ignore

Signed-off-by: ruanwenjun <wenjun@apache.org>

* Add import Epoll

* Add cpu_quota in h2 to fix UT

* Remove cpu_quota,memory_max in TaskDefiniitionMapper
3.0.0/version-upgrade
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
742944c60d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 2
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  3. 4
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java
  4. 16
      dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
  5. 7
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

1
.gitignore vendored

@ -6,6 +6,7 @@
.DS_Store .DS_Store
.target .target
.idea/ .idea/
.run/
target/ target/
dist/ dist/
all-dependencies.txt all-dependencies.txt

2
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -28,7 +28,7 @@
${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code, ${alias}.task_type, ${alias}.task_params, ${alias}.flag, ${alias}.task_priority, ${alias}.worker_group, ${alias}.environment_code,
${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout, ${alias}.fail_retry_times, ${alias}.fail_retry_interval, ${alias}.timeout_flag, ${alias}.timeout_notify_strategy, ${alias}.timeout,
${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id, ${alias}.delay_time, ${alias}.resource_ids, ${alias}.create_time, ${alias}.update_time, ${alias}.task_group_id,
${alias}.task_group_priority, ${alias}.cpu_quota, ${alias}.memory_max ${alias}.task_group_priority
</sql> </sql>
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition"> <select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select select

4
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.datasource.hive; package org.apache.dolphinscheduler.plugin.datasource.hive;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider; import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
@ -60,7 +61,8 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
@Override @Override
protected void preInit() { protected void preInit() {
logger.info("PreInit in {}", getClass().getName()); logger.info("PreInit in {}", getClass().getName());
this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(); this.kerberosRenewalService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("Hive-Kerberos-Renewal-Thread-").setDaemon(true).build());
} }
@Override @Override

16
dolphinscheduler-log-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand; import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -54,6 +55,8 @@ import org.springframework.stereotype.Component;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* logger request process logic * logger request process logic
*/ */
@ -65,7 +68,8 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
private final ExecutorService executor; private final ExecutorService executor;
public LoggerRequestProcessor() { public LoggerRequestProcessor() {
this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1); this.executor = Executors.newFixedThreadPool(Constants.CPUS * 2 + 1,
new NamedThreadFactory("Log-Request-Process-Thread"));
} }
@Override @Override
@ -80,7 +84,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
command.getBody(), GetLogBytesRequestCommand.class); command.getBody(), GetLogBytesRequestCommand.class);
String path = getLogRequest.getPath(); String path = getLogRequest.getPath();
if (!checkPathSecurity(path)) { if (!checkPathSecurity(path)) {
throw new IllegalArgumentException("Illegal path"); throw new IllegalArgumentException("Illegal path: " + path);
} }
byte[] bytes = getFileContentBytes(path); byte[] bytes = getFileContentBytes(path);
GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes); GetLogBytesResponseCommand getLogResponse = new GetLogBytesResponseCommand(bytes);
@ -91,7 +95,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
command.getBody(), ViewLogRequestCommand.class); command.getBody(), ViewLogRequestCommand.class);
String viewLogPath = viewLogRequest.getPath(); String viewLogPath = viewLogRequest.getPath();
if (!checkPathSecurity(viewLogPath)) { if (!checkPathSecurity(viewLogPath)) {
throw new IllegalArgumentException("Illegal path"); throw new IllegalArgumentException("Illegal path: " + viewLogPath);
} }
String msg = LoggerUtils.readWholeFileContent(viewLogPath); String msg = LoggerUtils.readWholeFileContent(viewLogPath);
ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg); ViewLogResponseCommand viewLogResponse = new ViewLogResponseCommand(msg);
@ -103,7 +107,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
String rollViewLogPath = rollViewLogRequest.getPath(); String rollViewLogPath = rollViewLogRequest.getPath();
if (!checkPathSecurity(rollViewLogPath)) { if (!checkPathSecurity(rollViewLogPath)) {
throw new IllegalArgumentException("Illegal path"); throw new IllegalArgumentException("Illegal path: " + rollViewLogPath);
} }
List<String> lines = readPartFileContent(rollViewLogPath, List<String> lines = readPartFileContent(rollViewLogPath,
@ -121,7 +125,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
String taskLogPath = removeTaskLogRequest.getPath(); String taskLogPath = removeTaskLogRequest.getPath();
if (!checkPathSecurity(taskLogPath)) { if (!checkPathSecurity(taskLogPath)) {
throw new IllegalArgumentException("Illegal path"); throw new IllegalArgumentException("Illegal path: " + taskLogPath);
} }
File taskLogFile = new File(taskLogPath); File taskLogFile = new File(taskLogPath);
boolean status = true; boolean status = true;
@ -137,7 +141,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque())); channel.writeAndFlush(removeTaskLogResponse.convert2Command(command.getOpaque()));
break; break;
default: default:
throw new IllegalArgumentException("unknown commandType"); throw new IllegalArgumentException("unknown commandType: " + commandType);
} }
} }

7
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/NettyRemotingServer.java

@ -43,6 +43,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@ -102,9 +103,9 @@ public class NettyRemotingServer {
*/ */
public NettyRemotingServer(final NettyServerConfig serverConfig) { public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig; this.serverConfig = serverConfig;
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerBossThread_%s").build(); ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("NettyServerWorkerThread_%s").build(); ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
if (NettyUtils.useEpoll()) { if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory); this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory); this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
} else { } else {

Loading…
Cancel
Save