From 070424fc7825d0452827482a8d4815cb543d0b65 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Thu, 18 Feb 2021 19:18:05 +0800 Subject: [PATCH] [Improvement-3369][api] Introduce taskrecord, udffunc, workflowlineage and workergroup service interface for clear code (#4768) * [Improvement-3369][api] Introduce taskrecord, udffunc, workflowlineage and workergroup service interface for clear code --- .../api/service/TaskRecordService.java | 48 +-- .../api/service/UdfFuncService.java | 259 ++------------ .../api/service/WorkFlowLineageService.java | 83 +---- .../api/service/WorkerGroupService.java | 138 +------- .../service/impl/TaskRecordServiceImpl.java | 86 +++++ .../api/service/impl/UdfFuncServiceImpl.java | 326 ++++++++++++++++++ .../impl/WorkFlowLineageServiceImpl.java | 109 ++++++ .../service/impl/WorkerGroupServiceImpl.java | 180 ++++++++++ .../api/service/UdfFuncServiceTest.java | 17 +- .../service/WorkFlowLineageServiceTest.java | 17 +- .../api/service/WorkerGroupServiceTest.java | 3 +- .../dolphinscheduler/common/Constants.java | 6 + 12 files changed, 771 insertions(+), 501 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java index 54eba5c2d6..8c8ad0abff 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java @@ -14,26 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.dao.TaskRecordDao; -import org.apache.dolphinscheduler.dao.entity.TaskRecord; -import org.springframework.stereotype.Service; +package org.apache.dolphinscheduler.api.service; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.dolphinscheduler.common.Constants.*; - /** * task record service */ -@Service -public class TaskRecordService extends BaseService{ +public interface TaskRecordService { /** * query task record list paging @@ -50,33 +39,8 @@ public class TaskRecordService extends BaseService{ * @param isHistory is history * @return task record list */ - public Map<String, Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, - String taskDate, String sourceTable, - String destTable, String endDate, - String state, Integer pageNo, Integer pageSize) { - Map<String, Object> result = new HashMap<>(10); - PageInfo<TaskRecord> pageInfo = new PageInfo<TaskRecord>(pageNo, pageSize); - - Map<String, String> map = new HashMap<>(10); - map.put("taskName", taskName); - map.put("taskDate", taskDate); - map.put("state", state); - map.put("sourceTable", sourceTable); - map.put("targetTable", destTable); - map.put("startTime", startDate); - map.put("endTime", endDate); - map.put("offset", pageInfo.getStart().toString()); - map.put("pageSize", pageInfo.getPageSize().toString()); - - String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; - int count = TaskRecordDao.countTaskRecord(map, table); - List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table); - pageInfo.setTotalCount(count); - pageInfo.setLists(recordList); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - - return result; - - } + Map<String, Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, + String taskDate, String sourceTable, + String destTable, String endDate, + String state, Integer pageNo, Integer pageSize); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskRecordService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java index cd962fdc70..0364691d81 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UdfFuncService.java @@ -14,51 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ + package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UdfType; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.Resource; -import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; -import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; -import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.HashMap; -import java.util.List; import java.util.Map; /** * udf function service */ -@Service -public class UdfFuncService extends BaseService{ - - private static final Logger logger = LoggerFactory.getLogger(UdfFuncService.class); - - @Autowired - private ResourceMapper resourceMapper; - - @Autowired - private UdfFuncMapper udfFuncMapper; - - @Autowired - private UDFUserMapper udfUserMapper; - +public interface UdfFuncService { /** * create udf function @@ -73,72 +41,14 @@ public class UdfFuncService extends BaseService{ * @param className class name * @return create result code */ - public Result createUdfFunction(User loginUser, - String funcName, - String className, - String argTypes, - String database, - String desc, - UdfType type, - int resourceId) { - Result result = new Result(); - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - // verify udf func name exist - if (checkUdfFuncNameExists(funcName)) { - logger.error("udf func {} has exist, can't recreate", funcName); - putMsg(result, Status.UDF_FUNCTION_EXISTS); - return result; - } - - Resource resource = resourceMapper.selectById(resourceId); - if (resource == null) { - logger.error("resourceId {} is not exist", resourceId); - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - - //save data - UdfFunc udf = new UdfFunc(); - Date now = new Date(); - udf.setUserId(loginUser.getId()); - udf.setFuncName(funcName); - udf.setClassName(className); - if (StringUtils.isNotEmpty(argTypes)) { - udf.setArgTypes(argTypes); - } - if (StringUtils.isNotEmpty(database)) { - udf.setDatabase(database); - } - udf.setDescription(desc); - udf.setResourceId(resourceId); - udf.setResourceName(resource.getFullName()); - udf.setType(type); - - udf.setCreateTime(now); - udf.setUpdateTime(now); - - udfFuncMapper.insert(udf); - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * - * @param name name - * @return check result code - */ - private boolean checkUdfFuncNameExists(String name){ - List resource = udfFuncMapper.queryUdfByIdStr(null, name); - return resource != null && resource.size() > 0; - } - + Result createUdfFunction(User loginUser, + String 
funcName, + String className, + String argTypes, + String database, + String desc, + UdfType type, + int resourceId); /** * query udf function * @param id udf function id * @return udf function detail */ - public Map<String, Object> queryUdfFuncDetail(int id) { - - Map<String, Object> result = new HashMap<>(5); - UdfFunc udfFunc = udfFuncMapper.selectById(id); - if (udfFunc == null) { - putMsg(result, Status.RESOURCE_NOT_EXIST); - return result; - } - result.put(Constants.DATA_LIST, udfFunc); - putMsg(result, Status.SUCCESS); - return result; - } + Map<String, Object> queryUdfFuncDetail(int id); /** * update udf function * * @param udfFuncId udf function id * @param type resource type * @param funcName function name * @param argTypes argument types * @param database database * @param desc description * @param resourceId resource id * @param className class name * @return update result code */ - public Map<String, Object> updateUdfFunc(int udfFuncId, - String funcName, - String className, - String argTypes, - String database, - String desc, - UdfType type, - int resourceId) { - Map<String, Object> result = new HashMap<>(); - // verify udfFunc is exist - UdfFunc udf = udfFuncMapper.selectUdfById(udfFuncId); - - if (udf == null) { - result.put(Constants.STATUS, Status.UDF_FUNCTION_NOT_EXIST); - result.put(Constants.MSG, Status.UDF_FUNCTION_NOT_EXIST.getMsg()); - return result; - } - - // if resource upload startup - if (!PropertyUtils.getResUploadStartupState()){ - logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - - // verify udfFuncName is exist - if (!funcName.equals(udf.getFuncName())) { - if (checkUdfFuncNameExists(funcName)) { - logger.error("UdfFunc {} has exist, can't create again.", funcName); - result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS); - result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg()); - return result; - } - } - - Resource resource = resourceMapper.selectById(resourceId); - if (resource == null) { - logger.error("resourceId {} is not exist", resourceId); - result.put(Constants.STATUS, Status.RESOURCE_NOT_EXIST); - result.put(Constants.MSG, Status.RESOURCE_NOT_EXIST.getMsg()); - return result; - } - Date now = new Date(); - udf.setFuncName(funcName); - udf.setClassName(className); - udf.setArgTypes(argTypes); - if (StringUtils.isNotEmpty(database)) { - udf.setDatabase(database); - } - udf.setDescription(desc); - udf.setResourceId(resourceId); - udf.setResourceName(resource.getFullName()); - udf.setType(type); - - udf.setUpdateTime(now); - - udfFuncMapper.updateById(udf); - putMsg(result, Status.SUCCESS); - return result; - } - + Map<String, Object> updateUdfFunc(int udfFuncId, + String funcName, + String className, + String argTypes, + String database, + String desc, + UdfType type, + int resourceId); /** * query udf function list paging * * @param loginUser login user * @param pageNo page number * @param pageSize page size * @param searchVal search value * @return udf function list page */ - public Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { - Map<String, Object> result = new HashMap<>(5); - - - PageInfo<UdfFunc> pageInfo = new PageInfo<UdfFunc>(pageNo, pageSize); - IPage<UdfFunc> udfFuncList = getUdfFuncsPage(loginUser, searchVal, pageSize, pageNo); - pageInfo.setTotalCount((int)udfFuncList.getTotal()); - pageInfo.setLists(udfFuncList.getRecords()); - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * get udf functions - * - * @param loginUser login user - * @param searchVal search value - * @param pageSize page size - * @param pageNo page number - * @return udf function list page - */ - private IPage<UdfFunc> getUdfFuncsPage(User loginUser, String searchVal, Integer pageSize, int pageNo) { - - int userId = loginUser.getId(); - if (isAdmin(loginUser)) { - userId = 0; - } - Page<UdfFunc> page = new Page<UdfFunc>(pageNo, pageSize); - return udfFuncMapper.queryUdfFuncPaging(page, userId, searchVal); - } + Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize); /** * query udf list * * @param loginUser login user * @param type udf type * @return udf func list */ - public Map<String, Object> queryUdfFuncList(User loginUser, Integer type) { - Map<String, Object> result = new HashMap<>(5); - int userId = loginUser.getId(); - if (isAdmin(loginUser)) { - userId = 0; - } - List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(userId, type); - - result.put(Constants.DATA_LIST, udfFuncList); - putMsg(result, Status.SUCCESS); - return result; - } + Map<String, Object> queryUdfFuncList(User loginUser, Integer type); /** * delete udf function * * @param id udf function id * @return delete result code */ - @Transactional(rollbackFor = RuntimeException.class) - public Result delete(int id) { - Result result = new Result(); - - udfFuncMapper.deleteById(id); - udfUserMapper.deleteByUdfFuncId(id); - putMsg(result, Status.SUCCESS); - return result; - } + Result delete(int id); /** * verify udf function by name * * @param name name * @return true if the name can be used, otherwise false */ - public Result verifyUdfFuncByName(String name) { - Result result = new Result(); - if (checkUdfFuncNameExists(name)) { - logger.error("UDF function name:{} has exist, can't create again.", name); - putMsg(result, Status.UDF_FUNCTION_EXISTS); - } else { - putMsg(result, Status.SUCCESS); - } - - return result; - } + Result verifyUdfFuncByName(String name); } \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java index 29fd99c0f7..360e813c68 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageService.java @@ -14,84 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ -package org.apache.dolphinscheduler.api.service; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; -import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; -import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import java.util.*; -@Service -public class WorkFlowLineageService extends BaseService { - - @Autowired - private WorkFlowLineageMapper workFlowLineageMapper; +package org.apache.dolphinscheduler.api.service; - public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId) { - Map<String, Object> result = new HashMap<>(5); - List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, projectId); - result.put(Constants.DATA_LIST, workFlowLineageList); - putMsg(result, Status.SUCCESS); - return result; - } +import java.util.Map; +import java.util.Set; - private List<WorkFlowRelation> getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations,Set<Integer> sourceIds) { - for(int id : ids) { - sourceIds.addAll(ids); - List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id); - if(workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) { - Set<Integer> idsTmp = new HashSet<>(); - for(WorkFlowRelation workFlowRelation:workFlowRelationsTmp) { - if(!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())){ - idsTmp.add(workFlowRelation.getTargetWorkFlowId()); - } - } - workFlowRelations.addAll(workFlowRelationsTmp); - getWorkFlowRelationRecursion(idsTmp, workFlowRelations,sourceIds); - } - } - return workFlowRelations; - } +/** + * work flow lineage service + */ +public interface WorkFlowLineageService { - public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids,int projectId) { - Map<String, Object> result = new HashMap<>(5); - List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, projectId); - Map<String, Object> workFlowLists = new HashMap<>(5); - Set<Integer> idsV = new HashSet<>(); - if(ids == null || ids.isEmpty()){ - for(WorkFlowLineage workFlowLineage:workFlowLineageList) { - idsV.add(workFlowLineage.getWorkFlowId()); - } - } else { - idsV = ids; - } - List<WorkFlowRelation> workFlowRelations = new ArrayList<>(); - Set<Integer> sourceIds = new HashSet<>(); - getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds); + Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId); - Set<Integer> idSet = new HashSet<>(); - //If the incoming parameter is not empty, you need to add downstream workflow detail attributes - if(ids != null && !ids.isEmpty()) { - for(WorkFlowRelation workFlowRelation : workFlowRelations) { - idSet.add(workFlowRelation.getTargetWorkFlowId()); - } - for(int id : ids){ - idSet.remove(id); - } - if(!idSet.isEmpty()) { - workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, projectId)); - } - } + Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int projectId); - workFlowLists.put("workFlowList",workFlowLineageList); - workFlowLists.put("workFlowRelationList",workFlowRelations); - result.put(Constants.DATA_LIST, workFlowLists); - putMsg(result, Status.SUCCESS); - return result; - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 1c634a9cd2..a8f86a8515 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -17,41 +17,14 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; - -import org.apache.dolphinscheduler.api.enums.Status; -import org.apache.dolphinscheduler.api.utils.PageInfo; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.User; -import org.apache.dolphinscheduler.dao.entity.WorkerGroup; -import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - /** * worker group service */ -@Service -public class WorkerGroupService extends BaseService { - - private static final String NO_NODE_EXCEPTION_REGEX = "KeeperException$NoNodeException"; - @Autowired - protected ZookeeperCachedOperator zookeeperCachedOperator; - @Autowired - ProcessInstanceMapper processInstanceMapper; +public interface WorkerGroupService { /** * query worker group paging @@ -62,118 +35,13 @@ public class WorkerGroupService extends BaseService { * @param pageSize page size * @return worker group list page */ - public Map<String, Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) { - - // list from index - Integer fromIndex = (pageNo - 1) * pageSize; - // list to index - Integer toIndex = (pageNo - 1) * pageSize + pageSize; - - Map<String, Object> result = new HashMap<>(); - if (isNotAdmin(loginUser, result)) { - return result; - } - - List<WorkerGroup> workerGroups = getWorkerGroups(true); - - List<WorkerGroup> resultDataList = new ArrayList<>(); - - if (CollectionUtils.isNotEmpty(workerGroups)) { - List<WorkerGroup> searchValDataList = new ArrayList<>(); - - if (StringUtils.isNotEmpty(searchVal)) { - for (WorkerGroup workerGroup : workerGroups) { - if (workerGroup.getName().contains(searchVal)) { - searchValDataList.add(workerGroup); - } - } - } else { - searchValDataList = workerGroups; - } - - if (searchValDataList.size() < pageSize) { - toIndex = (pageNo - 1) * pageSize + searchValDataList.size(); - } - resultDataList = searchValDataList.subList(fromIndex, toIndex); - } - - PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize); - pageInfo.setTotalCount(resultDataList.size()); - pageInfo.setLists(resultDataList); - - result.put(Constants.DATA_LIST, pageInfo); - putMsg(result, Status.SUCCESS); - return result; - } + Map<String, Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal); /** * query all worker group * * @return all worker group list */ - public Map<String, Object> queryAllGroup() { - Map<String, Object> result = new HashMap<>(); - - List<WorkerGroup> workerGroups = getWorkerGroups(false); - - Set<String> availableWorkerGroupSet = workerGroups.stream() - .map(workerGroup -> workerGroup.getName()) - .collect(Collectors.toSet()); - result.put(Constants.DATA_LIST, availableWorkerGroupSet); - putMsg(result, Status.SUCCESS); - return result; - } - - /** - * get worker groups - * - * @param isPaging whether paging - * @return WorkerGroup list - */ - private List<WorkerGroup> getWorkerGroups(boolean isPaging) { -
String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; - List<WorkerGroup> workerGroups = new ArrayList<>(); - List<String> workerGroupList; - try { - workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); - } catch (Exception e) { - if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) { - if (isPaging) { - return workerGroups; - } else { - //ignore noNodeException return Default - WorkerGroup wg = new WorkerGroup(); - wg.setName(DEFAULT_WORKER_GROUP); - workerGroups.add(wg); - return workerGroups; - } - } else { - throw e; - } - } + Map<String, Object> queryAllGroup(); - for (String workerGroup : workerGroupList) { - String workerGroupPath = workerPath + "/" + workerGroup; - List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); - String timeStamp = ""; - for (int i = 0; i < childrenNodes.size(); i++) { - String ip = childrenNodes.get(i); - childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":"))); - timeStamp = ip.substring(ip.lastIndexOf(":")); - } - if (CollectionUtils.isNotEmpty(childrenNodes)) { - WorkerGroup wg = new WorkerGroup(); - wg.setName(workerGroup); - if (isPaging) { - wg.setIpList(childrenNodes); - String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp); - wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); - wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); - } - workerGroups.add(wg); - } - } - return workerGroups; - } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java new file mode 100644 index 0000000000..6021bd225f --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskRecordServiceImpl.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HISTORY_HIVE_LOG; +import static org.apache.dolphinscheduler.common.Constants.TASK_RECORD_TABLE_HIVE_LOG; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.TaskRecordService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.TaskRecordDao; +import org.apache.dolphinscheduler.dao.entity.TaskRecord; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.stereotype.Service; + +/** + * task record service impl + */ +@Service +public class TaskRecordServiceImpl extends BaseService implements TaskRecordService { + + /** + * query task record list paging + * + * @param taskName task name + * @param state state + * @param sourceTable source table + * @param destTable destination table + * @param taskDate task date + * @param startDate start time + * @param endDate end time + * @param pageNo page number + * @param pageSize page size + * @param isHistory is history + * @return task record list + */ + public Map<String, Object> queryTaskRecordListPaging(boolean isHistory, String taskName, String startDate, + String taskDate, String sourceTable, + String destTable, String endDate, + String state, Integer pageNo, Integer pageSize) { + Map<String, Object> result = new HashMap<>(); + PageInfo<TaskRecord> pageInfo = new PageInfo<>(pageNo, pageSize); + + Map<String, String> map = new HashMap<>(); + map.put("taskName", taskName); + map.put("taskDate", taskDate); + map.put("state", state); + map.put("sourceTable", sourceTable); + map.put("targetTable", destTable); + map.put("startTime", startDate); + map.put("endTime", endDate); + map.put("offset", pageInfo.getStart().toString()); + map.put("pageSize", pageInfo.getPageSize().toString()); + + String table = isHistory ? TASK_RECORD_TABLE_HISTORY_HIVE_LOG : TASK_RECORD_TABLE_HIVE_LOG; + int count = TaskRecordDao.countTaskRecord(map, table); + List<TaskRecord> recordList = TaskRecordDao.queryAllTaskRecord(map, table); + pageInfo.setTotalCount(count); + pageInfo.setLists(recordList); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + + return result; + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java new file mode 100644 index 0000000000..c08086aac6 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.UdfFuncService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.UdfType; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.UdfFunc; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; +import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; +import org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + +/** + * udf function service impl + */ +@Service +public class UdfFuncServiceImpl extends BaseService implements UdfFuncService { + + private static final Logger logger = LoggerFactory.getLogger(UdfFuncServiceImpl.class); + + @Autowired + private ResourceMapper resourceMapper; + + @Autowired + private UdfFuncMapper udfFuncMapper; + + @Autowired + private UDFUserMapper udfUserMapper; + + /** + * create udf function + * + * @param loginUser login user + * @param type udf type + * @param funcName function name + * @param argTypes argument types + * @param database database + * @param desc description + * @param resourceId resource id + * @param className class name + * @return create result code + */ + public Result createUdfFunction(User loginUser, + String funcName, + String className, + String argTypes, + String database, + String desc, + UdfType type, + int resourceId) { + Result result = new Result<>(); + + // if resource upload startup + if (!PropertyUtils.getResUploadStartupState()) { + logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); + putMsg(result, Status.HDFS_NOT_STARTUP); + return result; + } + + // verify udf func name exist + if (checkUdfFuncNameExists(funcName)) { + putMsg(result, Status.UDF_FUNCTION_EXISTS); + return result; + } + + Resource resource = resourceMapper.selectById(resourceId); + if (resource == null) { + logger.error("resourceId {} is not exist", resourceId); + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + + //save data + UdfFunc udf = new UdfFunc(); + Date now = new Date(); + udf.setUserId(loginUser.getId()); + udf.setFuncName(funcName); + udf.setClassName(className); + if (StringUtils.isNotEmpty(argTypes)) { + udf.setArgTypes(argTypes); + } + if (StringUtils.isNotEmpty(database)) { + udf.setDatabase(database); + } + udf.setDescription(desc); + udf.setResourceId(resourceId); + udf.setResourceName(resource.getFullName()); + udf.setType(type); + + udf.setCreateTime(now); + udf.setUpdateTime(now); + + 
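// all inputs validated and fields populated; persist the new UDF definition and report success
+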
udfFuncMapper.insert(udf); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * + * @param name name + * @return check result code + */ + private boolean checkUdfFuncNameExists(String name) { + List<UdfFunc> resource = udfFuncMapper.queryUdfByIdStr(null, name); + return resource != null && !resource.isEmpty(); + } + + /** + * query udf function + * + * @param id udf function id + * @return udf function detail + */ + public Map<String, Object> queryUdfFuncDetail(int id) { + Map<String, Object> result = new HashMap<>(); + UdfFunc udfFunc = udfFuncMapper.selectById(id); + if (udfFunc == null) { + putMsg(result, Status.RESOURCE_NOT_EXIST); + return result; + } + result.put(Constants.DATA_LIST, udfFunc); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * update udf function + * + * @param udfFuncId udf function id + * @param type resource type + * @param funcName function name + * @param argTypes argument types + * @param database database + * @param desc description + * @param resourceId resource id + * @param className class name + * @return update result code + */ + public Map<String, Object> updateUdfFunc(int udfFuncId, + String funcName, + String className, + String argTypes, + String database, + String desc, + UdfType type, + int resourceId) { + Map<String, Object> result = new HashMap<>(); + // verify udfFunc is exist + UdfFunc udf = udfFuncMapper.selectUdfById(udfFuncId); + + if (udf == null) { + result.put(Constants.STATUS, Status.UDF_FUNCTION_NOT_EXIST); + result.put(Constants.MSG, Status.UDF_FUNCTION_NOT_EXIST.getMsg()); + return result; + } + + // if resource upload startup + if (!PropertyUtils.getResUploadStartupState()) { + logger.error("resource upload startup state: {}", PropertyUtils.getResUploadStartupState()); + putMsg(result, Status.HDFS_NOT_STARTUP); + return result; + } + + // verify udfFuncName is exist + if (!funcName.equals(udf.getFuncName())) { + if (checkUdfFuncNameExists(funcName)) { + logger.error("UdfFunc {} has exist, can't create again.", funcName); + result.put(Constants.STATUS, Status.UDF_FUNCTION_EXISTS); + result.put(Constants.MSG, Status.UDF_FUNCTION_EXISTS.getMsg()); + return result; + } + } + + Resource resource = resourceMapper.selectById(resourceId); + if (resource == null) { + logger.error("resourceId {} is not exist", resourceId); + result.put(Constants.STATUS, Status.RESOURCE_NOT_EXIST); + result.put(Constants.MSG, Status.RESOURCE_NOT_EXIST.getMsg()); + return result; + } + Date now = new Date(); + udf.setFuncName(funcName); + udf.setClassName(className); + udf.setArgTypes(argTypes); + if (StringUtils.isNotEmpty(database)) { + udf.setDatabase(database); + } + udf.setDescription(desc); + udf.setResourceId(resourceId); + udf.setResourceName(resource.getFullName()); + udf.setType(type); + + udf.setUpdateTime(now); + + udfFuncMapper.updateById(udf); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query udf function list paging + * + * @param loginUser login user + * @param pageNo page number + * @param pageSize page size + * @param searchVal search value + * @return udf function list page + */ + public Map<String, Object> queryUdfFuncListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize) { + Map<String, Object> result = new HashMap<>(); + PageInfo<UdfFunc> pageInfo = new PageInfo<>(pageNo, pageSize); + IPage<UdfFunc> udfFuncList = getUdfFuncsPage(loginUser, searchVal, pageSize, pageNo); + pageInfo.setTotalCount((int)udfFuncList.getTotal()); + pageInfo.setLists(udfFuncList.getRecords()); + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } +
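+    // NOTE: admin callers are queried as userId 0; the page query itself is delegated to MyBatis-Plus in getUdfFuncsPage below.
+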
/** + * get udf functions + * + * @param loginUser login user + * @param searchVal search value + * @param pageSize page size + * @param pageNo page number + * @return udf function list page + */ + private IPage<UdfFunc> getUdfFuncsPage(User loginUser, String searchVal, Integer pageSize, int pageNo) { + int userId = loginUser.getId(); + if (isAdmin(loginUser)) { + userId = 0; + } + Page<UdfFunc> page = new Page<>(pageNo, pageSize); + return udfFuncMapper.queryUdfFuncPaging(page, userId, searchVal); + } + + /** + * query udf list + * + * @param loginUser login user + * @param type udf type + * @return udf func list + */ + public Map<String, Object> queryUdfFuncList(User loginUser, Integer type) { + Map<String, Object> result = new HashMap<>(); + int userId = loginUser.getId(); + if (isAdmin(loginUser)) { + userId = 0; + } + List<UdfFunc> udfFuncList = udfFuncMapper.getUdfFuncByType(userId, type); + + result.put(Constants.DATA_LIST, udfFuncList); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * delete udf function + * + * @param id udf function id + * @return delete result code + */ + @Transactional(rollbackFor = RuntimeException.class) + public Result delete(int id) { + Result result = new Result<>(); + udfFuncMapper.deleteById(id); + udfUserMapper.deleteByUdfFuncId(id); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * verify udf function by name + * + * @param name name + * @return true if the name can be used, otherwise false + */ + public Result verifyUdfFuncByName(String name) { + Result result = new Result<>(); + if (checkUdfFuncNameExists(name)) { + putMsg(result, Status.UDF_FUNCTION_EXISTS); + } else { + putMsg(result, Status.SUCCESS); + } + return result; + } + +} \ No newline at end of file diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java new file mode 100644 index 0000000000..89a2cf5d76 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkFlowLineageServiceImpl.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.dolphinscheduler.api.service.impl; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.WorkFlowLineageService; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; +import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; +import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * work flow lineage service impl + */ +@Service +public class WorkFlowLineageServiceImpl extends BaseService implements WorkFlowLineageService { + + @Autowired + private WorkFlowLineageMapper workFlowLineageMapper; + + public Map<String, Object> queryWorkFlowLineageByName(String workFlowName, int projectId) { + Map<String, Object> result = new HashMap<>(); + List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByName(workFlowName, projectId); + result.put(Constants.DATA_LIST, workFlowLineageList); + putMsg(result, Status.SUCCESS); + return result; + } + + private void getWorkFlowRelationRecursion(Set<Integer> ids, List<WorkFlowRelation> workFlowRelations, Set<Integer> sourceIds) { + for (int id : ids) { + sourceIds.addAll(ids); + List<WorkFlowRelation> workFlowRelationsTmp = workFlowLineageMapper.querySourceTarget(id); + if (workFlowRelationsTmp != null && !workFlowRelationsTmp.isEmpty()) { + Set<Integer> idsTmp = new HashSet<>(); + for (WorkFlowRelation workFlowRelation : workFlowRelationsTmp) { + if (!sourceIds.contains(workFlowRelation.getTargetWorkFlowId())) { + idsTmp.add(workFlowRelation.getTargetWorkFlowId()); + } + } + workFlowRelations.addAll(workFlowRelationsTmp); + getWorkFlowRelationRecursion(idsTmp, workFlowRelations, sourceIds); + } + } + } + + public Map<String, Object> queryWorkFlowLineageByIds(Set<Integer> ids, int projectId) { + Map<String, Object> result = new HashMap<>(); + List<WorkFlowLineage> workFlowLineageList = workFlowLineageMapper.queryByIds(ids, projectId); + Map<String, Object> workFlowLists = new HashMap<>(); + Set<Integer> idsV = new HashSet<>(); + if (ids == null || ids.isEmpty()) { + for (WorkFlowLineage workFlowLineage : workFlowLineageList) { + idsV.add(workFlowLineage.getWorkFlowId()); + } + } else { + idsV = ids; + } + List<WorkFlowRelation> workFlowRelations = new ArrayList<>(); + Set<Integer> sourceIds = new HashSet<>(); + getWorkFlowRelationRecursion(idsV, workFlowRelations, sourceIds); + + Set<Integer> idSet = new HashSet<>(); + //If the incoming parameter is not empty, you need to add downstream workflow detail attributes + if (ids != null && !ids.isEmpty()) { + for (WorkFlowRelation workFlowRelation : workFlowRelations) { + idSet.add(workFlowRelation.getTargetWorkFlowId()); + } + for (int id : ids) { + idSet.remove(id); + } + if (!idSet.isEmpty()) { + workFlowLineageList.addAll(workFlowLineageMapper.queryByIds(idSet, projectId)); + } + } + + workFlowLists.put(Constants.WORKFLOW_LIST, workFlowLineageList); + workFlowLists.put(Constants.WORKFLOW_RELATION_LIST, workFlowRelations); + result.put(Constants.DATA_LIST, workFlowLists); + putMsg(result, Status.SUCCESS); + return result; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java new file mode 100644 index 0000000000..391b7c365f --- /dev/null +++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.service.impl; + +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + +import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.BaseService; +import org.apache.dolphinscheduler.api.service.WorkerGroupService; +import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + + +/** + * worker group service impl + */ +@Service +public class WorkerGroupServiceImpl extends BaseService implements WorkerGroupService { + + private static final String NO_NODE_EXCEPTION_REGEX = "KeeperException$NoNodeException"; + + @Autowired + protected ZookeeperCachedOperator zookeeperCachedOperator; + + @Autowired + ProcessInstanceMapper processInstanceMapper; + + /** + * query worker group paging + * + * @param loginUser login user + * @param pageNo page number + * @param searchVal search value + * @param pageSize page size + * @return worker group list page + */ + public Map<String, Object> queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) { + // list from index + int fromIndex = (pageNo - 1) * pageSize; + // list to index + int toIndex = (pageNo - 1) * pageSize + pageSize; + + Map<String, Object> result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + + List<WorkerGroup> workerGroups = getWorkerGroups(true); + + List<WorkerGroup> resultDataList = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(workerGroups)) { + List<WorkerGroup> searchValDataList = new ArrayList<>(); + + if (StringUtils.isNotEmpty(searchVal)) { + for (WorkerGroup workerGroup : workerGroups) { + if (workerGroup.getName().contains(searchVal)) { + searchValDataList.add(workerGroup); + } + } + } else { + searchValDataList = workerGroups; + } + + if (searchValDataList.size() < pageSize) { + toIndex = (pageNo - 1) * pageSize + searchValDataList.size(); +
} + resultDataList = searchValDataList.subList(fromIndex, toIndex); + } + + PageInfo<WorkerGroup> pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotalCount(resultDataList.size()); + pageInfo.setLists(resultDataList); + + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query all worker group + * + * @return all worker group list + */ + public Map<String, Object> queryAllGroup() { + Map<String, Object> result = new HashMap<>(); + + List<WorkerGroup> workerGroups = getWorkerGroups(false); + + Set<String> availableWorkerGroupSet = workerGroups.stream() + .map(WorkerGroup::getName) + .collect(Collectors.toSet()); + result.put(Constants.DATA_LIST, availableWorkerGroupSet); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * get worker groups + * + * @param isPaging whether paging + * @return WorkerGroup list + */ + private List<WorkerGroup> getWorkerGroups(boolean isPaging) { + String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; + List<WorkerGroup> workerGroups = new ArrayList<>(); + List<String> workerGroupList; + try { + workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); + } catch (Exception e) { + if (e.getMessage().contains(NO_NODE_EXCEPTION_REGEX)) { + if (!isPaging) { + //ignore noNodeException return Default + WorkerGroup wg = new WorkerGroup(); + wg.setName(DEFAULT_WORKER_GROUP); + workerGroups.add(wg); + } + return workerGroups; + } else { + throw e; + } + } + + for (String workerGroup : workerGroupList) { + String workerGroupPath = String.format("%s/%s", workerPath, workerGroup); + List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); + String timeStamp = ""; + for (int i = 0; i < childrenNodes.size(); i++) { + String ip = childrenNodes.get(i); + childrenNodes.set(i, ip.substring(0, ip.lastIndexOf(":"))); + timeStamp = ip.substring(ip.lastIndexOf(":")); + } + if (CollectionUtils.isNotEmpty(childrenNodes)) { + WorkerGroup wg = new WorkerGroup(); + wg.setName(workerGroup); + if (isPaging) { + wg.setIpList(childrenNodes); + String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0) + timeStamp); + wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); + wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); + } + workerGroups.add(wg); + } + } + return workerGroups; + } + +} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java index 884e9b6b36..d6dd9bdfb8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java @@ -16,9 +16,8 @@ */ package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.impl.UdfFuncServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; @@ -32,6 +31,12 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; import org.apache.dolphinscheduler.dao.mapper.UDFUserMapper; import
org.apache.dolphinscheduler.dao.mapper.UdfFuncMapper; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -45,10 +50,8 @@ import org.powermock.modules.junit4.PowerMockRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @RunWith(PowerMockRunner.class) @PrepareForTest(PropertyUtils.class) @@ -56,7 +59,7 @@ public class UdfFuncServiceTest { private static final Logger logger = LoggerFactory.getLogger(UdfFuncServiceTest.class); @InjectMocks - private UdfFuncService udfFuncService; + private UdfFuncServiceImpl udfFuncService; @Mock private ResourceMapper resourceMapper; @Mock diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java index 999e079bf5..459aa90cd8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkFlowLineageServiceTest.java @@ -16,11 +16,20 @@ */ package org.apache.dolphinscheduler.api.service; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.api.service.impl.WorkFlowLineageServiceImpl; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.EncryptionUtils; import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage; import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation; import org.apache.dolphinscheduler.dao.mapper.WorkFlowLineageMapper; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -28,15 +37,11 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.util.*; - -import static org.mockito.Mockito.when; - @RunWith(MockitoJUnitRunner.class) public class WorkFlowLineageServiceTest { @InjectMocks - private WorkFlowLineageService workFlowLineageService; + private WorkFlowLineageServiceImpl workFlowLineageService; @Mock private WorkFlowLineageMapper workFlowLineageMapper; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 4a1d874c8a..233b562527 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UserType; @@ -49,7 +50,7 @@ public class WorkerGroupServiceTest { private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class); @InjectMocks 
- private WorkerGroupService workerGroupService; + private WorkerGroupServiceImpl workerGroupService; @Mock private ProcessInstanceMapper processInstanceMapper; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 45af3b2700..d2855e0067 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -898,6 +898,12 @@ public final class Constants { public static final String TOTAL = "total"; + /** + * workflow + */ + public static final String WORKFLOW_LIST = "workFlowList"; + public static final String WORKFLOW_RELATION_LIST = "workFlowRelationList"; + /** * session user */
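Usage sketch (editor's addition, not part of the patch): callers keep autowiring the interface type and Spring injects the lone @Service implementation, so existing consumers compile unchanged while tests can now mock at the interface boundary. A minimal hypothetical consumer, with the controller name and mapping invented for illustration:

    import java.util.Map;

    import org.apache.dolphinscheduler.api.service.WorkerGroupService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class ExampleWorkerGroupController {

        // Spring resolves this to WorkerGroupServiceImpl, the only bean implementing the interface
        @Autowired
        private WorkerGroupService workerGroupService;

        @GetMapping("/worker-groups/all")
        public Map<String, Object> queryAllWorkerGroups() {
            return workerGroupService.queryAllGroup();
        }
    }

This mirrors the updated tests above: @InjectMocks now targets the concrete *ServiceImpl classes, while collaborators such as the mappers and ZookeeperCachedOperator remain mocked.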