Browse Source

[improvement] Add two parameters in workergroup, and support the application of description display and other parameters (#11542)

* [Improvement] Improvement default worker group

* [Improvement] Improvement default worker group

* [Improvement] Improvement default worker group

* [Improvement] Improvement default worker group

* [Improvement] Improvement default worker group

* [improvement] The default worker grouping supports editing

* [improvement] The default worker grouping supports editing

* Update WorkerGroupServiceImpl.java

* Update WorkerGroupServiceImpl.java

* The default worker grouping supports editing

* update sql file

* update

* update

* update

* update
3.1.0-release
insist777 2 years ago committed by GitHub
parent
commit
abfef1a929
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  3. 27
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  4. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
  5. 6
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java
  6. 64
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
  7. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java
  8. 1
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml
  9. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  10. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  11. 2
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  12. 20
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql
  13. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql
  14. 54
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql
  15. 30
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql
  16. 31
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java

@ -73,9 +73,11 @@ public class WorkerGroupController extends BaseController {
*/ */
@ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES") @ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataTypeClass = int.class, example = "10", defaultValue = "0"), @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"),
@ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataTypeClass = String.class), @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"),
@ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataTypeClass = String.class) @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String"),
@ApiImplicitParam(name = "description", value = "WORKER_DESC", required = false, dataType = "String"),
@ApiImplicitParam(name = "otherParamsJson", value = "WORKER_PARMS_JSON", required = false, dataType = "String"),
}) })
@PostMapping() @PostMapping()
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ -84,8 +86,11 @@ public class WorkerGroupController extends BaseController {
public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id, @RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name, @RequestParam(value = "name") String name,
@RequestParam(value = "addrList") String addrList) { @RequestParam(value = "addrList") String addrList,
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList); @RequestParam(value = "description",required = false, defaultValue = "") String description,
@RequestParam(value = "otherParamsJson",required = false, defaultValue = "") String otherParamsJson
) {
Map<String, Object> result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson);
return returnDataList(result); return returnDataList(result);
} }

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -34,9 +34,11 @@ public interface WorkerGroupService {
* @param id worker group id * @param id worker group id
* @param name worker group name * @param name worker group name
* @param addrList addr list * @param addrList addr list
* @param description description
* @param otherParamsJson otherParamsJson
* @return create or update result code * @return create or update result code
*/ */
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList); Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson);
/** /**
* query worker group paging * query worker group paging

27
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -85,7 +85,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
*/ */
@Override @Override
@Transactional @Transactional
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList) { public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson) {
Map<String, Object> result = new HashMap<>(); Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) { if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM); putMsg(result, Status.USER_NO_OPERATION_PERM);
@ -111,6 +111,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroup.setName(name); workerGroup.setName(name);
workerGroup.setAddrList(addrList); workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now); workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
if (checkWorkerGroupNameExists(workerGroup)) { if (checkWorkerGroupNameExists(workerGroup)) {
putMsg(result, Status.NAME_EXIST, workerGroup.getName()); putMsg(result, Status.NAME_EXIST, workerGroup.getName());
@ -121,14 +122,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result; return result;
} }
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
putMsg(result, Status.SUCCESS);
return result;
}
protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, String otherParamsJson) {
if (workerGroup.getId() != 0) { if (workerGroup.getId() != 0) {
workerGroupMapper.updateById(workerGroup); workerGroupMapper.updateById(workerGroup);
} else { } else {
workerGroupMapper.insert(workerGroup); workerGroupMapper.insert(workerGroup);
permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger); permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger);
} }
putMsg(result, Status.SUCCESS);
return result;
} }
/** /**
@ -275,6 +280,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
} else { } else {
workerGroups = workerGroupMapper.queryAllWorkerGroup(); workerGroups = workerGroupMapper.queryAllWorkerGroup();
} }
// worker groups from zookeeper // worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
Collection<String> workerGroupList = null; Collection<String> workerGroupList = null;
@ -292,7 +298,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
} }
return workerGroups; return workerGroups;
} }
Map<String, WorkerGroup> workerGroupsMap = null;
if (workerGroups.size() != 0) {
workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
}
for (String workerGroup : workerGroupList) { for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup; String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
Collection<String> childrenNodes = null; Collection<String> childrenNodes = null;
@ -305,19 +314,27 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
continue; continue;
} }
WorkerGroup wg = new WorkerGroup(); WorkerGroup wg = new WorkerGroup();
handleAddrList(wg, workerGroup, childrenNodes);
wg.setName(workerGroup); wg.setName(workerGroup);
if (isPaging) { if (isPaging) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue); HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue);
wg.setCreateTime(new Date(heartBeat.getStartupTime())); wg.setCreateTime(new Date(heartBeat.getStartupTime()));
wg.setUpdateTime(new Date(heartBeat.getReportTime())); wg.setUpdateTime(new Date(heartBeat.getReportTime()));
wg.setSystemDefault(true); wg.setSystemDefault(true);
if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
workerGroups.remove(workerGroupsMap.get(workerGroup));
}
} }
workerGroups.add(wg); workerGroups.add(wg);
} }
return workerGroups; return workerGroups;
} }
protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection<String> childrenNodes) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
}
/** /**
* delete worker group by id * delete worker group by id

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java

@ -72,6 +72,8 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>(); MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group"); paramsMap.add("name","cxc_work_group");
paramsMap.add("addrList","192.168.0.1,192.168.0.2"); paramsMap.add("addrList","192.168.0.1,192.168.0.2");
paramsMap.add("description","");
paramsMap.add("otherParamsJson","");
MvcResult mvcResult = mockMvc.perform(post("/worker-groups") MvcResult mvcResult = mockMvc.perform(post("/worker-groups")
.header("sessionId", sessionId) .header("sessionId", sessionId)
.params(paramsMap)) .params(paramsMap))

6
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java

@ -16,11 +16,15 @@
*/ */
package org.apache.dolphinscheduler.api.utils; package org.apache.dolphinscheduler.api.utils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.*; import static org.junit.Assert.*;
@ -45,4 +49,4 @@ public class ResultTest {
Result ret = Result.errorWithArgs(Status.INTERNAL_SERVER_ERROR_ARGS, "test internal server error"); Result ret = Result.errorWithArgs(Status.INTERNAL_SERVER_ERROR_ARGS, "test internal server error");
Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), ret.getCode().intValue()); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), ret.getCode().intValue());
} }
} }

