Browse Source

[Fix-10413] Fix the server still hanging after a Master startup failure (#10500)

* Fix the server still hanging after a Master startup failure
3.1.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
117f78ec4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
  2. 36
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
  3. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  5. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
  6. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  7. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  8. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  9. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

32
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java

@ -28,21 +28,21 @@ import java.util.List;
public interface StorageOperate { public interface StorageOperate {
public static final String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler"); String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
/** /**
* if the resource of tenant 's exist, the resource of folder will be created * if the resource of tenant 's exist, the resource of folder will be created
* @param tenantCode * @param tenantCode
* @throws Exception * @throws Exception
*/ */
public void createTenantDirIfNotExists(String tenantCode) throws Exception; void createTenantDirIfNotExists(String tenantCode) throws Exception;
/** /**
* get the resource directory of tenant * get the resource directory of tenant
* @param tenantCode * @param tenantCode
* @return * @return
*/ */
public String getResDir(String tenantCode); String getResDir(String tenantCode);
/** /**
* return the udf directory of tenant * return the udf directory of tenant
@ -50,7 +50,7 @@ public interface StorageOperate {
* @return * @return
*/ */
public String getUdfDir(String tenantCode); String getUdfDir(String tenantCode);
/** /**
* create the directory that the path of tenant wanted to create * create the directory that the path of tenant wanted to create
@ -59,7 +59,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean mkdir(String tenantCode,String path) throws IOException; boolean mkdir(String tenantCode, String path) throws IOException;
/** /**
* get the path of the resource file * get the path of the resource file
@ -67,7 +67,7 @@ public interface StorageOperate {
* @param fullName * @param fullName
* @return * @return
*/ */
public String getResourceFileName(String tenantCode, String fullName); String getResourceFileName(String tenantCode, String fullName);
/** /**
* get the path of the file * get the path of the file
@ -76,7 +76,7 @@ public interface StorageOperate {
* @param fileName * @param fileName
* @return * @return
*/ */
public String getFileName(ResourceType resourceType, String tenantCode, String fileName); String getFileName(ResourceType resourceType, String tenantCode, String fileName);
/** /**
* predicate if the resource of tenant exists * predicate if the resource of tenant exists
@ -85,7 +85,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean exists(String tenantCode,String fileName) throws IOException; boolean exists(String tenantCode, String fileName) throws IOException;
/** /**
* delete the resource of filePath * delete the resource of filePath
@ -96,7 +96,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean delete(String tenantCode,String filePath, boolean recursive) throws IOException; boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException;
/** /**
* copy the file from srcPath to dstPath * copy the file from srcPath to dstPath
@ -107,7 +107,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/** /**
* get the root path of the tenant with resourceType * get the root path of the tenant with resourceType
@ -115,7 +115,7 @@ public interface StorageOperate {
* @param tenantCode * @param tenantCode
* @return * @return
*/ */
public String getDir(ResourceType resourceType, String tenantCode); String getDir(ResourceType resourceType, String tenantCode);
/** /**
* upload the local srcFile to dstPath * upload the local srcFile to dstPath
@ -127,7 +127,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean upload(String tenantCode,String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException; boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/** /**
* download the srcPath to local * download the srcPath to local
@ -138,7 +138,7 @@ public interface StorageOperate {
* @param overwrite * @param overwrite
* @throws IOException * @throws IOException
*/ */
public void download(String tenantCode,String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException; void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
/** /**
* vim the context of filePath * vim the context of filePath
@ -149,7 +149,7 @@ public interface StorageOperate {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException; List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
/** /**
* delete the files and directory of the tenant * delete the files and directory of the tenant
@ -157,13 +157,13 @@ public interface StorageOperate {
* @param tenantCode * @param tenantCode
* @throws Exception * @throws Exception
*/ */
public void deleteTenant(String tenantCode) throws Exception; void deleteTenant(String tenantCode) throws Exception;
/** /**
* return the storageType * return the storageType
* *
* @return * @return
*/ */
public ResUploadType returnStorageType(); ResUploadType returnStorageType();
} }

36
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.thread;
/**
 * Base class for DolphinScheduler threads: every instance is marked as a daemon thread
 * at construction time.
 *
 * <p>All threads used in DolphinScheduler should extend this class to avoid the server hang
 * issue; the JVM does not wait for daemon threads to finish before it exits.
 */
public abstract class BaseDaemonThread extends Thread {

    /**
     * Creates a daemon thread that executes the given runnable.
     *
     * @param runnable the task executed when this thread is started
     */
    protected BaseDaemonThread(Runnable runnable) {
        super(runnable);
        setDaemon(true);
    }

    /**
     * Creates a daemon thread with the given name. No runnable target is supplied, so the
     * thread only does work if the subclass overrides {@link #run()}.
     *
     * @param threadName the name assigned to this thread
     */
    protected BaseDaemonThread(String threadName) {
        // Hand the name directly to Thread instead of calling super() and then setName():
        // the no-arg Thread constructor would first generate an auto-numbered default name
        // that is thrown away immediately.
        super(threadName);
        setDaemon(true);
    }
}

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer; package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
* TaskUpdateQueue consumer * TaskUpdateQueue consumer
*/ */
@Component @Component
public class TaskPriorityQueueConsumer extends Thread { public class TaskPriorityQueueConsumer extends BaseDaemonThread {
/** /**
* logger of TaskUpdateQueueConsumer * logger of TaskUpdateQueueConsumer
@ -108,6 +109,10 @@ public class TaskPriorityQueueConsumer extends Thread {
*/ */
private ThreadPoolExecutor consumerThreadPoolExecutor; private ThreadPoolExecutor consumerThreadPoolExecutor;
protected TaskPriorityQueueConsumer() {
super("TaskPriorityQueueConsumeThread");
}
@PostConstruct @PostConstruct
public void init() { public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber()); this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
@ -198,10 +203,8 @@ public class TaskPriorityQueueConsumer extends Thread {
} else { } else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId()); logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
} }
} catch (RuntimeException e) { } catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error: ", e); logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
logger.error("Master dispatch task to worker error: {}", e);
} }
return result; return result;
} }

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand; import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
@ -70,7 +71,6 @@ public class StateEventResponseService {
@PostConstruct @PostConstruct
public void start() { public void start() {
this.responseWorker = new StateEventResponseWorker(); this.responseWorker = new StateEventResponseWorker();
this.responseWorker.setName("StateEventResponseWorker");
this.responseWorker.start(); this.responseWorker.start();
} }
@ -101,7 +101,11 @@ public class StateEventResponseService {
/** /**
* task worker thread * task worker thread
*/ */
class StateEventResponseWorker extends Thread { class StateEventResponseWorker extends BaseDaemonThread {
protected StateEventResponseWorker() {
super("StateEventResponseWorker");
}
@Override @Override
public void run() { public void run() {

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue; package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import java.util.ArrayList; import java.util.ArrayList;
@ -63,11 +64,9 @@ public class TaskEventService {
@PostConstruct @PostConstruct
public void start() { public void start() {
this.taskEventThread = new TaskEventThread(); this.taskEventThread = new TaskEventThread();
this.taskEventThread.setName("TaskEventThread");
this.taskEventThread.start(); this.taskEventThread.start();
this.taskEventHandlerThread = new TaskEventHandlerThread(); this.taskEventHandlerThread = new TaskEventHandlerThread();
this.taskEventHandlerThread.setName("TaskEventHandlerThread");
this.taskEventHandlerThread.start(); this.taskEventHandlerThread.start();
} }
@ -85,7 +84,7 @@ public class TaskEventService {
taskExecuteThreadPool.eventHandler(); taskExecuteThreadPool.eventHandler();
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("stop error:", e); logger.error("TaskEventService stop error:", e);
} }
} }
@ -101,7 +100,11 @@ public class TaskEventService {
/** /**
* task worker thread * task worker thread
*/ */
class TaskEventThread extends Thread { class TaskEventThread extends BaseDaemonThread {
protected TaskEventThread() {
super("TaskEventLoopThread");
}
@Override @Override
public void run() { public void run() {
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
@ -123,7 +126,11 @@ public class TaskEventService {
/** /**
* event handler thread * event handler thread
*/ */
class TaskEventHandlerThread extends Thread { class TaskEventHandlerThread extends BaseDaemonThread {
protected TaskEventHandlerThread() {
super("TaskEventHandlerThread");
}
@Override @Override
public void run() { public void run() {

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class EventExecuteService extends Thread { public class EventExecuteService extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class); private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
@ -42,9 +43,12 @@ public class EventExecuteService extends Thread {
@Autowired @Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool; private WorkflowExecuteThreadPool workflowExecuteThreadPool;
protected EventExecuteService() {
super("EventServiceStarted");
}
@Override @Override
public synchronized void start() { public synchronized void start() {
super.setName("EventServiceStarted");
super.start(); super.start();
} }

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner; package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class FailoverExecuteThread extends Thread { public class FailoverExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class); private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
@ -42,16 +43,19 @@ public class FailoverExecuteThread extends Thread {
@Autowired @Autowired
private FailoverService failoverService; private FailoverService failoverService;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
}
@Override @Override
public synchronized void start() { public synchronized void start() {
super.setName("FailoverExecuteThread");
super.start(); super.start();
} }
@Override @Override
public void run() { public void run() {
// when startup, wait 10s for ready // when startup, wait 10s for ready
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10); ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
logger.info("failover execute thread started"); logger.info("failover execute thread started");
while (Stopper.isRunning()) { while (Stopper.isRunning()) {

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState; import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -52,7 +53,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/ */
@Service @Service
public class MasterSchedulerService extends Thread { public class MasterSchedulerService extends BaseDaemonThread {
/** /**
* logger of MasterSchedulerService * logger of MasterSchedulerService
@ -102,6 +103,10 @@ public class MasterSchedulerService extends Thread {
@Autowired @Autowired
private StateWheelExecuteThread stateWheelExecuteThread; private StateWheelExecuteThread stateWheelExecuteThread;
protected MasterSchedulerService() {
super("MasterCommandLoopThread");
}
/** /**
* constructor of MasterSchedulerService * constructor of MasterSchedulerService
*/ */
@ -113,9 +118,8 @@ public class MasterSchedulerService extends Thread {
@Override @Override
public synchronized void start() { public synchronized void start() {
super.setName("MasterSchedulerService");
super.start();
this.stateWheelExecuteThread.start(); this.stateWheelExecuteThread.start();
super.start();
} }
public void close() { public void close() {

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag; import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -50,7 +51,7 @@ import org.springframework.stereotype.Component;
* 4. timeout process check * 4. timeout process check
*/ */
@Component @Component
public class StateWheelExecuteThread extends Thread { public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class); private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
@ -83,6 +84,10 @@ public class StateWheelExecuteThread extends Thread {
@Autowired @Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager; private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
protected StateWheelExecuteThread() {
super("StateWheelExecuteThread");
}
@Override @Override
public void run() { public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS); Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);

Loading…
Cancel
Save