Browse Source

Use DefaultUncaughtExceptionHandler to log the uncached exception (#15496)

Co-authored-by: fuchanghai <changhaifu@apache.org>
3.2.1-prepare
Wenjun Ruan 10 months ago committed by GitHub
parent
commit
64e1e67581
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
  2. 6
      dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  4. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
  5. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
  6. 47
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
  7. 20
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  8. 37
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
  9. 2
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
  10. 16
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
  11. 17
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
  12. 2
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
  13. 61
      dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
  14. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  15. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
  16. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  17. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
  18. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
  19. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
  20. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  21. 3
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  22. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
  23. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java

4
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.alert;
import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import javax.annotation.PreDestroy;
@ -54,6 +56,8 @@ public class AlertServer {
private AlertRegistryClient alertRegistryClient;
public static void main(String[] args) {
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).run(args);
}

6
dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java

@ -45,6 +45,12 @@ public class AlertServerMetrics {
.register(Metrics.globalRegistry);
}
public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.alert.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}
public void incAlertSuccessCount() {
alertSuccessCounter.increment();
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.common.enums.PluginType;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
@ -51,6 +53,8 @@ public class ApiApplicationServer {
private PluginDao pluginDao;
public static void main(String[] args) {
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
SpringApplication.run(ApiApplicationServer.class);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java

@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.api.metrics;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
@ -120,4 +122,10 @@ public class ApiServerMetrics {
"ds.api.response.time",
"user.id", String.valueOf(userId)));
}
public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.api.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java

@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread {
protected BaseDaemonThread(Runnable runnable) {
super(runnable);
this.setDaemon(true);
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}
protected BaseDaemonThread(String threadName) {
super();
this.setName(threadName);
this.setDaemon(true);
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}
}

47
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler();
private static final LongAdder uncaughtExceptionCount = new LongAdder();
private DefaultUncaughtExceptionHandler() {
}
public static DefaultUncaughtExceptionHandler getInstance() {
return INSTANCE;
}
public static long getUncaughtExceptionCount() {
return uncaughtExceptionCount.longValue();
}
@Override
public void uncaughtException(Thread t, Throwable e) {
uncaughtExceptionCount.add(1);
log.error("Caught an exception in {}.", t, e);
}
}

