Browse Source

[Fix-4757][worker] fix worker failover host error (#4799)

* resolve code conflicts

* resolve code conflicts

* resolve code conflicts

* update WorkerGroupServiceImpl code style.

* update worker group service test host data.

* add ZookeeperNodeHandlerTest class

* change WorkerZkNode to Host class.

* add generate host string method.
pull/3/MERGE
zhuangchong 4 years ago committed by GitHub
parent
commit
4aac23481e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 46
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  2. 4
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  3. 48
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  4. 17
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  5. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  6. 9
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java

46
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.service.WorkerGroupService;
@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import java.util.ArrayList; import java.util.ArrayList;
@ -135,6 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return WorkerGroup list * @return WorkerGroup list
*/ */
private List<WorkerGroup> getWorkerGroups(boolean isPaging) { private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
List<WorkerGroup> workerGroups = new ArrayList<>(); List<WorkerGroup> workerGroups = new ArrayList<>();
List<String> workerGroupList; List<String> workerGroupList;
@ -142,38 +145,41 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
} catch (Exception e) { } catch (Exception e) {
if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) { if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
if (!isPaging) { if (isPaging) {
//ignore noNodeException return Default return workerGroups;
WorkerGroup wg = new WorkerGroup();
wg.setName(DEFAULT_WORKER_GROUP);
workerGroups.add(wg);
} }
//ignore noNodeException return Default
WorkerGroup wg = new WorkerGroup();
wg.setName(DEFAULT_WORKER_GROUP);
workerGroups.add(wg);
return workerGroups; return workerGroups;
} else { } else {
throw e; throw e;
} }
} }
for (String workerGroup : workerGroupList) { for (String workerGroup : workerGroupList) {
String workerGroupPath = String.format("%s/%s", workerPath, workerGroup); String workerGroupPath = workerPath + SLASH + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
String timeStamp = ""; if (CollectionUtils.isEmpty(childrenNodes)) {
continue;
}
String timeStamp = childrenNodes.get(0);
for (int i = 0; i < childrenNodes.size(); i++) { for (int i = 0; i < childrenNodes.size(); i++) {
String ip = childrenNodes.get(i); childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight());
childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
timeStamp = ip.substring(ip.lastIndexOf(":"));
} }
if (CollectionUtils.isNotEmpty(childrenNodes)) {
WorkerGroup wg = new WorkerGroup(); WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup); wg.setName(workerGroup);
if (isPaging) { if (isPaging) {
wg.setIpList(childrenNodes); wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp); String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
}
workerGroups.add(wg);
} }
workerGroups.add(wg);
} }
return workerGroups; return workerGroups;
} }

4
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -71,8 +71,8 @@ public class WorkerGroupServiceTest {
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultIpList = new ArrayList<>(); List<String> defaultIpList = new ArrayList<>();
defaultIpList.add("192.168.220.188:1234"); defaultIpList.add("192.168.220.188:1234:100:1234567");
defaultIpList.add("192.168.220.189:1234"); defaultIpList.add("192.168.220.189:1234:100:1234567");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);

