From 90c87f012129e812d87a3f6e847de16f525611ea Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 20 Jun 2022 22:35:06 +0800 Subject: [PATCH] [Fix-10413] Fix Master startup failure the server still hang (#10500) * Fix Master startup failure the server still hang (cherry picked from commit 117f78ec4b0e2438082a3e25158492eca1b9b1be) --- .../common/storage/StorageOperate.java | 32 ++++++++--------- .../common/thread/BaseDaemonThread.java | 36 +++++++++++++++++++ .../consumer/TaskPriorityQueueConsumer.java | 11 +++--- .../queue/StateEventResponseService.java | 8 +++-- .../processor/queue/TaskEventService.java | 17 ++++++--- .../master/runner/EventExecuteService.java | 8 +++-- .../master/runner/FailoverExecuteThread.java | 10 ++++-- .../master/runner/MasterSchedulerService.java | 10 ++++-- .../runner/StateWheelExecuteThread.java | 7 +++- 9 files changed, 103 insertions(+), 36 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java index 5248586de1..7854eaa032 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java @@ -28,21 +28,21 @@ import java.util.List; public interface StorageOperate { - public static final String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); + String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); /** * if the resource of tenant 's exist, the resource of folder will be created * @param tenantCode * @throws Exception */ - public void createTenantDirIfNotExists(String tenantCode) throws Exception; + void createTenantDirIfNotExists(String tenantCode) throws Exception; /** * get the resource directory of tenant * @param tenantCode * @return */ - public String getResDir(String tenantCode); + String getResDir(String tenantCode); /** * return the udf directory of tenant @@ -50,7 +50,7 @@ public interface StorageOperate { * @return */ - public String getUdfDir(String tenantCode); + String getUdfDir(String tenantCode); /** * create the directory that the path of tenant wanted to create @@ -59,7 +59,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public boolean mkdir(String tenantCode,String path) throws IOException; + boolean mkdir(String tenantCode, String path) throws IOException; /** * get the path of the resource file @@ -67,7 +67,7 @@ public interface StorageOperate { * @param fullName * @return */ - public String getResourceFileName(String tenantCode, String fullName); + String getResourceFileName(String tenantCode, String fullName); /** * get the path of the file @@ -76,7 +76,7 @@ public interface StorageOperate { * @param fileName * @return */ - public String getFileName(ResourceType resourceType, String tenantCode, String fileName); + String getFileName(ResourceType resourceType, String tenantCode, String fileName); /** * predicate if the resource of tenant exists @@ -85,7 +85,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public boolean exists(String tenantCode,String fileName) throws IOException; + boolean exists(String tenantCode, String fileName) throws IOException; /** * delete the resource of filePath @@ -96,7 +96,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public boolean delete(String tenantCode,String filePath, boolean recursive) throws IOException; + boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException; /** * copy the file from srcPath to dstPath @@ -107,7 +107,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; + boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; /** * get the root path of the tenant with resourceType @@ -115,7 +115,7 @@ public interface StorageOperate { * @param tenantCode * @return */ - public String getDir(ResourceType resourceType, String tenantCode); + String getDir(ResourceType resourceType, String tenantCode); /** * upload the local srcFile to dstPath @@ -127,7 +127,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public boolean upload(String tenantCode,String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; + boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; /** * download the srcPath to local @@ -138,7 +138,7 @@ public interface StorageOperate { * @param overwrite * @throws IOException */ - public void download(String tenantCode,String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException; + void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException; /** * vim the context of filePath @@ -149,7 +149,7 @@ public interface StorageOperate { * @return * @throws IOException */ - public List vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException; + List vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException; /** * delete the files and directory of the tenant @@ -157,13 +157,13 @@ public interface StorageOperate { * @param tenantCode * @throws Exception */ - public void deleteTenant(String tenantCode) throws Exception; + void deleteTenant(String tenantCode) throws Exception; /** * return the storageType * * @return */ - public ResUploadType returnStorageType(); + ResUploadType returnStorageType(); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java new file mode 100644 index 0000000000..88a44004cb --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.thread; + +/** + * All thread used in DolphinScheduler should extend with this class to avoid the server hang issue. + */ +public abstract class BaseDaemonThread extends Thread { + + protected BaseDaemonThread(Runnable runnable) { + super(runnable); + this.setDaemon(true); + } + + protected BaseDaemonThread(String threadName) { + super(); + this.setName(threadName); + this.setDaemon(true); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d75595de1a..eab33a07a6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -60,7 +61,7 @@ import org.springframework.stereotype.Component; * TaskUpdateQueue consumer */ @Component -public class TaskPriorityQueueConsumer extends Thread { +public class TaskPriorityQueueConsumer extends BaseDaemonThread { /** * logger of TaskUpdateQueueConsumer @@ -108,6 +109,10 @@ public class TaskPriorityQueueConsumer extends Thread { */ private ThreadPoolExecutor consumerThreadPoolExecutor; + protected TaskPriorityQueueConsumer() { + super("TaskPriorityQueueConsumeThread"); + } + @PostConstruct public void init() { this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber()); @@ -198,10 +203,8 @@ public class TaskPriorityQueueConsumer extends Thread { } else { logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); } - } catch (RuntimeException e) { + } catch (RuntimeException | ExecuteException e) { logger.error("Master dispatch task to worker error: ", e); - } catch (ExecuteException e) { - logger.error("Master dispatch task to worker error: {}", e); } return result; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java index 83772a1054..e34dedca67 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.StateEvent; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; @@ -70,7 +71,6 @@ public class StateEventResponseService { @PostConstruct public void start() { this.responseWorker = new StateEventResponseWorker(); - this.responseWorker.setName("StateEventResponseWorker"); this.responseWorker.start(); } @@ -101,7 +101,11 @@ public class StateEventResponseService { /** * task worker thread */ - class StateEventResponseWorker extends Thread { + class StateEventResponseWorker extends BaseDaemonThread { + + protected StateEventResponseWorker() { + super("StateEventResponseWorker"); + } @Override public void run() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java index 924dd131f8..bed3b3d9ed 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import java.util.ArrayList; @@ -63,11 +64,9 @@ public class TaskEventService { @PostConstruct public void start() { this.taskEventThread = new TaskEventThread(); - this.taskEventThread.setName("TaskEventThread"); this.taskEventThread.start(); this.taskEventHandlerThread = new TaskEventHandlerThread(); - this.taskEventHandlerThread.setName("TaskEventHandlerThread"); this.taskEventHandlerThread.start(); } @@ -85,7 +84,7 @@ public class TaskEventService { taskExecuteThreadPool.eventHandler(); } } catch (Exception e) { - logger.error("stop error:", e); + logger.error("TaskEventService stop error:", e); } } @@ -101,7 +100,11 @@ public class TaskEventService { /** * task worker thread */ - class TaskEventThread extends Thread { + class TaskEventThread extends BaseDaemonThread { + protected TaskEventThread() { + super("TaskEventLoopThread"); + } + @Override public void run() { while (Stopper.isRunning()) { @@ -123,7 +126,11 @@ public class TaskEventService { /** * event handler thread */ - class TaskEventHandlerThread extends Thread { + class TaskEventHandlerThread extends BaseDaemonThread { + + protected TaskEventHandlerThread() { + super("TaskEventHandlerThread"); + } @Override public void run() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java index fc9c4fd854..97c67d8493 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; @@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service -public class EventExecuteService extends Thread { +public class EventExecuteService extends BaseDaemonThread { private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class); @@ -42,9 +43,12 @@ public class EventExecuteService extends Thread { @Autowired private WorkflowExecuteThreadPool workflowExecuteThreadPool; + protected EventExecuteService() { + super("EventServiceStarted"); + } + @Override public synchronized void start() { - super.setName("EventServiceStarted"); super.start(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java index 4bac6cdf15..1c6ea144e4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service -public class FailoverExecuteThread extends Thread { +public class FailoverExecuteThread extends BaseDaemonThread { private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class); @@ -42,16 +43,19 @@ public class FailoverExecuteThread extends Thread { @Autowired private FailoverService failoverService; + protected FailoverExecuteThread() { + super("FailoverExecuteThread"); + } + @Override public synchronized void start() { - super.setName("FailoverExecuteThread"); super.start(); } @Override public void run() { // when startup, wait 10s for ready - ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10); logger.info("failover execute thread started"); while (Stopper.isRunning()) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 23577a90db..94991a50c3 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.SlotCheckState; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; @@ -52,7 +53,7 @@ import org.springframework.stereotype.Service; * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. */ @Service -public class MasterSchedulerService extends Thread { +public class MasterSchedulerService extends BaseDaemonThread { /** * logger of MasterSchedulerService @@ -102,6 +103,10 @@ public class MasterSchedulerService extends Thread { @Autowired private StateWheelExecuteThread stateWheelExecuteThread; + protected MasterSchedulerService() { + super("MasterCommandLoopThread"); + } + /** * constructor of MasterSchedulerService */ @@ -113,9 +118,8 @@ public class MasterSchedulerService extends Thread { @Override public synchronized void start() { - super.setName("MasterSchedulerService"); - super.start(); this.stateWheelExecuteThread.start(); + super.start(); } public void close() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index e85ddf08bd..12d404406c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.TimeoutFlag; +import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -50,7 +51,7 @@ import org.springframework.stereotype.Component; * 4. timeout process check */ @Component -public class StateWheelExecuteThread extends Thread { +public class StateWheelExecuteThread extends BaseDaemonThread { private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); @@ -83,6 +84,10 @@ public class StateWheelExecuteThread extends Thread { @Autowired private ProcessInstanceExecCacheManager processInstanceExecCacheManager; + protected StateWheelExecuteThread() { + super("StateWheelExecuteThread"); + } + @Override public void run() { Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);