Browse Source

[Fix-10413] Fix Master startup failure the server still hang (#10500)

* Fix Master startup failure the server still hang

(cherry picked from commit 117f78ec4b)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
90c87f0121
  1. 32
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java
  2. 36
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
  3. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java
  5. 17
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java
  6. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java
  7. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
  8. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  9. 7
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

32
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperate.java

@ -28,21 +28,21 @@ import java.util.List;
public interface StorageOperate {
public static final String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
String RESOURCE_UPLOAD_PATH = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
/**
* create the tenant's resource directory if it does not already exist
* @param tenantCode
* @throws Exception
*/
public void createTenantDirIfNotExists(String tenantCode) throws Exception;
void createTenantDirIfNotExists(String tenantCode) throws Exception;
/**
* get the resource directory of tenant
* @param tenantCode
* @return
*/
public String getResDir(String tenantCode);
String getResDir(String tenantCode);
/**
* return the udf directory of tenant
@ -50,7 +50,7 @@ public interface StorageOperate {
* @return
*/
public String getUdfDir(String tenantCode);
String getUdfDir(String tenantCode);
/**
* create the directory at the given path for the tenant
@ -59,7 +59,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public boolean mkdir(String tenantCode,String path) throws IOException;
boolean mkdir(String tenantCode, String path) throws IOException;
/**
* get the path of the resource file
@ -67,7 +67,7 @@ public interface StorageOperate {
* @param fullName
* @return
*/
public String getResourceFileName(String tenantCode, String fullName);
String getResourceFileName(String tenantCode, String fullName);
/**
* get the path of the file
@ -76,7 +76,7 @@ public interface StorageOperate {
* @param fileName
* @return
*/
public String getFileName(ResourceType resourceType, String tenantCode, String fileName);
String getFileName(ResourceType resourceType, String tenantCode, String fileName);
/**
* check whether the tenant's resource exists
@ -85,7 +85,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public boolean exists(String tenantCode,String fileName) throws IOException;
boolean exists(String tenantCode, String fileName) throws IOException;
/**
* delete the resource of filePath
@ -96,7 +96,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public boolean delete(String tenantCode,String filePath, boolean recursive) throws IOException;
boolean delete(String tenantCode, String filePath, boolean recursive) throws IOException;
/**
* copy the file from srcPath to dstPath
@ -107,7 +107,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/**
* get the root path of the tenant with resourceType
@ -115,7 +115,7 @@ public interface StorageOperate {
* @param tenantCode
* @return
*/
public String getDir(ResourceType resourceType, String tenantCode);
String getDir(ResourceType resourceType, String tenantCode);
/**
* upload the local srcFile to dstPath
@ -127,7 +127,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public boolean upload(String tenantCode,String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, boolean overwrite) throws IOException;
/**
* download the srcPath to local
@ -138,7 +138,7 @@ public interface StorageOperate {
* @param overwrite
* @throws IOException
*/
public void download(String tenantCode,String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite)throws IOException;
/**
* view the content of filePath
@ -149,7 +149,7 @@ public interface StorageOperate {
* @return
* @throws IOException
*/
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException;
/**
* delete the files and directory of the tenant
@ -157,13 +157,13 @@ public interface StorageOperate {
* @param tenantCode
* @throws Exception
*/
public void deleteTenant(String tenantCode) throws Exception;
void deleteTenant(String tenantCode) throws Exception;
/**
* return the storageType
*
* @return
*/
public ResUploadType returnStorageType();
ResUploadType returnStorageType();
}

36
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.thread;
/**
 * Base class for threads used in DolphinScheduler. Every instance is flagged as a daemon
 * thread at construction time, so a thread that is still running cannot keep the JVM alive
 * and leave the server hanging.
 */
public abstract class BaseDaemonThread extends Thread {

    /**
     * Creates a daemon thread carrying the given name.
     *
     * @param threadName name assigned to the new thread
     */
    protected BaseDaemonThread(String threadName) {
        super();
        // The name is applied after construction, replacing the default generated one.
        setName(threadName);
        setDaemon(true);
    }

    /**
     * Creates a daemon thread that executes the given task.
     *
     * @param runnable task invoked when the thread runs
     */
    protected BaseDaemonThread(Runnable runnable) {
        super(runnable);
        setDaemon(true);
    }
}

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@ -60,7 +61,7 @@ import org.springframework.stereotype.Component;
* TaskUpdateQueue consumer
*/
@Component
public class TaskPriorityQueueConsumer extends Thread {
public class TaskPriorityQueueConsumer extends BaseDaemonThread {
/**
* logger of TaskUpdateQueueConsumer
@ -108,6 +109,10 @@ public class TaskPriorityQueueConsumer extends Thread {
*/
private ThreadPoolExecutor consumerThreadPoolExecutor;
protected TaskPriorityQueueConsumer() {
super("TaskPriorityQueueConsumeThread");
}
@PostConstruct
public void init() {
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
@ -198,10 +203,8 @@ public class TaskPriorityQueueConsumer extends Thread {
} else {
logger.info("Master failed to dispatch task to worker, taskInstanceId: {}", taskPriority.getTaskId());
}
} catch (RuntimeException e) {
} catch (RuntimeException | ExecuteException e) {
logger.error("Master dispatch task to worker error: ", e);
} catch (ExecuteException e) {
logger.error("Master dispatch task to worker error: {}", e);
}
return result;
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/StateEventResponseService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.StateEventResponseCommand;
@ -70,7 +71,6 @@ public class StateEventResponseService {
@PostConstruct
public void start() {
this.responseWorker = new StateEventResponseWorker();
this.responseWorker.setName("StateEventResponseWorker");
this.responseWorker.start();
}
@ -101,7 +101,11 @@ public class StateEventResponseService {
/**
* task worker thread
*/
class StateEventResponseWorker extends Thread {
class StateEventResponseWorker extends BaseDaemonThread {
protected StateEventResponseWorker() {
super("StateEventResponseWorker");
}
@Override
public void run() {

17
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEventService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import java.util.ArrayList;
@ -63,11 +64,9 @@ public class TaskEventService {
@PostConstruct
public void start() {
this.taskEventThread = new TaskEventThread();
this.taskEventThread.setName("TaskEventThread");
this.taskEventThread.start();
this.taskEventHandlerThread = new TaskEventHandlerThread();
this.taskEventHandlerThread.setName("TaskEventHandlerThread");
this.taskEventHandlerThread.start();
}
@ -85,7 +84,7 @@ public class TaskEventService {
taskExecuteThreadPool.eventHandler();
}
} catch (Exception e) {
logger.error("stop error:", e);
logger.error("TaskEventService stop error:", e);
}
}
@ -101,7 +100,11 @@ public class TaskEventService {
/**
* task worker thread
*/
class TaskEventThread extends Thread {
class TaskEventThread extends BaseDaemonThread {
protected TaskEventThread() {
super("TaskEventLoopThread");
}
@Override
public void run() {
while (Stopper.isRunning()) {
@ -123,7 +126,11 @@ public class TaskEventService {
/**
* event handler thread
*/
class TaskEventHandlerThread extends Thread {
class TaskEventHandlerThread extends BaseDaemonThread {
protected TaskEventHandlerThread() {
super("TaskEventHandlerThread");
}
@Override
public void run() {

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/EventExecuteService.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class EventExecuteService extends Thread {
public class EventExecuteService extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(EventExecuteService.class);
@ -42,9 +43,12 @@ public class EventExecuteService extends Thread {
@Autowired
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
protected EventExecuteService() {
super("EventServiceStarted");
}
@Override
public synchronized void start() {
super.setName("EventServiceStarted");
super.start();
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -29,7 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class FailoverExecuteThread extends Thread {
public class FailoverExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(FailoverExecuteThread.class);
@ -42,16 +43,19 @@ public class FailoverExecuteThread extends Thread {
@Autowired
private FailoverService failoverService;
protected FailoverExecuteThread() {
super("FailoverExecuteThread");
}
@Override
public synchronized void start() {
super.setName("FailoverExecuteThread");
super.start();
}
@Override
public void run() {
// when startup, wait 10s for ready
ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 10);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 10);
logger.info("failover execute thread started");
while (Stopper.isRunning()) {

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
@ -52,7 +53,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed.
*/
@Service
public class MasterSchedulerService extends Thread {
public class MasterSchedulerService extends BaseDaemonThread {
/**
* logger of MasterSchedulerService
@ -102,6 +103,10 @@ public class MasterSchedulerService extends Thread {
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
protected MasterSchedulerService() {
super("MasterCommandLoopThread");
}
/**
* constructor of MasterSchedulerService
*/
@ -113,9 +118,8 @@ public class MasterSchedulerService extends Thread {
@Override
public synchronized void start() {
super.setName("MasterSchedulerService");
super.start();
this.stateWheelExecuteThread.start();
super.start();
}
public void close() {

7
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
@ -50,7 +51,7 @@ import org.springframework.stereotype.Component;
* 4. timeout process check
*/
@Component
public class StateWheelExecuteThread extends Thread {
public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
@ -83,6 +84,10 @@ public class StateWheelExecuteThread extends Thread {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
protected StateWheelExecuteThread() {
super("StateWheelExecuteThread");
}
@Override
public void run() {
Duration checkInterval = Duration.ofMillis(masterConfig.getStateWheelInterval() * Constants.SLEEP_TIME_MILLIS);

Loading…
Cancel
Save