Browse Source

[Fix-5037][Server] Fix that both the master and the worker are hanging after restarting and stopping (#5038)

* [Fix-5037][Server] Fix that both the master and the worker are hanging after restarting and stopping

* [Improvement][*] Replace commons.lang.StringUtils with common.utils.StringUtils
pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
ae608e024a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java
  3. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java
  4. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java
  5. 21
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  6. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  7. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
  8. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  9. 8
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  10. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  11. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  12. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  13. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  14. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java
  15. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/LoginHandlerInterceptor.java

@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.interceptor;
import javax.servlet.http.HttpServletRequest; package org.apache.dolphinscheduler.api.interceptor;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.security.Authenticator; import org.apache.dolphinscheduler.api.security.Authenticator;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.commons.httpclient.HttpStatus;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/IStoppable.java

@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common; package org.apache.dolphinscheduler.common;
/** /**
* server stop interface. * server stop interface.
*/ */
public interface IStoppable { public interface IStoppable {
/** /**
* Stop this service. * Stop this service.
* @param cause why stopping * @param cause why stopping

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/http/HttpParameters.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.task.http; package org.apache.dolphinscheduler.common.task.http;
import org.apache.dolphinscheduler.common.enums.HttpCheckCondition; import org.apache.dolphinscheduler.common.enums.HttpCheckCondition;
@ -21,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.HttpMethod;
import org.apache.dolphinscheduler.common.process.HttpProperty; import org.apache.dolphinscheduler.common.process.HttpProperty;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/procedure/ProcedureParameters.java

@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.common.task.procedure; package org.apache.dolphinscheduler.common.task.procedure;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
/** /**
* procedure parameter * procedure parameter
*/ */

21
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -85,9 +85,7 @@ public class MasterServer implements IStoppable {
private MasterSchedulerService masterSchedulerService; private MasterSchedulerService masterSchedulerService;
/** /**
* master server startup * master server startup, not use web service
* <p>
* master server not use web service
* *
* @param args arguments * @param args arguments
*/ */
@ -133,13 +131,10 @@ public class MasterServer implements IStoppable {
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override
public void run() {
if (Stopper.isRunning()) { if (Stopper.isRunning()) {
close("shutdownHook"); close("shutdownHook");
} }
}
})); }));
} }
@ -152,7 +147,7 @@ public class MasterServer implements IStoppable {
public void close(String cause) { public void close(String cause) {
try { try {
//execute only once // execute only once
if (Stopper.isStopped()) { if (Stopper.isStopped()) {
return; return;
} }
@ -163,27 +158,24 @@ public class MasterServer implements IStoppable {
Stopper.stop(); Stopper.stop();
try { try {
//thread sleep 3 seconds for thread quietly stop // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L); Thread.sleep(3000L);
} catch (Exception e) { } catch (Exception e) {
logger.warn("thread sleep exception ", e); logger.warn("thread sleep exception ", e);
} }
//close // close
this.masterSchedulerService.close(); this.masterSchedulerService.close();
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.zkMasterClient.close(); this.zkMasterClient.close();
//close quartz // close quartz
try { try {
QuartzExecutors.getInstance().shutdown(); QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped"); logger.info("Quartz service stopped");
} catch (Exception e) { } catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage()); logger.warn("Quartz service stopped exception:{}", e.getMessage());
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("master server stop exception ", e); logger.error("master server stop exception ", e);
} finally {
System.exit(-1);
} }
} }
@ -192,4 +184,3 @@ public class MasterServer implements IStoppable {
close(cause); close(cause);
} }
} }

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -116,6 +116,8 @@ public class MasterRegistry {
String localNodePath = getMasterPath(); String localNodePath = getMasterPath();
zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath); zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address); logger.info("master node : {} unRegistry to ZK.", address);
heartBeatExecutor.shutdown();
logger.info("heartbeat executor shutdown");
} }
/** /**

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java

@ -14,15 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.ThreadPoolExecutor; package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@ -36,6 +30,15 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager; import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -90,14 +93,14 @@ public class MasterSchedulerService extends Thread {
* constructor of MasterSchedulerService * constructor of MasterSchedulerService
*/ */
@PostConstruct @PostConstruct
public void init(){ public void init() {
this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig(); NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig); this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
} }
@Override @Override
public synchronized void start(){ public synchronized void start() {
super.setName("MasterSchedulerService"); super.setName("MasterSchedulerService");
super.start(); super.start();
} }
@ -110,7 +113,7 @@ public class MasterSchedulerService extends Thread {
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
if(!terminated){ if (!terminated) {
logger.warn("masterExecService shutdown without terminated, increase await time"); logger.warn("masterExecService shutdown without terminated, increase await time");
} }
nettyRemotingClient.close(); nettyRemotingClient.close();
@ -123,7 +126,7 @@ public class MasterSchedulerService extends Thread {
@Override @Override
public void run() { public void run() {
logger.info("master scheduler started"); logger.info("master scheduler started");
while (Stopper.isRunning()){ while (Stopper.isRunning()) {
try { try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory()); boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
if (!runCheckFlag) { if (!runCheckFlag) {

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -43,9 +43,8 @@ public class HeartBeatTask implements Runnable {
private Set<String> heartBeatPaths; private Set<String> heartBeatPaths;
private String serverType; private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter; private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* server stop or not // server stop or not
*/
protected IStoppable stoppable = null; protected IStoppable stoppable = null;
public HeartBeatTask(String startTime, public HeartBeatTask(String startTime,

8
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -227,18 +227,14 @@ public class ZookeeperRegistryCenter implements InitializingBean {
* @throws Exception errors * @throws Exception errors
*/ */
protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception { protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
//ip_sequenceno // ip_sequence_no
String[] zNodesPath = zNode.split("\\/"); String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1]; String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) { return !registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath);
return true;
}
return false;
} }
public RegisterOperator getRegisterOperator() { public RegisterOperator getRegisterOperator() {

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -95,9 +95,8 @@ public class WorkerServer implements IStoppable {
private WorkerManagerThread workerManagerThread; private WorkerManagerThread workerManagerThread;
/** /**
* worker server startup * worker server startup, not use web service
* *
* worker server not use web service
* @param args arguments * @param args arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
@ -143,20 +142,17 @@ public class WorkerServer implements IStoppable {
/** /**
* register hooks, which are called before the process exits * register hooks, which are called before the process exits
*/ */
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
@Override
public void run() {
if (Stopper.isRunning()) { if (Stopper.isRunning()) {
close("shutdownHook"); close("shutdownHook");
} }
}
})); }));
} }
public void close(String cause) { public void close(String cause) {
try { try {
//execute only once // execute only once
if (Stopper.isStopped()) { if (Stopper.isStopped()) {
return; return;
} }
@ -167,21 +163,18 @@ public class WorkerServer implements IStoppable {
Stopper.stop(); Stopper.stop();
try { try {
//thread sleep 3 seconds for thread quietly stop // thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L); Thread.sleep(3000L);
} catch (Exception e) { } catch (Exception e) {
logger.warn("thread sleep exception", e); logger.warn("thread sleep exception", e);
} }
// close
this.nettyRemotingServer.close(); this.nettyRemotingServer.close();
this.workerRegistry.unRegistry(); this.workerRegistry.unRegistry();
this.alertClientService.close(); this.alertClientService.close();
} catch (Exception e) { } catch (Exception e) {
logger.error("worker server stop exception ", e); logger.error("worker server stop exception ", e);
} finally {
System.exit(-1);
} }
} }

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -47,7 +47,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
/** /**
* worker registry * worker registry
*/ */
@ -115,6 +114,7 @@ public class WorkerRegistry {
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) { } else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address); logger.warn("worker : {} connection SUSPENDED ", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} }
}); });
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
@ -142,6 +142,7 @@ public class WorkerRegistry {
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
} }
this.heartBeatExecutor.shutdownNow(); this.heartBeatExecutor.shutdownNow();
logger.info("heartbeat executor shutdown");
} }
/** /**

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER;
@ -25,13 +26,12 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils; import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -110,8 +110,8 @@ public class ZKMasterClient extends AbstractZKClient {
@Override @Override
public void close() { public void close() {
super.close();
masterRegistry.unRegistry(); masterRegistry.unRegistry();
super.close();
} }
/** /**

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/EnvFileTest.java

@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker; package org.apache.dolphinscheduler.server.worker;
import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class EnvFileTest { public class EnvFileTest {

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -58,7 +58,7 @@ public class ZookeeperOperator implements InitializingBean {
protected CuratorFramework zkClient; protected CuratorFramework zkClient;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() {
this.zkClient = buildClient(); this.zkClient = buildClient();
initStateListener(); initStateListener();
treeCacheStart(); treeCacheStart();

Loading…
Cancel
Save