64
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java

@ -23,11 +23,13 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
/** /**
* worker group * worker group
*/ */
@TableName("t_ds_worker_group") @TableName("t_ds_worker_group")
@Data
public class WorkerGroup { public class WorkerGroup {
@TableId(value = "id", type = IdType.AUTO) @TableId(value = "id", type = IdType.AUTO)
@ -41,67 +43,11 @@ public class WorkerGroup {
private Date updateTime; private Date updateTime;
private String description;
@TableField(exist = false) @TableField(exist = false)
private boolean systemDefault; private boolean systemDefault;
public int getId() { private String otherParamsJson;
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddrList() {
return addrList;
}
public void setAddrList(String addrList) {
this.addrList = addrList;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public boolean getSystemDefault() {
return systemDefault;
}
public void setSystemDefault(boolean systemDefault) {
this.systemDefault = systemDefault;
}
@Override
public String toString() {
return "WorkerGroup{"
+ "id= " + id
+ ", name= " + name
+ ", addrList= " + addrList
+ ", createTime= " + createTime
+ ", updateTime= " + updateTime
+ ", systemDefault= " + systemDefault
+ "}";
}
} }

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java

@ -61,4 +61,5 @@ public interface WorkerGroupMapper extends BaseMapper<WorkerGroup> {
* @return worker group list * @return worker group list
*/ */
List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name); List<WorkerGroup> queryWorkerGroupByName(@Param("name") String name);
} }

1
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml

@ -28,4 +28,5 @@
from t_ds_worker_group from t_ds_worker_group
where name = #{name} where name = #{name}
</select> </select>
</mapper> </mapper>

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -962,6 +962,8 @@ CREATE TABLE t_ds_worker_group
addr_list text NULL DEFAULT NULL, addr_list text NULL DEFAULT NULL,
create_time datetime NULL DEFAULT NULL, create_time datetime NULL DEFAULT NULL,
update_time datetime NULL DEFAULT NULL, update_time datetime NULL DEFAULT NULL,
description text NULL DEFAULT NULL,
other_params_json text NULL DEFAULT NULL,
PRIMARY KEY (id), PRIMARY KEY (id),
UNIQUE KEY name_unique (name) UNIQUE KEY name_unique (name)
); );

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -955,6 +955,8 @@ CREATE TABLE `t_ds_worker_group` (
`addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]', `addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]',
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time', `create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time', `update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
`description` text NULL DEFAULT NULL COMMENT 'description',
`other_params_json` text NULL DEFAULT NULL COMMENT 'other params json',
PRIMARY KEY (`id`), PRIMARY KEY (`id`),
UNIQUE KEY `name_unique` (`name`) UNIQUE KEY `name_unique` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

2
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -862,6 +862,8 @@ CREATE TABLE t_ds_worker_group (
addr_list text DEFAULT NULL , addr_list text DEFAULT NULL ,
create_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL ,
description text DEFAULT NULL,
other_params_json text DEFAULT NULL,
PRIMARY KEY (id) , PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name) CONSTRAINT name_unique UNIQUE (name)
) ; ) ;

20
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json';

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

54
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
)
RETURNS character varying
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$
DECLARE
v_schema varchar;
BEGIN
---get schema name
v_schema =current_schema();
--- add column
EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL ';
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_update_metadata();
d//

30
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_insert_dq_initial_data();
d//

31
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -200,23 +200,16 @@ public class ServerNodeManager implements InitializingBean {
public void run() { public void run() {
try { try {
// sync worker node info // sync worker node info
Map<String, String> newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true); Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(newWorkerNodeInfo); syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database // sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup(); List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) { if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) { for (WorkerGroup wg : workerGroupList) {
String workerGroup = wg.getName(); String workerGroupName = wg.getName();
Set<String> nodes = new HashSet<>(); Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
String[] addrs = wg.getAddrList().split(Constants.COMMA); if (!workerAddress.isEmpty()) {
for (String addr : addrs) { syncWorkerGroupNodes(workerGroupName, workerAddress);
if (newWorkerNodeInfo.containsKey(addr)) {
nodes.add(addr);
}
}
if (!nodes.isEmpty()) {
syncWorkerGroupNodes(workerGroup, nodes);
} }
} }
} }
@ -227,6 +220,18 @@ public class ServerNodeManager implements InitializingBean {
} }
} }
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
if (newWorkerNodeInfo.containsKey(addr)) {
nodes.add(addr);
}
}
return nodes;
}
/** /**
* worker group node listener * worker group node listener
*/ */

Loading…
Cancel
Save