[Improvement-3369][api] Introduce taskrecord, udffunc, workflowlineage and workergroup service interface for clear code (pull/3/MERGE)
Shiwen Cheng, 4 years ago, committed by GitHub
12 changed files with 771 additions and 501 deletions
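The change follows one pattern across the four files shown below: each API service gains an interface in org.apache.dolphinscheduler.api.service and a matching implementation in org.apache.dolphinscheduler.api.service.impl. The interface files themselves fall outside the hunks reproduced here; as a rough sketch of the pattern, a TaskRecordService interface reconstructed from the implementation below would look something like this (signature inferred, not copied from the diff, and the real interface may declare more methods):

    // Sketch only: reconstructed from TaskRecordServiceImpl below; the actual
    // interface introduced by this commit may differ.
    package org.apache.dolphinscheduler.api.service;

    import java.util.Map;

    public interface TaskRecordService {

        Map<String, Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate,
                                                      String taskDate, String sourceTable,
                                                      String destTable, String endDate,
                                                      String state, Integer pageNo, Integer pageSize);
    }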
TaskRecordServiceImpl.java
@@ -0,0 +1,86 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HISTORY_HIVE_LOG;
import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HIVE_LOG;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.TaskRecordService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

/**
 * task record service impl
 */
@Service
public class TaskRecordServiceImpl extends BaseService implements TaskRecordService {

    /**
     * query task record list paging
     *
     * @param taskName task name
     * @param state state
     * @param sourceTable source table
     * @param destTable destination table
     * @param taskDate task date
     * @param startDate start time
     * @param endDate end time
     * @param pageNo page number
     * @param pageSize page size
     * @param isHistory is history
     * @return task record list
     */
    public Map<String, Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate,
                                                         String taskDate, String sourceTable,
                                                         String destTable, String endDate,
                                                         String state, Integer pageNo, Integer pageSize) {
        Map<String, Object> result = new HashMap<>();
        PageInfo<TaskRecord> pageInfo = new PageInfo<>(pageNo, pageSize);

        Map<String, String> map = new HashMap<>();
        map.put("taskName", taskName);
        map.put("taskDate", taskDate);
        map.put("state", state);
        map.put("sourceTable", sourceTable);
        map.put("targetTable", destTable);
        map.put("startTime", startDate);
        map.put("endTime", endDate);
        map.put("offset", pageInfo.getStart().toString());
        map.put("pageSize", pageInfo.getPageSize().toString());

        // choose the history or the current hive log table
        String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG;
        int count = TaskRecordDao.countTaskRecord(map, table);
        List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table);
        pageInfo.setTotalCount(count);
        pageInfo.setLists(recordList);
        result.put(Constants.DATA_LIST, pageInfo);
        putMsg(result, Status.SUCCESS);

        return result;
    }
}
UdfFuncServiceImpl.java
@@ -0,0 +1,326 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.api.service.impl;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.UdfFuncService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper;
import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;

/**
 * udf function service impl
 */
@Service
public class UdfFuncServiceImpl extends BaseService implements UdfFuncService {

    private static final Logger logger = LoggerFactory.getLogger(UdfFuncServiceImpl.class);

    @Autowired
    private ResourceMapper resourceMapper;

    @Autowired
    private UdfFuncMapper udfFuncMapper;

    @Autowired
    private UDFUserMapper udfUserMapper;

    /**
     * create udf function
     *
     * @param loginUser login user
     * @param type udf type
     * @param funcName function name
     * @param argTypes argument types
     * @param database database
     * @param desc description
     * @param resourceId resource id
     * @param className class name
     * @return create result code
     */
    public Result<Object> createUdfFunction(User loginUser,
                                            String funcName,
                                            String className,
                                            String argTypes,
                                            String database,
                                            String desc,
                                            UdfType type,
                                            int resourceId) {
        Result<Object> result = new Result<>();

        // check whether resource upload is enabled
        if (!PropertyUtils.getResUploadStartupState()) {
            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
            putMsg(result, Status.HDFS_NOT_STARTUP);
            return result;
        }

        // verify that the udf function name does not already exist
        if (checkUdfFuncNameExists(funcName)) {
            putMsg(result, Status.UDF_FUNCTION_EXISTS);
            return result;
        }

        Resource resource = resourceMapper.selectById(resourceId);
        if (resource == null) {
            logger.error("resourceId {} does not exist", resourceId);
            putMsg(result, Status.RESOURCE_NOT_EXIST);
            return result;
        }

        // save data
        UdfFunc udf = new UdfFunc();
        Date now = new Date();
        udf.setUserId(loginUser.getId());
        udf.setFuncName(funcName);
        udf.setClassName(className);
        if (StringUtils.isNotEmpty(argTypes)) {
            udf.setArgTypes(argTypes);
        }
        if (StringUtils.isNotEmpty(database)) {
            udf.setDatabase(database);
        }
        udf.setDescription(desc);
        udf.setResourceId(resourceId);
        udf.setResourceName(resource.getFullName());
        udf.setType(type);

        udf.setCreateTime(now);
        udf.setUpdateTime(now);

        udfFuncMapper.insert(udf);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * check whether a udf function name already exists
     *
     * @param name name
     * @return true if the name is already taken
     */
    private boolean checkUdfFuncNameExists(String name) {
        List<UdfFunc> resource = udfFuncMapper.queryUdfByIdStr(null, name);
        return resource != null && !resource.isEmpty();
    }

    /**
     * query udf function
     *
     * @param id udf function id
     * @return udf function detail
     */
    public Map<String, Object> queryUdfFuncDetail(int id) {
        Map<String, Object> result = new HashMap<>();
        UdfFunc udfFunc = udfFuncMapper.selectById(id);
        if (udfFunc == null) {
            putMsg(result, Status.RESOURCE_NOT_EXIST);
            return result;
        }
        result.put(Constants.DATA_LIST, udfFunc);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * update udf function
     *
     * @param udfFuncId udf function id
     * @param type resource type
     * @param funcName function name
     * @param argTypes argument types
     * @param database database
     * @param desc description
     * @param resourceId resource id
     * @param className class name
     * @return update result code
     */
    public Map<String, Object> updateUdfFunc(int udfFuncId,
                                             String funcName,
                                             String className,
                                             String argTypes,
                                             String database,
                                             String desc,
                                             UdfType type,
                                             int resourceId) {
        Map<String, Object> result = new HashMap<>();
        // verify that the udf function exists
        UdfFunc udf = udfFuncMapper.selectUdfById(udfFuncId);

        if (udf == null) {
            result.put(Constants.STATUS, Status.UDF_FUNCTION_NOT_EXIST);
            result.put(Constants.MSG, Status.UDF_FUNCTION_NOT_EXIST.getMsg());
            return result;
        }

        // check whether resource upload is enabled
        if (!PropertyUtils.getResUploadStartupState()) {
            logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState());
            putMsg(result, Status.HDFS_NOT_STARTUP);
            return result;
        }

        // if the name changed, verify that the new name is not taken
        if (!funcName.equals(udf.getFuncName())) {
            if (checkUdfFuncNameExists(funcName)) {
                logger.error("UdfFunc {} already exists, can not create it again.", funcName);
                result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS);
                result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg());
                return result;
            }
        }

        Resource resource = resourceMapper.selectById(resourceId);
        if (resource == null) {
            logger.error("resourceId {} does not exist", resourceId);
            result.put(Constants.STATUS, Status.RESOURCE_NOT_EXIST);
            result.put(Constants.MSG, Status.RESOURCE_NOT_EXIST.getMsg());
            return result;
        }
        Date now = new Date();
        udf.setFuncName(funcName);
        udf.setClassName(className);
        udf.setArgTypes(argTypes);
        if (StringUtils.isNotEmpty(database)) {
            udf.setDatabase(database);
        }
        udf.setDescription(desc);
        udf.setResourceId(resourceId);
        udf.setResourceName(resource.getFullName());
        udf.setType(type);

        udf.setUpdateTime(now);

        udfFuncMapper.updateById(udf);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * query udf function list paging
     *
     * @param loginUser login user
     * @param pageNo page number
     * @param pageSize page size
     * @param searchVal search value
     * @return udf function list page
     */
    public Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) {
        Map<String, Object> result = new HashMap<>();
        PageInfo<UdfFunc> pageInfo = new PageInfo<>(pageNo, pageSize);
        IPage<UdfFunc> udfFuncList = getUdfFuncsPage(loginUser, searchVal, pageSize, pageNo);
        pageInfo.setTotalCount((int) udfFuncList.getTotal());
        pageInfo.setLists(udfFuncList.getRecords());
        result.put(Constants.DATA_LIST, pageInfo);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * get udf functions
     *
     * @param loginUser login user
     * @param searchVal search value
     * @param pageSize page size
     * @param pageNo page number
     * @return udf function list page
     */
    private IPage<UdfFunc> getUdfFuncsPage(User loginUser, String searchVal, Integer pageSize, int pageNo) {
        // admins see all functions (userId 0), others only their own
        int userId = loginUser.getId();
        if (isAdmin(loginUser)) {
            userId = 0;
        }
        Page<UdfFunc> page = new Page<>(pageNo, pageSize);
        return udfFuncMapper.queryUdfFuncPaging(page, userId, searchVal);
    }

    /**
     * query udf list
     *
     * @param loginUser login user
     * @param type udf type
     * @return udf func list
     */
    public Map<String, Object> queryUdfFuncList(User loginUser, Integer type) {
        Map<String, Object> result = new HashMap<>();
        int userId = loginUser.getId();
        if (isAdmin(loginUser)) {
            userId = 0;
        }
        List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(userId, type);

        result.put(Constants.DATA_LIST, udfFuncList);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * delete udf function
     *
     * @param id udf function id
     * @return delete result code
     */
    @Transactional(rollbackFor = RuntimeException.class)
    public Result<Object> delete(int id) {
        Result<Object> result = new Result<>();
        udfFuncMapper.deleteById(id);
        udfUserMapper.deleteByUdfFuncId(id);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * verify udf function by name
     *
     * @param name name
     * @return SUCCESS if the name is available, UDF_FUNCTION_EXISTS otherwise
     */
    public Result<Object> verifyUdfFuncByName(String name) {
        Result<Object> result = new Result<>();
        if (checkUdfFuncNameExists(name)) {
            putMsg(result, Status.UDF_FUNCTION_EXISTS);
        } else {
            putMsg(result, Status.SUCCESS);
        }
        return result;
    }

}
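One detail worth noting in the file above: delete(int) removes rows from two tables (the UDF definition via udfFuncMapper and the user-permission relations via udfUserMapper) inside a single transaction. The hypothetical caller below, sketched under the assumption that Result exposes getCode() as it does elsewhere in this codebase, only observes SUCCESS once both deletes have committed:

    package org.apache.dolphinscheduler.api.demo;

    import org.apache.dolphinscheduler.api.enums.Status;
    import org.apache.dolphinscheduler.api.service.UdfFuncService;
    import org.apache.dolphinscheduler.api.utils.Result;

    // Hypothetical caller, for illustration only.
    public class UdfFuncDeleteDemo {

        private final UdfFuncService udfFuncService;

        public UdfFuncDeleteDemo(UdfFuncService udfFuncService) {
            this.udfFuncService = udfFuncService;
        }

        public boolean deleteAtomically(int udfFuncId) {
            // both mapper deletes run inside one transaction; a RuntimeException
            // in either rolls back the other, so SUCCESS implies both succeeded
            Result<Object> result = udfFuncService.delete(udfFuncId);
            return result.getCode() == Status.SUCCESS.getCode();
        }
    }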
WorkFlowLineageServiceImpl.java
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.api.service.impl;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.WorkFlowLineageService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * work flow lineage service impl
 */
@Service
public class WorkFlowLineageServiceImpl extends BaseService implements WorkFlowLineageService {

    @Autowired
    private WorkFlowLineageMapper workFlowLineageMapper;

    public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId) {
        Map<String, Object> result = new HashMap<>();
        List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, projectId);
        result.put(Constants.DATA_LIST, workFlowLineageList);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * walk the relation graph downstream from the given ids; sourceIds acts as
     * a visited set, so the recursion terminates even if the graph has cycles
     */
    private void getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) {
        sourceIds.addAll(ids);
        for (int id : ids) {
            List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id);
            if (workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) {
                // collect only targets that have not been visited yet
                Set<Integer> idsTmp = new HashSet<>();
                for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) {
                    if (!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) {
                        idsTmp.add(workFlowRelation.getTargetWorkFlowId());
                    }
                }
                workFlowRelations.addAll(workFlowRelationsTmp);
                getWorkFlowRelationRecursion(idsTmp, workFlowRelations, sourceIds);
            }
        }
    }

    public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int projectId) {
        Map<String, Object> result = new HashMap<>();
        List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, projectId);
        Map<String, Object> workFlowLists = new HashMap<>();
        // if no ids were passed in, start the traversal from every workflow in the project
        Set<Integer> idsV = new HashSet<>();
        if (ids == null || ids.isEmpty()) {
            for (WorkFlowLineage workFlowLineage : workFlowLineageList) {
                idsV.add(workFlowLineage.getWorkFlowId());
            }
        } else {
            idsV = ids;
        }
        List<WorkFlowRelation> workFlowRelations = new ArrayList<>();
        Set<Integer> sourceIds = new HashSet<>();
        getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds);

        Set<Integer> idSet = new HashSet<>();
        // if the incoming ids are not empty, the downstream workflows' detail attributes must be added as well
        if (ids != null && !ids.isEmpty()) {
            for (WorkFlowRelation workFlowRelation : workFlowRelations) {
                idSet.add(workFlowRelation.getTargetWorkFlowId());
            }
            for (int id : ids) {
                idSet.remove(id);
            }
            if (!idSet.isEmpty()) {
                workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, projectId));
            }
        }

        workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList);
        workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations);
        result.put(Constants.DATA_LIST, workFlowLists);
        putMsg(result, Status.SUCCESS);
        return result;
    }

}
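The recursion in getWorkFlowRelationRecursion terminates because sourceIds doubles as a visited set: already-seen target ids are never re-entered. The standalone sketch below demonstrates the same traversal over a hard-coded in-memory graph containing a cycle; the EDGES map stands in for workFlowLineageMapper.querySourceTarget, and everything here is illustrative:

    package org.apache.dolphinscheduler.api.demo;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Illustrative only: mirrors the shape of getWorkFlowRelationRecursion
    // with an in-memory relation graph instead of the mapper.
    public class LineageRecursionDemo {

        // adjacency list: workflow id -> downstream workflow ids (cycle 3 -> 1)
        private static final Map<Integer, List<Integer>> EDGES = new HashMap<>();

        static {
            EDGES.put(1, Arrays.asList(2, 3));
            EDGES.put(2, Collections.singletonList(3));
            EDGES.put(3, Collections.singletonList(1));
        }

        private static void collect(Set<Integer> ids, Set<Integer> visited) {
            visited.addAll(ids); // mark the current frontier as seen
            for (int id : ids) {
                Set<Integer> next = new HashSet<>();
                for (int target : EDGES.getOrDefault(id, Collections.emptyList())) {
                    if (!visited.contains(target)) {
                        next.add(target); // only recurse into unvisited targets
                    }
                }
                if (!next.isEmpty()) {
                    collect(next, visited);
                }
            }
        }

        public static void main(String[] args) {
            Set<Integer> visited = new HashSet<>();
            collect(new HashSet<>(Collections.singletonList(1)), visited);
            System.out.println(visited); // [1, 2, 3] -- terminates despite the cycle
        }
    }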
WorkerGroupServiceImpl.java
@@ -0,0 +1,180 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.BaseService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * worker group service impl
 */
@Service
public class WorkerGroupServiceImpl extends BaseService implements WorkerGroupService {

    private static final String NO_NODE_EXCEPTION_REGEX = "KeeperException$NoNodeException";

    @Autowired
    protected ZookeeperCachedOperator zookeeperCachedOperator;

    @Autowired
    ProcessInstanceMapper processInstanceMapper;

    /**
     * query worker group paging
     *
     * @param loginUser login user
     * @param pageNo page number
     * @param searchVal search value
     * @param pageSize page size
     * @return worker group list page
     */
    public Map<String, Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) {
        // start index of the page
        int fromIndex = (pageNo - 1) * pageSize;
        // end index of the page (exclusive)
        int toIndex = (pageNo - 1) * pageSize + pageSize;

        Map<String, Object> result = new HashMap<>();
        if (isNotAdmin(loginUser, result)) {
            return result;
        }

        List<WorkerGroup> workerGroups = getWorkerGroups(true);

        List<WorkerGroup> resultDataList = new ArrayList<>();

        if (CollectionUtils.isNotEmpty(workerGroups)) {
            List<WorkerGroup> searchValDataList = new ArrayList<>();

            if (StringUtils.isNotEmpty(searchVal)) {
                for (WorkerGroup workerGroup : workerGroups) {
                    if (workerGroup.getName().contains(searchVal)) {
                        searchValDataList.add(workerGroup);
                    }
                }
            } else {
                searchValDataList = workerGroups;
            }

            // shrink the end index when the filtered list is shorter than a full page
            if (searchValDataList.size() < pageSize) {
                toIndex = (pageNo - 1) * pageSize + searchValDataList.size();
            }
            resultDataList = searchValDataList.subList(fromIndex, toIndex);
        }

        PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
        pageInfo.setTotalCount(resultDataList.size());
        pageInfo.setLists(resultDataList);

        result.put(Constants.DATA_LIST, pageInfo);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * query all worker groups
     *
     * @return all worker group list
     */
    public Map<String, Object> queryAllGroup() {
        Map<String, Object> result = new HashMap<>();

        List<WorkerGroup> workerGroups = getWorkerGroups(false);

        Set<String> availableWorkerGroupSet = workerGroups.stream()
                .map(WorkerGroup::getName)
                .collect(Collectors.toSet());
        result.put(Constants.DATA_LIST, availableWorkerGroupSet);
        putMsg(result, Status.SUCCESS);
        return result;
    }

    /**
     * get worker groups
     *
     * @param isPaging whether paging
     * @return WorkerGroup list
     */
    private List<WorkerGroup> getWorkerGroups(boolean isPaging) {
        String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
        List<WorkerGroup> workerGroups = new ArrayList<>();
        List<String> workerGroupList;
        try {
            workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
        } catch (Exception e) {
            if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) {
                if (!isPaging) {
                    // ignore NoNodeException and return the default group
                    WorkerGroup wg = new WorkerGroup();
                    wg.setName(DEFAULT_WORKER_GROUP);
                    workerGroups.add(wg);
                }
                return workerGroups;
            } else {
                throw e;
            }
        }

        for (String workerGroup : workerGroupList) {
            String workerGroupPath = String.format("%s/%s", workerPath, workerGroup);
            List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
            String timeStamp = "";
            // each child node name carries a trailing ":<timestamp>" segment; strip it and remember it
            for (int i = 0; i < childrenNodes.size(); i++) {
                String ip = childrenNodes.get(i);
                childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":")));
                timeStamp = ip.substring(ip.lastIndexOf(":"));
            }
            if (CollectionUtils.isNotEmpty(childrenNodes)) {
                WorkerGroup wg = new WorkerGroup();
                wg.setName(workerGroup);
                if (isPaging) {
                    wg.setIpList(childrenNodes);
                    String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp);
                    wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
                    wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
                }
                workerGroups.add(wg);
            }
        }
        return workerGroups;
    }

}
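Since worker groups live in ZooKeeper rather than a database, queryAllGroupPaging pages in memory by slicing the filtered list with subList instead of an SQL LIMIT/OFFSET. The self-contained sketch below shows the same slicing idea, with an explicit bounds guard added for pages past the end of the list (a defensive variant, not a copy of the code above):

    package org.apache.dolphinscheduler.api.demo;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative only: in-memory paging in the spirit of queryAllGroupPaging.
    public class InMemoryPagingDemo {

        static <T> List<T> page(List<T> all, int pageNo, int pageSize) {
            int fromIndex = (pageNo - 1) * pageSize;
            if (fromIndex >= all.size()) {
                return new ArrayList<>(); // requested page is past the end
            }
            int toIndex = Math.min(fromIndex + pageSize, all.size());
            return all.subList(fromIndex, toIndex);
        }

        public static void main(String[] args) {
            List<String> groups = Arrays.asList("default", "hive", "spark", "flink", "etl");
            System.out.println(page(groups, 1, 2)); // [default, hive]
            System.out.println(page(groups, 3, 2)); // [etl]
            System.out.println(page(groups, 4, 2)); // []
        }
    }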