From abfef1a929b0b28f2e5bce4f52a571fe33a6fe86 Mon Sep 17 00:00:00 2001 From: insist777 <84278047+insist777@users.noreply.github.com> Date: Tue, 23 Aug 2022 10:54:17 +0800 Subject: [PATCH] [improvement] Add two parameters in workergroup, and support the application of description display and other parameters (#11542) * [Improvement] Improvement default worker group * [Improvement] Improvement default worker group * [Improvement] Improvement default worker group * [Improvement] Improvement default worker group * [Improvement] Improvement default worker group * [improvement] The default worker grouping supports editing * [improvement] The default worker grouping supports editing * Update WorkerGroupServiceImpl.java * Update WorkerGroupServiceImpl.java * The default worker grouping supports editing * update sql file * update * update * update * update --- .../api/controller/WorkerGroupController.java | 15 +++-- .../api/service/WorkerGroupService.java | 4 +- .../service/impl/WorkerGroupServiceImpl.java | 27 ++++++-- .../controller/WorkerGroupControllerTest.java | 2 + .../api/utils/ResultTest.java | 6 +- .../dao/entity/WorkerGroup.java | 64 ++----------------- .../dao/mapper/WorkerGroupMapper.java | 1 + .../dao/mapper/WorkerGroupMapper.xml | 1 + .../resources/sql/dolphinscheduler_h2.sql | 2 + .../resources/sql/dolphinscheduler_mysql.sql | 2 + .../sql/dolphinscheduler_postgresql.sql | 2 + .../mysql/dolphinscheduler_ddl.sql | 20 ++++++ .../mysql/dolphinscheduler_dml.sql | 16 +++++ .../postgresql/dolphinscheduler_ddl.sql | 54 ++++++++++++++++ .../postgresql/dolphinscheduler_dml.sql | 30 +++++++++ .../master/registry/ServerNodeManager.java | 31 +++++---- 16 files changed, 193 insertions(+), 84 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index ccb196eb6a..69e1a57d0f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -73,9 +73,11 @@ public class WorkerGroupController extends BaseController { */ @ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataTypeClass = int.class, example = "10", defaultValue = "0"), - @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataTypeClass = String.class), - @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataTypeClass = String.class) + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), + @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String"), + @ApiImplicitParam(name = "description", value = "WORKER_DESC", required = false, dataType = "String"), + @ApiImplicitParam(name = "otherParamsJson", value = "WORKER_PARMS_JSON", required = false, dataType = "String"), }) @PostMapping() @ResponseStatus(HttpStatus.OK) @@ -84,8 +86,11 @@ public class WorkerGroupController extends BaseController { public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam(value = "id", required = false, defaultValue = "0") int id, @RequestParam(value = "name") String name, - @RequestParam(value = "addrList") String addrList) { - Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList); + @RequestParam(value = "addrList") String addrList, + @RequestParam(value = "description",required = false, defaultValue = "") String description, + @RequestParam(value = "otherParamsJson",required = false, defaultValue = "") String otherParamsJson + ) { + Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 2de4134db5..4c474b75cd 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -34,9 +34,11 @@ public interface WorkerGroupService { * @param id worker group id * @param name worker group name * @param addrList addr list + * @param description description + * @param otherParamsJson otherParamsJson * @return create or update result code */ - Map saveWorkerGroup(User loginUser, int id, String name, String addrList); + Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson); /** * query worker group paging diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index d582549144..eee031942c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -85,7 +85,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro */ @Override @Transactional - public Map saveWorkerGroup(User loginUser, int id, String name, String addrList) { + public Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, String otherParamsJson) { Map result = new HashMap<>(); if (!canOperatorPermissions(loginUser,null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) { putMsg(result, Status.USER_NO_OPERATION_PERM); @@ -111,6 +111,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro workerGroup.setName(name); workerGroup.setAddrList(addrList); workerGroup.setUpdateTime(now); + workerGroup.setDescription(description); if (checkWorkerGroupNameExists(workerGroup)) { putMsg(result, Status.NAME_EXIST, workerGroup.getName()); @@ -121,14 +122,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); return result; } + handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson); + putMsg(result, Status.SUCCESS); + return result; + } + + protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, String otherParamsJson) { if (workerGroup.getId() != 0) { workerGroupMapper.updateById(workerGroup); } else { workerGroupMapper.insert(workerGroup); permissionPostHandle(AuthorizationType.WORKER_GROUP, loginUser.getId(), Collections.singletonList(workerGroup.getId()),logger); } - putMsg(result, Status.SUCCESS); - return result; } /** @@ -275,6 +280,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } else { workerGroups = workerGroupMapper.queryAllWorkerGroup(); } + // worker groups from zookeeper String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; Collection workerGroupList = null; @@ -292,7 +298,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } return workerGroups; } - + Map workerGroupsMap = null; + if (workerGroups.size() != 0) { + workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup)); + } for (String workerGroup : workerGroupList) { String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup; Collection childrenNodes = null; @@ -305,19 +314,27 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro continue; } WorkerGroup wg = new WorkerGroup(); + handleAddrList(wg, workerGroup, childrenNodes); wg.setName(workerGroup); if (isPaging) { - wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); String registeredValue = registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); HeartBeat heartBeat = HeartBeat.decodeHeartBeat(registeredValue); wg.setCreateTime(new Date(heartBeat.getStartupTime())); wg.setUpdateTime(new Date(heartBeat.getReportTime())); wg.setSystemDefault(true); + if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) { + wg.setDescription(workerGroupsMap.get(workerGroup).getDescription()); + workerGroups.remove(workerGroupsMap.get(workerGroup)); + } } workerGroups.add(wg); } return workerGroups; } + + protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection childrenNodes) { + wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); + } /** * delete worker group by id diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 0f639d3fa7..72bad1e686 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -72,6 +72,8 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("name","cxc_work_group"); paramsMap.add("addrList","192.168.0.1,192.168.0.2"); + paramsMap.add("description",""); + paramsMap.add("otherParamsJson",""); MvcResult mvcResult = mockMvc.perform(post("/worker-groups") .header("sessionId", sessionId) .params(paramsMap)) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java index 01fb75cdf7..6301a44468 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/utils/ResultTest.java @@ -16,11 +16,15 @@ */ package org.apache.dolphinscheduler.api.utils; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.junit.Assert; import org.junit.Test; import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.*; @@ -45,4 +49,4 @@ public class ResultTest { Result ret = Result.errorWithArgs(Status.INTERNAL_SERVER_ERROR_ARGS, "test internal server error"); Assert.assertEquals(Status.INTERNAL_SERVER_ERROR_ARGS.getCode(), ret.getCode().intValue()); } -} \ No newline at end of file +} diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index cc19dd5649..2fdd711549 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -23,11 +23,13 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; /** * worker group */ @TableName("t_ds_worker_group") +@Data public class WorkerGroup { @TableId(value = "id", type = IdType.AUTO) @@ -41,67 +43,11 @@ public class WorkerGroup { private Date updateTime; + private String description; + @TableField(exist = false) private boolean systemDefault; - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getAddrList() { - return addrList; - } - - public void setAddrList(String addrList) { - this.addrList = addrList; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - public Date getUpdateTime() { - return updateTime; - } - - public void setUpdateTime(Date updateTime) { - this.updateTime = updateTime; - } - - public boolean getSystemDefault() { - return systemDefault; - } - - public void setSystemDefault(boolean systemDefault) { - this.systemDefault = systemDefault; - } - - @Override - public String toString() { - return "WorkerGroup{" - + "id= " + id - + ", name= " + name - + ", addrList= " + addrList - + ", createTime= " + createTime - + ", updateTime= " + updateTime - + ", systemDefault= " + systemDefault - + "}"; - } + private String otherParamsJson; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java index 36a57537b5..fcff987d02 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java @@ -61,4 +61,5 @@ public interface WorkerGroupMapper extends BaseMapper { * @return worker group list */ List queryWorkerGroupByName(@Param("name") String name); + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml index 2665f123cf..79305d1467 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml @@ -28,4 +28,5 @@ from t_ds_worker_group where name = #{name} + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 5211976ca0..be90dda167 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -962,6 +962,8 @@ CREATE TABLE t_ds_worker_group addr_list text NULL DEFAULT NULL, create_time datetime NULL DEFAULT NULL, update_time datetime NULL DEFAULT NULL, + description text NULL DEFAULT NULL, + other_params_json text NULL DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY name_unique (name) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index ebddafc793..c877fb6fef 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -955,6 +955,8 @@ CREATE TABLE `t_ds_worker_group` ( `addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]', `create_time` datetime NULL DEFAULT NULL COMMENT 'create time', `update_time` datetime NULL DEFAULT NULL COMMENT 'update time', + `description` text NULL DEFAULT NULL COMMENT 'description', + `other_params_json` text NULL DEFAULT NULL COMMENT 'other params json', PRIMARY KEY (`id`), UNIQUE KEY `name_unique` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index e4c0923bbb..f7cc1bf121 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -862,6 +862,8 @@ CREATE TABLE t_ds_worker_group ( addr_list text DEFAULT NULL , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , + description text DEFAULT NULL, + other_params_json text DEFAULT NULL, PRIMARY KEY (id) , CONSTRAINT name_unique UNIQUE (name) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..3d0ad0197f --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + + +ALTER TABLE `t_ds_worker_group` ADD COLUMN `other_params_json` text DEFAULT NULL COMMENT 'other params json'; + diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4a14f326b9 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 0000000000..372b99be4c --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +delimiter d// + + + +CREATE OR REPLACE FUNCTION public.dolphin_update_metadata( + ) + RETURNS character varying + LANGUAGE 'plpgsql' + COST 100 + VOLATILE PARALLEL UNSAFE +AS $BODY$ +DECLARE +v_schema varchar; +BEGIN + ---get schema name + v_schema =current_schema(); + + + +--- add column +EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_worker_group ADD COLUMN IF NOT EXISTS other_params_json int DEFAULT NULL '; + + + +return 'Success!'; +exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + + return SQLERRM; +END; +$BODY$; + +select dolphin_update_metadata(); + + +d// + diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 0000000000..4d7327f767 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.1.1_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +delimiter d// + +return 'Success!'; +exception when others then + ---Raise EXCEPTION '(%)',SQLERRM; + return SQLERRM; +END; +$BODY$; + +select dolphin_insert_dq_initial_data(); + +d// + diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index b92cb61473..58df904fc5 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -200,23 +200,16 @@ public class ServerNodeManager implements InitializingBean { public void run() { try { // sync worker node info - Map newWorkerNodeInfo = registryClient.getServerMaps(NodeType.WORKER, true); - syncAllWorkerNodeInfo(newWorkerNodeInfo); - + Map registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true); + syncAllWorkerNodeInfo(registryWorkerNodeMap); // sync worker group nodes from database List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); if (CollectionUtils.isNotEmpty(workerGroupList)) { for (WorkerGroup wg : workerGroupList) { - String workerGroup = wg.getName(); - Set nodes = new HashSet<>(); - String[] addrs = wg.getAddrList().split(Constants.COMMA); - for (String addr : addrs) { - if (newWorkerNodeInfo.containsKey(addr)) { - nodes.add(addr); - } - } - if (!nodes.isEmpty()) { - syncWorkerGroupNodes(workerGroup, nodes); + String workerGroupName = wg.getName(); + Set workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg); + if (!workerAddress.isEmpty()) { + syncWorkerGroupNodes(workerGroupName, workerAddress); } } } @@ -227,6 +220,18 @@ public class ServerNodeManager implements InitializingBean { } } + + protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, WorkerGroup wg) { + Set nodes = new HashSet<>(); + String[] addrs = wg.getAddrList().split(Constants.COMMA); + for (String addr : addrs) { + if (newWorkerNodeInfo.containsKey(addr)) { + nodes.add(addr); + } + } + return nodes; + } + /** * worker group node listener */