48
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.io.Serializable; import java.io.Serializable;
import java.util.Objects; import java.util.Objects;
import java.util.StringJoiner;
/** /**
* server address * server address
@ -61,13 +64,13 @@ public class Host implements Serializable {
public Host(String ip, int port) { public Host(String ip, int port) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + COLON + port;
} }
public Host(String ip, int port, int weight, long startTime) { public Host(String ip, int port, int weight, long startTime) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime); this.weight = getWarmUpWeight(weight, startTime);
this.startTime = startTime; this.startTime = startTime;
} }
@ -75,7 +78,7 @@ public class Host implements Serializable {
public Host(String ip, int port, int weight, long startTime, String workGroup) { public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime); this.weight = getWarmUpWeight(weight, startTime);
this.workGroup = workGroup; this.workGroup = workGroup;
this.startTime = startTime; this.startTime = startTime;
@ -95,7 +98,7 @@ public class Host implements Serializable {
public void setIp(String ip) { public void setIp(String ip) {
this.ip = ip; this.ip = ip;
this.address = ip + ":" + port; this.address = ip + COLON + port;
} }
public int getWeight() { public int getWeight() {
@ -120,7 +123,7 @@ public class Host implements Serializable {
public void setPort(int port) { public void setPort(int port) {
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + COLON + port;
} }
public String getWorkGroup() { public String getWorkGroup() {
@ -141,7 +144,7 @@ public class Host implements Serializable {
if (address == null) { if (address == null) {
throw new IllegalArgumentException("Host : address is null."); throw new IllegalArgumentException("Host : address is null.");
} }
String[] parts = address.split(":"); String[] parts = address.split(COLON);
if (parts.length < 2) { if (parts.length < 2) {
throw new IllegalArgumentException(String.format("Host : %s illegal.", address)); throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
} }
@ -155,6 +158,21 @@ public class Host implements Serializable {
return host; return host;
} }
/**
* generate host string
* @param address address
* @param weight weight
* @param startTime startTime
* @return address:weight:startTime
*/
public static String generate(String address, int weight, long startTime) {
StringJoiner stringJoiner = new StringJoiner(COLON);
stringJoiner.add(address)
.add(String.valueOf(weight))
.add(String.valueOf(startTime));
return stringJoiner.toString();
}
/** /**
* whether old version * whether old version
* *
@ -162,7 +180,7 @@ public class Host implements Serializable {
* @return old version is true , otherwise is false * @return old version is true , otherwise is false
*/ */
public static Boolean isOldVersion(String address) { public static Boolean isOldVersion(String address) {
String[] parts = address.split(":"); String[] parts = address.split(COLON);
return parts.length != 2 && parts.length != 3; return parts.length != 2 && parts.length != 3;
} }
@ -186,8 +204,11 @@ public class Host implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "Host{" return "Host{"
+ "address='" + address + '\'' + "address='" + address + '\''
+ '}'; + ", weight=" + weight
+ ", startTime=" + startTime
+ ", workGroup='" + workGroup + '\''
+ '}';
} }
/** /**
@ -201,4 +222,13 @@ public class Host implements Serializable {
} }
return weight; return weight;
} }
/**
* get address and weight
*
* @return address:weight
*/
public String getAddressAndWeight() {
return address + COLON + weight;
}
} }

17
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -146,8 +146,8 @@ public class WorkerRegistry {
String address = getLocalAddress(); String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
String weight = getWorkerWeight(); int weight = workerConfig.getWeight();
String workerStartTime = COLON + System.currentTimeMillis(); long workerStartTime = System.currentTimeMillis();
for (String workGroup : this.workerGroups) { for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100); StringBuilder workerZkPathBuilder = new StringBuilder(100);
@ -157,9 +157,7 @@ public class WorkerRegistry {
} }
// trim and lower case is need // trim and lower case is need
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(address); workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime));
workerZkPathBuilder.append(weight);
workerZkPathBuilder.append(workerStartTime);
workerZkPaths.add(workerZkPathBuilder.toString()); workerZkPaths.add(workerZkPathBuilder.toString());
} }
return workerZkPaths; return workerZkPaths;
@ -172,11 +170,4 @@ public class WorkerRegistry {
return NetUtils.getAddr(workerConfig.getListenPort()); return NetUtils.getAddr(workerConfig.getListenPort());
} }
/**
* get Worker Weight
*/
private String getWorkerWeight() {
return COLON + workerConfig.getWeight();
}
} }

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -14,12 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.zk; package org.apache.dolphinscheduler.server.zk;
import org.apache.commons.lang.StringUtils; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
@ -27,24 +25,27 @@ import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.AbstractZKClient; import org.apache.dolphinscheduler.service.zk.AbstractZKClient;
import org.slf4j.Logger; import org.apache.curator.framework.CuratorFramework;
import org.slf4j.LoggerFactory; import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.springframework.beans.factory.annotation.Autowired; import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/** /**
* zookeeper master client * zookeeper master client
@ -134,9 +135,9 @@ public class ZKMasterClient extends AbstractZKClient {
mutex.acquire(); mutex.acquire();
String serverHost = null; String serverHost = null;
if(StringUtils.isNotEmpty(path)){ if (StringUtils.isNotEmpty(path)) {
serverHost = getHostByEventDataPath(path); serverHost = getHostByEventDataPath(path);
if(StringUtils.isEmpty(serverHost)){ if (StringUtils.isEmpty(serverHost)) {
logger.error("server down error: unknown path: {}", path); logger.error("server down error: unknown path: {}", path);
return; return;
} }
@ -305,8 +306,8 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception * @throws Exception exception
*/ */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
workerHost = Host.of(workerHost).getAddress();
logger.info("start worker[{}] failover ...", workerHost); logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) { for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
if (needCheckWorkerAlive) { if (needCheckWorkerAlive) {

9
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java

@ -40,4 +40,13 @@ public class HostTest {
Host host = Host.of("192.158.2.2:22"); Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort()); Assert.assertEquals(22, host.getPort());
} }
@Test
public void testGenerate() {
String address = "192.158.2.2:22";
int weight = 100;
long startTime = System.currentTimeMillis();
String generateHost = Host.generate(address, weight, startTime);
Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost);
}
} }

Loading…
Cancel
Save