Browse Source

[DS-7004][MasterServer]fix master still work when it lose zk connection (#7045)

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
be3bd4c83d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 49
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
  2. 13
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

49
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -290,20 +290,20 @@ public class MasterRegistryClient {
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if (workerHost == null if (workerHost == null
|| !checkOwner || !checkOwner
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { || processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
// only failover the task owned myself if worker down. // only failover the task owned myself if worker down.
if (processInstance == null) { if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.", logger.error("failover error, the process {} of task {} do not exists.",
taskInstance.getProcessInstanceId(), taskInstance.getId()); taskInstance.getProcessInstanceId(), taskInstance.getId());
continue; continue;
} }
taskInstance.setProcessInstance(processInstance); taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance) .buildProcessInstanceRelatedInfo(processInstance)
.create(); .create();
// only kill yarn job if exists , the local thread has exited // only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext); ProcessUtils.killYarnJob(taskExecutionContext);
@ -358,23 +358,40 @@ public class MasterRegistryClient {
localNodePath = getMasterPath(); localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMaxCpuLoadAvg(), masterConfig.getMaxCpuLoadAvg(),
masterConfig.getReservedMemory(), masterConfig.getReservedMemory(),
Sets.newHashSet(getMasterPath()), Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE, Constants.MASTER_TYPE,
registryClient); registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(newState -> { registryClient.addConnectionStateListener(this::handleConnectionState);
if (newState == ConnectionState.RECONNECTED || newState == ConnectionState.SUSPENDED) {
registryClient.persistEphemeral(localNodePath, "");
}
});
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
} }
public void handleConnectionState(ConnectionState state) {
switch (state) {
case CONNECTED:
logger.debug("registry connection state is {}", state);
break;
case SUSPENDED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is SUSPENDED, stop myself");
break;
case RECONNECTED:
logger.debug("registry connection state is {}, clean the node info", state);
registryClient.persistEphemeral(localNodePath, "");
break;
case DISCONNECTED:
logger.warn("registry connection state is {}, ready to stop myself", state);
registryClient.getStoppable().stop("registry connection state is DISCONNECTED, stop myself");
break;
default:
}
}
public void deregister() { public void deregister() {
try { try {
String address = getLocalAddress(); String address = getLocalAddress();

13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.registry.api.ConnectionState;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -49,7 +50,7 @@ import org.springframework.test.util.ReflectionTestUtils;
* MasterRegistryClientTest * MasterRegistryClientTest
*/ */
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PrepareForTest({ RegistryClient.class }) @PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"}) @PowerMockIgnore({"javax.management.*"})
public class MasterRegistryClientTest { public class MasterRegistryClientTest {
@ -76,6 +77,9 @@ public class MasterRegistryClientTest {
given(registryClient.getLock(Mockito.anyString())).willReturn(true); given(registryClient.getLock(Mockito.anyString())).willReturn(true);
given(registryClient.releaseLock(Mockito.anyString())).willReturn(true); given(registryClient.releaseLock(Mockito.anyString())).willReturn(true);
given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080"); given(registryClient.getHostByEventDataPath(Mockito.anyString())).willReturn("127.0.0.1:8080");
given(registryClient.getStoppable()).willReturn(cause -> {
});
doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString()); doNothing().when(registryClient).handleDeadServer(Mockito.anySet(), Mockito.any(NodeType.class), Mockito.anyString());
ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient); ReflectionTestUtils.setField(masterRegistryClient, "registryClient", registryClient);
@ -105,6 +109,13 @@ public class MasterRegistryClientTest {
masterRegistryClient.registry(); masterRegistryClient.registry();
} }
@Test
public void handleConnectionStateTest() {
masterRegistryClient.handleConnectionState(ConnectionState.CONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.RECONNECTED);
masterRegistryClient.handleConnectionState(ConnectionState.SUSPENDED);
}
@Test @Test
public void removeNodePathTest() { public void removeNodePathTest() {
masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false); masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);

Loading…
Cancel
Save