diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 81d923e9a0..2f899ed1b7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkerGroupService; @@ -29,6 +30,7 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import java.util.ArrayList; @@ -135,6 +137,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro * @return WorkerGroup list */ private List getWorkerGroups(boolean isPaging) { + String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; List workerGroups = new ArrayList<>(); List workerGroupList; @@ -142,38 +145,41 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); } catch (Exception e) { if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) { - if (!isPaging) { - //ignore noNodeException return Default - WorkerGroup wg = new WorkerGroup(); - wg.setName(DEFAULT_WORKER_GROUP); - workerGroups.add(wg); + if (isPaging) { + return workerGroups; } + + //ignore noNodeException return Default + WorkerGroup wg = new WorkerGroup(); + wg.setName(DEFAULT_WORKER_GROUP); + workerGroups.add(wg); return workerGroups; + } else { throw e; } } for (String workerGroup : workerGroupList) { - String workerGroupPath = String.format("%s/%s", workerPath, workerGroup); + String workerGroupPath = workerPath + SLASH + workerGroup; List childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); - String timeStamp = ""; + if (CollectionUtils.isEmpty(childrenNodes)) { + continue; + } + String timeStamp = childrenNodes.get(0); for (int i = 0; i < childrenNodes.size(); i++) { - String ip = childrenNodes.get(i); - childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":"))); - timeStamp = ip.substring(ip.lastIndexOf(":")); + childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight()); } - if (CollectionUtils.isNotEmpty(childrenNodes)) { - WorkerGroup wg = new WorkerGroup(); - wg.setName(workerGroup); - if (isPaging) { - wg.setIpList(childrenNodes); - String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp); - wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); - wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); - } - workerGroups.add(wg); + + WorkerGroup wg = new WorkerGroup(); + wg.setName(workerGroup); + if (isPaging) { + wg.setIpList(childrenNodes); + String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp); + wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); + wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); } + workerGroups.add(wg); } return workerGroups; } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 93463dc154..db9bb4fb00 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -71,8 +71,8 @@ public class WorkerGroupServiceTest { Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList); List defaultIpList = new ArrayList<>(); - defaultIpList.add("192.168.220.188:1234"); - defaultIpList.add("192.168.220.189:1234"); + defaultIpList.add("192.168.220.188:1234:100:1234567"); + defaultIpList.add("192.168.220.189:1234:100:1234567"); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index c18d02f09a..7e42984e49 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.remote.utils; +import static org.apache.dolphinscheduler.common.Constants.COLON; + import java.io.Serializable; import java.util.Objects; +import java.util.StringJoiner; /** * server address @@ -61,13 +64,13 @@ public class Host implements Serializable { public Host(String ip, int port) { this.ip = ip; this.port = port; - this.address = ip + ":" + port; + this.address = ip + COLON + port; } public Host(String ip, int port, int weight, long startTime) { this.ip = ip; this.port = port; - this.address = ip + ":" + port; + this.address = ip + COLON + port; this.weight = getWarmUpWeight(weight, startTime); this.startTime = startTime; } @@ -75,7 +78,7 @@ public class Host implements Serializable { public Host(String ip, int port, int weight, long startTime, String workGroup) { this.ip = ip; this.port = port; - this.address = ip + ":" + port; + this.address = ip + COLON + port; this.weight = getWarmUpWeight(weight, startTime); this.workGroup = workGroup; this.startTime = startTime; @@ -95,7 +98,7 @@ public class Host implements Serializable { public void setIp(String ip) { this.ip = ip; - this.address = ip + ":" + port; + this.address = ip + COLON + port; } public int getWeight() { @@ -120,7 +123,7 @@ public class Host implements Serializable { public void setPort(int port) { this.port = port; - this.address = ip + ":" + port; + this.address = ip + COLON + port; } public String getWorkGroup() { @@ -141,7 +144,7 @@ public class Host implements Serializable { if (address == null) { throw new IllegalArgumentException("Host : address is null."); } - String[] parts = address.split(":"); + String[] parts = address.split(COLON); if (parts.length < 2) { throw new IllegalArgumentException(String.format("Host : %s illegal.", address)); } @@ -155,6 +158,21 @@ public class Host implements Serializable { return host; } + /** + * generate host string + * @param address address + * @param weight weight + * @param startTime startTime + * @return address:weight:startTime + */ + public static String generate(String address, int weight, long startTime) { + StringJoiner stringJoiner = new StringJoiner(COLON); + stringJoiner.add(address) + .add(String.valueOf(weight)) + .add(String.valueOf(startTime)); + return stringJoiner.toString(); + } + /** * whether old version * @@ -162,7 +180,7 @@ public class Host implements Serializable { * @return old version is true , otherwise is false */ public static Boolean isOldVersion(String address) { - String[] parts = address.split(":"); + String[] parts = address.split(COLON); return parts.length != 2 && parts.length != 3; } @@ -186,8 +204,11 @@ public class Host implements Serializable { @Override public String toString() { return "Host{" - + "address='" + address + '\'' - + '}'; + + "address='" + address + '\'' + + ", weight=" + weight + + ", startTime=" + startTime + + ", workGroup='" + workGroup + '\'' + + '}'; } /** @@ -201,4 +222,13 @@ public class Host implements Serializable { } return weight; } + + /** + * get address and weight + * + * @return address:weight + */ + public String getAddressAndWeight() { + return address + COLON + weight; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index e779d5deb3..3d4d73f51a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -17,13 +17,13 @@ package org.apache.dolphinscheduler.server.worker.registry; -import static org.apache.dolphinscheduler.common.Constants.COLON; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SLASH; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -146,8 +146,8 @@ public class WorkerRegistry { String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); - String weight = getWorkerWeight(); - String workerStartTime = COLON + System.currentTimeMillis(); + int weight = workerConfig.getWeight(); + long workerStartTime = System.currentTimeMillis(); for (String workGroup : this.workerGroups) { StringBuilder workerZkPathBuilder = new StringBuilder(100); @@ -157,9 +157,7 @@ public class WorkerRegistry { } // trim and lower case is need workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); - workerZkPathBuilder.append(address); - workerZkPathBuilder.append(weight); - workerZkPathBuilder.append(workerStartTime); + workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime)); workerZkPaths.add(workerZkPathBuilder.toString()); } return workerZkPaths; @@ -172,11 +170,4 @@ public class WorkerRegistry { return NetUtils.getAddr(workerConfig.getListenPort()); } - /** - * get Worker Weight - */ - private String getWorkerWeight() { - return COLON + workerConfig.getWeight(); - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index f6d4d0d4bb..37484dafa5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -14,12 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.zk; -import org.apache.commons.lang.StringUtils; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -27,24 +25,27 @@ import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import java.util.Date; import java.util.List; -import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; /** * zookeeper master client @@ -134,9 +135,9 @@ public class ZKMasterClient extends AbstractZKClient { mutex.acquire(); String serverHost = null; - if(StringUtils.isNotEmpty(path)){ + if (StringUtils.isNotEmpty(path)) { serverHost = getHostByEventDataPath(path); - if(StringUtils.isEmpty(serverHost)){ + if (StringUtils.isEmpty(serverHost)) { logger.error("server down error: unknown path: {}", path); return; } @@ -305,8 +306,8 @@ public class ZKMasterClient extends AbstractZKClient { * @throws Exception exception */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { + workerHost = Host.of(workerHost).getAddress(); logger.info("start worker[{}] failover ...", workerHost); - List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); for (TaskInstance taskInstance : needFailoverTaskInstanceList) { if (needCheckWorkerAlive) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java index 6273569485..80ff11e0be 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java @@ -40,4 +40,13 @@ public class HostTest { Host host = Host.of("192.158.2.2:22"); Assert.assertEquals(22, host.getPort()); } + + @Test + public void testGenerate() { + String address = "192.158.2.2:22"; + int weight = 100; + long startTime = System.currentTimeMillis(); + String generateHost = Host.generate(address, weight, startTime); + Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost); + } }