20
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -31,24 +31,20 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
public class ThreadUtils {
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
* @param threadName threadName
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, threadFactory);
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, newDaemonThreadFactory(threadName));
}
public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadName)
return Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName));
}
public static ThreadFactory newDaemonThreadFactory(String threadName) {
return new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadName)
.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance())
.build();
return Executors.newSingleThreadScheduledExecutor(threadFactory);
}
/**

37
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.ThreadPoolExecutor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class ThreadUtilsTest {
@Test
void newDaemonFixedThreadExecutor() throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1);
threadPoolExecutor.execute(() -> {
throw new IllegalArgumentException("I am an exception");
});
Thread.sleep(1_000);
Assertions.assertEquals(1, DefaultUncaughtExceptionHandler.getUncaughtExceptionCount());
}
}

2
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java

@ -67,7 +67,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
future.release();
if (future.getInvokeCallback() != null) {
future.removeFuture();
this.callbackExecutor.submit(future::executeInvokeCallback);
this.callbackExecutor.execute(future::executeInvokeCallback);
} else {
future.putResponse(deserialize);
}

16
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.extract.base;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -80,12 +81,11 @@ public class NettyRemotingClient implements AutoCloseable {
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
ThreadFactory nettyClientThreadFactory = ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
if (Epoll.isAvailable()) {
this.workerGroup =
new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
} else {
this.workerGroup =
new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
}
this.callbackExecutor = new ThreadPoolExecutor(
Constants.CPUS,
@ -93,12 +93,12 @@ public class NettyRemotingClient implements AutoCloseable {
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("CallbackExecutor"),
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
this.start();
}

17
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.extract.base;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
@ -27,15 +28,11 @@ import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@ -55,8 +52,8 @@ public class NettyRemotingServer {
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final ExecutorService defaultExecutor =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
private final ExecutorService defaultExecutor = ThreadUtils
.newDaemonFixedThreadExecutor("NettyRemotingServerThread", Runtime.getRuntime().availableProcessors() * 2);
private final EventLoopGroup bossGroup;
@ -68,16 +65,12 @@ public class NettyRemotingServer {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail";
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
ThreadFactory bossThreadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName() + "BossThread_%s")
.build();
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "BossThread_%s");
ThreadFactory workerThreadFactory =
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat(serverConfig.getServerName() + "WorkerThread_%s").build();
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + "WorkerThread_%s");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);

2
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java

@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends ChannelInboundHandlerAdapter {
channel.writeAndFlush(response);
return;
}
nettyRemotingServer.getDefaultExecutor().submit(() -> {
nettyRemotingServer.getDefaultExecutor().execute(() -> {
StandardRpcResponse iRpcResponse;
try {
StandardRpcRequest standardRpcRequest =

61
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java

@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.base.utils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* thread factory
*/
public class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger increment = new AtomicInteger(1);
/**
* name
*/
private final String name;
/**
* count
*/
private final int count;
public NamedThreadFactory(String name) {
this(name, 0);
}
public NamedThreadFactory(String name, int count) {
this.name = name;
this.count = count;
}
/**
* create thread
* @param r runnable
* @return thread
*/
@Override
public Thread newThread(Runnable r) {
final String threadName = count > 0 ? String.format("%s_%d_%d", name, count, increment.getAndIncrement())
: String.format("%s_%d", name, increment.getAndIncrement());
Thread t = new Thread(r, threadName);
t.setDaemon(true);
return t;
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@ -84,6 +85,9 @@ public class MasterServer implements IStoppable {
private MasterSlotManager masterSlotManager;
public static void main(String[] args) {
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java

@ -61,6 +61,12 @@ public class MasterServerMetrics {
.register(Metrics.globalRegistry);
}
public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.master.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}
public void incMasterOverload() {
masterOverloadCounter.increment();
}

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
@ -116,8 +116,8 @@ public class ServerNodeManager implements InitializingBean {
refreshNodesAndGroupMappings();
// init executor service
executorService =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService = Executors
.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(
new WorkerNodeInfoAndGroupDbSyncTask(),
0,

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java

@ -78,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
"Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed, will stop the async master task");
continue;
}
masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> {
masterAsyncTaskExecutorThreadPool.getThreadPool().execute(() -> {
final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction =

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java

@ -41,7 +41,7 @@ public class MasterAsyncTaskExecutorThreadPool implements IMasterTaskExecutorThr
public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) {
synchronized (MasterAsyncTaskExecutorThreadPool.class) {
// todo: check if the thread pool is overload
threadPoolExecutor.submit(asyncMasterTaskExecutor);
threadPoolExecutor.execute(asyncMasterTaskExecutor);
return true;
}
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java

@ -41,7 +41,7 @@ public class MasterSyncTaskExecutorThreadPool implements IMasterTaskExecutorThre
public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) {
synchronized (MasterSyncTaskExecutorThreadPool.class) {
// todo: check if the thread pool is overload
threadPoolExecutor.submit(syncMasterTaskExecutor);
threadPoolExecutor.execute(syncMasterTaskExecutor);
return true;
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -265,7 +265,7 @@ public abstract class AbstractCommandExecutor {
// todo: remove this this thread pool.
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
getOutputLogService.submit(() -> {
getOutputLogService.execute(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());

3
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@ -75,6 +76,8 @@ public class WorkerServer implements IStoppable {
* @param args arguments
*/
public static void main(String[] args) {
WorkerServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
SpringApplication.run(WorkerServer.class);
}

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java

@ -127,4 +127,10 @@ public class WorkerServerMetrics {
.register(Metrics.globalRegistry);
}
public static void registerUncachedException(final Supplier<Number> supplier) {
Gauge.builder("ds.worker.uncached.exception", supplier)
.description("number of uncached exception")
.register(Metrics.globalRegistry);
}
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java

@ -49,7 +49,7 @@ public class WorkerTaskExecutorThreadPool {
synchronized (WorkerTaskExecutorThreadPool.class) {
if (TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy())) {
WorkerTaskExecutorHolder.put(workerTaskExecutor);
threadPoolExecutor.submit(workerTaskExecutor);
threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
if (isOverload()) {
@ -58,7 +58,7 @@ public class WorkerTaskExecutorThreadPool {
return false;
}
WorkerTaskExecutorHolder.put(workerTaskExecutor);
threadPoolExecutor.submit(workerTaskExecutor);
threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
}

Loading…
Cancel
Save