From 9e763ad0d1e5576e6e5735fb2afd8638011e54dc Mon Sep 17 00:00:00 2001 From: wangxj3 <857234426@qq.com> Date: Fri, 24 Dec 2021 16:37:59 +0800 Subject: [PATCH] [Feature-#6422] [api-server] task group queue (#7491) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add task group * modify task group * pull dev * add license header * modify code style * fix code style * fix sql error * fix error * fix test * fix test * fix test * fix test * fix code style * fix ut * code style * fix unit test * test ut * ut * add unittest * test ut * modify back ut * majorization code * fix conflict * fix ut * add task group api * reset file * fix ut * fix lost column * fix ut * fix ut * fix ut * fix ut * delete duplicate code * fix code style 、name * fix ut * fix mapper Co-authored-by: wangxj --- .../api/controller/TaskGroupController.java | 103 +++++++++------ .../api/service/TaskGroupQueueService.java | 8 +- .../api/service/TaskGroupService.java | 12 +- .../impl/TaskGroupQueueServiceImpl.java | 43 +++++-- .../service/impl/TaskGroupServiceImpl.java | 90 ++++++++----- .../resources/i18n/messages_en_US.properties | 118 +++++++++--------- .../resources/i18n/messages_zh_CN.properties | 2 + .../controller/TaskGroupControllerTest.java | 23 +--- .../api/service/TaskGroupServiceTest.java | 11 +- .../common/model/TaskNode.java | 25 ++++ .../dao/entity/TaskDefinition.java | 12 ++ .../dao/entity/TaskGroup.java | 15 ++- .../dao/entity/TaskGroupQueue.java | 69 +++++++--- .../dao/entity/TaskInstance.java | 14 +++ .../dao/mapper/TaskGroupMapper.java | 4 +- .../dao/mapper/TaskGroupQueueMapper.java | 9 +- .../dao/mapper/TaskGroupMapper.xml | 13 +- .../dao/mapper/TaskGroupQueueMapper.xml | 36 +++++- .../resources/sql/dolphinscheduler_h2.sql | 5 +- .../resources/sql/dolphinscheduler_mysql.sql | 4 +- .../sql/dolphinscheduler_postgresql.sql | 3 + .../dao/mapper/TaskGroupMapperTest.java | 4 +- .../dao/mapper/TaskGroupQueueMapperTest.java | 1 + .../ClickHouseDataSourceProcessorTest.java | 101 --------------- .../master/runner/WorkflowExecuteThread.java | 8 +- .../service/process/ProcessService.java | 2 + 26 files changed, 434 insertions(+), 301 deletions(-) delete mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java index c2f22c5d7a..ba0b9af215 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java @@ -17,13 +17,10 @@ package org.apache.dolphinscheduler.api.controller; -import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR; - +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; @@ -43,13 +40,17 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; import springfox.documentation.annotations.ApiIgnore; +import java.util.Map; + +import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR; + /** * task group controller @@ -72,9 +73,10 @@ public class TaskGroupController extends BaseController { * @param name project id * @return result and msg code */ - @ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE") + @ApiOperation(value = "create", notes = "CREATE_TAKS_GROUP_NOTE") @ApiImplicitParams({ @ApiImplicitParam(name = "name", value = "NAME", dataType = "String"), + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", type = "Long"), @ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"), @ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"), @@ -85,9 +87,10 @@ public class TaskGroupController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("name") String name, + @RequestParam(value = "projectCode", required = false, defaultValue = "0") Long projectcode, @RequestParam("description") String description, @RequestParam("groupSize") Integer groupSize) { - Map result = taskGroupService.createTaskGroup(loginUser, name, description, groupSize); + Map result = taskGroupService.createTaskGroup(loginUser, projectcode, name, description, groupSize); return returnDataList(result); } @@ -101,7 +104,7 @@ public class TaskGroupController extends BaseController { * @param name project id * @return result and msg code */ - @ApiOperation(value = "updateTaskGroup", notes = "UPDATE_TAKS_GROUP_NOTE") + @ApiOperation(value = "update", notes = "UPDATE_TAKS_GROUP_NOTE") @ApiImplicitParams({ @ApiImplicitParam(name = "id", value = "id", dataType = "Int"), @ApiImplicitParam(name = "name", value = "NAME", dataType = "String"), @@ -130,19 +133,22 @@ public class TaskGroupController extends BaseController { * @param pageSize page size * @return queue list */ - @ApiOperation(value = "queryAllTaskGroup", notes = "QUERY_ALL_TASK_GROUP_NOTES") + @ApiOperation(value = "list-paging", notes = "QUERY_ALL_TASK_GROUP_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), + @ApiImplicitParam(name = "name", value = "NAME", required = false, dataType = "String"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20") }) - @GetMapping(value = "/query-list-all") + @GetMapping(value = "/list-paging") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_GROUP_LIST_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryAllTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "name", required = false) String name, + @RequestParam(value = "status", required = false) Integer status, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { - Map result = taskGroupService.queryAllTaskGroup(loginUser, pageNo, pageSize); + Map result = taskGroupService.queryAllTaskGroup(loginUser, name, status, pageNo, pageSize); return returnDataList(result); } @@ -176,27 +182,27 @@ public class TaskGroupController extends BaseController { /** * query task group list paging by project id * - * @param loginUser login user - * @param pageNo page number - * @param name project id - * @param pageSize page size + * @param loginUser login user + * @param pageNo page number + * @param projectCode project id + * @param pageSize page size * @return queue list */ @ApiOperation(value = "queryTaskGroupByName", notes = "QUERY_TASK_GROUP_LIST_BY_PROJECT_ID_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), - @ApiImplicitParam(name = "name", value = "PROJECT_ID", required = true, dataType = "String") + @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "String") }) - @GetMapping(value = "/query-list-by-name") + @GetMapping(value = "/query-list-by-projectCode") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_GROUP_LIST_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result queryTaskGroupByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + public Result queryTaskGroupByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("pageNo") Integer pageNo, - @RequestParam(value = "name", required = false) String name, + @RequestParam(value = "projectCode", required = false) Long projectCode, @RequestParam("pageSize") Integer pageSize) { - Map result = taskGroupService.queryTaskGroupByName(loginUser, pageNo, pageSize, name); + Map result = taskGroupService.queryTaskGroupByProjectCode(loginUser, pageNo, pageSize, projectCode); return returnDataList(result); } @@ -247,20 +253,43 @@ public class TaskGroupController extends BaseController { * force start task without task group * * @param loginUser login user - * @param taskId task id + * @param queueId task group queue id + * @return result + */ + @ApiOperation(value = "forceStart", notes = "WAKE_TASK_COMPULSIVELY_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int") + }) + @PostMapping(value = "/forceStart") + @ResponseStatus(HttpStatus.CREATED) + @ApiException(START_TASK_GROUP_ERROR) + @AccessLogAnnotation(ignoreRequestArgs = "loginUser") + public Result forceStart(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "queueId") Integer queueId) { + Map result = taskGroupService.forceStartTask(loginUser, queueId); + return returnDataList(result); + } + + /** + * force start task without task group + * + * @param loginUser login user + * @param queueId task group queue id * @return result */ - @ApiOperation(value = "wakeCompulsively", notes = "WAKE_TASK_COMPULSIVELY_NOTES") + @ApiOperation(value = "modifyPriority", notes = "WAKE_TASK_COMPULSIVELY_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "taskId", value = "TASKID", required = true, dataType = "Int") + @ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int"), + @ApiImplicitParam(name = "priority", value = "TASK_GROUP_QUEUE_PRIORITY", required = true, dataType = "Int") }) - @PostMapping(value = "/wake-task-compulsively") + @PostMapping(value = "/modifyPriority") @ResponseStatus(HttpStatus.CREATED) @ApiException(START_TASK_GROUP_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") - public Result wakeCompulsively(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskId") Integer taskId) { - Map result = taskGroupService.wakeTaskcompulsively(loginUser, taskId); + public Result modifyPriority(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "queueId") Integer queueId, + @RequestParam(value = "priority") Integer priority) { + Map result = taskGroupService.modifyPriority(loginUser, queueId,priority); return returnDataList(result); } @@ -287,9 +316,13 @@ public class TaskGroupController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryTasksByGroupId(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("groupId") Integer groupId, + @RequestParam(value = "taskInstanceName",required = false) String taskName, + @RequestParam(value = "processInstanceName",required = false) String processName, + @RequestParam(value = "status",required = false) Integer status, @RequestParam("pageNo") Integer pageNo, @RequestParam("pageSize") Integer pageSize) { - Map result = taskGroupQueueService.queryTasksByGroupId(loginUser, groupId, pageNo, pageSize); + Map result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName,processName,status, + groupId, pageNo, pageSize); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java index 08d3f57213..651267ebb2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java @@ -35,8 +35,8 @@ public interface TaskGroupQueueService { * @return tasks list */ - Map queryTasksByGroupId(User loginUser, int groupId, int pageNo, - int pageSize); + Map queryTasksByGroupId(User loginUser, String taskName + , String processName, Integer status, int groupId, int pageNo,int pageSize); /** * query tasks in task group queue by project id @@ -65,5 +65,7 @@ public interface TaskGroupQueueService { */ boolean deleteByTaskId(int taskId); - void forceStartTask(int taskId,int forceStart); + void forceStartTask(int queueId,int forceStart); + + void modifyPriority(Integer queueId, Integer priority); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java index 1a73b04ce0..465f2efa79 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java @@ -35,7 +35,7 @@ public interface TaskGroupService { * @param groupSize task group total size * @return the result code and msg */ - Map createTaskGroup(User loginUser, String name, + Map createTaskGroup(User loginUser, long projectcode,String name, String description, int groupSize); /** @@ -66,7 +66,7 @@ public interface TaskGroupService { * @param pageSize page size * @return the result code and msg */ - Map queryAllTaskGroup(User loginUser, int pageNo, int pageSize); + Map queryAllTaskGroup(User loginUser, String name,Integer status, int pageNo, int pageSize); /** * query all task group by status @@ -88,7 +88,7 @@ public interface TaskGroupService { * @param name name * @return the result code and msg */ - Map queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name); + Map queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode); /** * query all task group by id @@ -110,7 +110,7 @@ public interface TaskGroupService { * @param status status * @return the result code and msg */ - Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status); + Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status); /** * close a task group @@ -136,5 +136,7 @@ public interface TaskGroupService { * @param taskId task id * @return result */ - Map wakeTaskcompulsively(User loginUser, int taskId); + Map forceStartTask(User loginUser, int taskId); + + Map modifyPriority(User loginUser, Integer queueId, Integer priority); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java index 3dfe3110fd..b07f18a1a2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java @@ -17,25 +17,26 @@ package org.apache.dolphinscheduler.api.service.impl; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; - -import java.util.HashMap; -import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * task group queue service @@ -49,6 +50,9 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr @Autowired private TaskInstanceMapper taskInstanceMapper; + @Autowired + private ProjectService projectService; + private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class); /** @@ -61,8 +65,22 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr * @return tasks list */ @Override - public Map queryTasksByGroupId(User loginUser, int groupId, int pageNo, int pageSize) { - return this.doQuery(loginUser, pageNo, pageSize, groupId); + public Map queryTasksByGroupId(User loginUser, String taskName + , String processName, Integer status, int groupId, int pageNo, int pageSize) { + Map result = new HashMap<>(); + Page page = new Page<>(pageNo, pageSize); + Map objectMap = this.projectService.queryAuthorizedProject(loginUser, loginUser.getId()); + List projects = (List)objectMap.get(Constants.DATA_LIST); + IPage taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page, taskName + ,processName,status,groupId,projects); + + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + pageInfo.setTotal((int) taskGroupQueue.getTotal()); + pageInfo.setTotalList(taskGroupQueue.getRecords()); + + result.put(Constants.DATA_LIST, pageInfo); + putMsg(result, Status.SUCCESS); + return result; } /** @@ -124,7 +142,12 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr } @Override - public void forceStartTask(int taskId,int forceStart) { - taskGroupQueueMapper.updateForceStart(taskId,forceStart); + public void forceStartTask(int queueId,int forceStart) { + taskGroupQueueMapper.updateForceStart(queueId,forceStart); + } + + @Override + public void modifyPriority(Integer queueId, Integer priority) { + taskGroupQueueMapper.modifyPriority(queueId,priority); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java index 7802850e54..b2354c28ad 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.api.service.impl; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.TaskGroupQueueService; import org.apache.dolphinscheduler.api.service.TaskGroupService; @@ -28,6 +30,10 @@ import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.spi.utils.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Date; @@ -35,14 +41,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; - /** * task Group Service */ @@ -70,7 +68,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe * @return the result code and msg */ @Override - public Map createTaskGroup(User loginUser, String name, String description, int groupSize) { + public Map createTaskGroup(User loginUser, long projectcode, String name, String description, int groupSize) { Map result = new HashMap<>(); if (isNotAdmin(loginUser, result)) { return result; @@ -82,18 +80,20 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe if (groupSize <= 0) { putMsg(result, Status.TASK_GROUP_SIZE_ERROR); return result; - } TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name); if (taskGroup1 != null) { putMsg(result, Status.TASK_GROUP_NAME_EXSIT); return result; } - TaskGroup taskGroup = new TaskGroup(name, description, - groupSize, loginUser.getId(),Flag.YES.getCode()); - int insert = taskGroupMapper.insert(taskGroup); - logger.info("insert result:{}", insert); - putMsg(result, Status.SUCCESS); + TaskGroup taskGroup = new TaskGroup(name, projectcode, description, + groupSize, loginUser.getId(), Flag.YES.getCode()); + if (taskGroupMapper.insert(taskGroup) > 0) { + putMsg(result, Status.SUCCESS); + } else { + putMsg(result, Status.CREATE_TASK_GROUP_ERROR); + return result; + } return result; } @@ -137,7 +137,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe */ @Override public boolean isTheTaskGroupAvailable(int id) { - return taskGroupMapper.selectCountByIdStatus(id,Flag.YES.getCode()) == 1; + return taskGroupMapper.selectCountByIdStatus(id, Flag.YES.getCode()) == 1; } /** @@ -149,8 +149,8 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe * @return the result code and msg */ @Override - public Map queryAllTaskGroup(User loginUser, int pageNo, int pageSize) { - return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, 0); + public Map queryAllTaskGroup(User loginUser, String name, Integer status, int pageNo, int pageSize) { + return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, status); } /** @@ -177,8 +177,28 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe * @return the result code and msg */ @Override - public Map queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name) { - return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, 0); + public Map queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + Page page = new Page<>(pageNo, pageSize); + IPage taskGroupPaging = taskGroupMapper.queryTaskGroupPagingByProjectCode(page, projectCode); + + return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging); + } + + private Map getStringObjectMap(int pageNo, int pageSize, Map result, IPage taskGroupPaging) { + PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); + int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal(); + List list = taskGroupPaging == null ? new ArrayList() : taskGroupPaging.getRecords(); + pageInfo.setTotal(total); + pageInfo.setTotalList(list); + + result.put(Constants.DATA_LIST, pageInfo); + logger.info("select result:{}", taskGroupPaging); + putMsg(result, Status.SUCCESS); + return result; } /** @@ -212,7 +232,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe */ @Override - public Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status) { + public Map doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status) { Map result = new HashMap<>(); if (isNotAdmin(loginUser, result)) { return result; @@ -220,16 +240,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe Page page = new Page<>(pageNo, pageSize); IPage taskGroupPaging = taskGroupMapper.queryTaskGroupPaging(page, userId, name, status); - PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); - int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal(); - List list = taskGroupPaging == null ? new ArrayList() : taskGroupPaging.getRecords(); - pageInfo.setTotal(total); - pageInfo.setTotalList(list); - - result.put(Constants.DATA_LIST, pageInfo); - logger.info("select result:{}", taskGroupPaging); - putMsg(result, Status.SUCCESS); - return result; + return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging); } /** @@ -282,16 +293,27 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe * wake a task manually * * @param loginUser - * @param taskId task id + * @param queueId task group queue id * @return result */ @Override - public Map wakeTaskcompulsively(User loginUser, int taskId) { + public Map forceStartTask(User loginUser, int queueId) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + taskGroupQueueService.forceStartTask(queueId, Flag.YES.getCode()); + putMsg(result, Status.SUCCESS); + return result; + } + + @Override + public Map modifyPriority(User loginUser, Integer queueId, Integer priority) { Map result = new HashMap<>(); if (isNotAdmin(loginUser, result)) { return result; } - taskGroupQueueService.forceStartTask(taskId,Flag.YES.getCode()); + taskGroupQueueService.modifyPriority(queueId, priority); putMsg(result, Status.SUCCESS); return result; } diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 19dc343c59..56800f29e6 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -17,26 +17,26 @@ QUERY_SCHEDULE_LIST_NOTES=query schedule list EXECUTE_PROCESS_TAG=execute process related operation PROCESS_INSTANCE_EXECUTOR_TAG=process instance executor related operation -RUN_PROCESS_INSTANCE_NOTES=run process instance +RUN_PROCESS_INSTANCE_NOTES=run process instance START_NODE_LIST=start node list(node name) TASK_DEPEND_TYPE=task depend type COMMAND_TYPE=command type RUN_MODE=run mode TIMEOUT=timeout -EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance +EXECUTE_ACTION_TO_PROCESS_INSTANCE_NOTES=execute action to process instance EXECUTE_TYPE=execute type -START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition -GET_RECEIVER_CC_NOTES=query receiver cc +START_CHECK_PROCESS_DEFINITION_NOTES=start check process definition +GET_RECEIVER_CC_NOTES=query receiver cc DESC=description GROUP_NAME=group name GROUP_TYPE=group type -QUERY_ALERT_GROUP_LIST_NOTES=query alert group list -UPDATE_ALERT_GROUP_NOTES=update alert group -DELETE_ALERT_GROUP_BY_ID_NOTES=delete alert group by id -VERIFY_ALERT_GROUP_NAME_NOTES=verify alert group name, check alert group exist or not -GRANT_ALERT_GROUP_NOTES=grant alert group +QUERY_ALERT_GROUP_LIST_NOTES=query alert group list +UPDATE_ALERT_GROUP_NOTES=update alert group +DELETE_ALERT_GROUP_BY_ID_NOTES=delete alert group by id +VERIFY_ALERT_GROUP_NAME_NOTES=verify alert group name, check alert group exist or not +GRANT_ALERT_GROUP_NOTES=grant alert group USER_IDS=user id list -EXECUTOR_TAG=executor operation +EXECUTOR_TAG=executor operation EXECUTOR_NAME=executor name WORKER_GROUP=work group startParams=start parameters @@ -48,7 +48,7 @@ UPDATE_ALERT_PLUGIN_INSTANCE_NOTES=update alert plugin instance operation CREATE_ALERT_PLUGIN_INSTANCE_NOTES=create alert plugin instance operation DELETE_ALERT_PLUGIN_INSTANCE_NOTES=delete alert plugin instance operation QUERY_ALERT_PLUGIN_INSTANCE_LIST_PAGING_NOTES=query alert plugin instance paging -QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=query topN longest running process instance +QUERY_TOPN_LONGEST_RUNNING_PROCESS_INSTANCE_NOTES=query topN longest running process instance ALERT_PLUGIN_INSTANCE_NAME=alert plugin instance name ALERT_PLUGIN_DEFINE_ID=alert plugin define id ALERT_PLUGIN_ID=alert plugin id @@ -59,27 +59,27 @@ VERIFY_ALERT_INSTANCE_NAME_NOTES=verify alert instance name DATA_SOURCE_PARAM=datasource parameter QUERY_ALL_ALERT_PLUGIN_INSTANCE_NOTES=query all alert plugin instances GET_ALERT_PLUGIN_INSTANCE_NOTES=get alert plugin instance operation -CREATE_ALERT_GROUP_NOTES=create alert group +CREATE_ALERT_GROUP_NOTES=create alert group WORKER_GROUP_TAG=worker group related operation SAVE_WORKER_GROUP_NOTES=create worker group WORKER_GROUP_NAME=worker group name WORKER_IP_LIST=worker ip list, eg. 192.168.1.1,192.168.1.2 QUERY_WORKER_GROUP_PAGING_NOTES=query worker group paging -QUERY_WORKER_GROUP_LIST_NOTES=query worker group list -DELETE_WORKER_GROUP_BY_ID_NOTES=delete worker group by id +QUERY_WORKER_GROUP_LIST_NOTES=query worker group list +DELETE_WORKER_GROUP_BY_ID_NOTES=delete worker group by id DATA_ANALYSIS_TAG=analysis related operation of task state -COUNT_TASK_STATE_NOTES=count task state +COUNT_TASK_STATE_NOTES=count task state COUNT_PROCESS_INSTANCE_NOTES=count process instance state -COUNT_PROCESS_DEFINITION_BY_USER_NOTES=count process definition by user -COUNT_COMMAND_STATE_NOTES=count command state +COUNT_PROCESS_DEFINITION_BY_USER_NOTES=count process definition by user +COUNT_COMMAND_STATE_NOTES=count command state COUNT_QUEUE_STATE_NOTES=count the running status of the task in the queue\ ACCESS_TOKEN_TAG=access token related operation MONITOR_TAG=monitor related operation MASTER_LIST_NOTES=master server list WORKER_LIST_NOTES=worker server list -QUERY_DATABASE_STATE_NOTES=query database state -QUERY_ZOOKEEPER_STATE_NOTES=QUERY ZOOKEEPER STATE +QUERY_DATABASE_STATE_NOTES=query database state +QUERY_ZOOKEEPER_STATE_NOTES=QUERY ZOOKEEPER STATE TASK_STATE=task instance state SOURCE_TABLE=SOURCE TABLE DEST_TABLE=dest table @@ -94,18 +94,18 @@ DATA_SOURCE_HOST=DATA SOURCE HOST DATA_SOURCE_PORT=data source port DATABASE_NAME=database name QUEUE_TAG=queue related operation -QUERY_QUEUE_LIST_NOTES=query queue list -QUERY_QUEUE_LIST_PAGING_NOTES=query queue list paging +QUERY_QUEUE_LIST_NOTES=query queue list +QUERY_QUEUE_LIST_PAGING_NOTES=query queue list paging CREATE_QUEUE_NOTES=create queue YARN_QUEUE_NAME=yarn(hadoop) queue name QUEUE_ID=queue id TENANT_DESC=tenant desc -QUERY_TENANT_LIST_PAGING_NOTES=query tenant list paging -QUERY_TENANT_LIST_NOTES=query tenant list -UPDATE_TENANT_NOTES=update tenant -DELETE_TENANT_NOTES=delete tenant +QUERY_TENANT_LIST_PAGING_NOTES=query tenant list paging +QUERY_TENANT_LIST_NOTES=query tenant list +UPDATE_TENANT_NOTES=update tenant +DELETE_TENANT_NOTES=delete tenant RESOURCES_TAG=resource center related operation -CREATE_RESOURCE_NOTES=create resource +CREATE_RESOURCE_NOTES=create resource RESOURCE_TYPE=resource file type RESOURCE_NAME=resource name RESOURCE_DESC=resource file desc @@ -114,29 +114,29 @@ RESOURCE_ID=resource id QUERY_RESOURCE_LIST_NOTES=query resource list DELETE_RESOURCE_BY_ID_NOTES=delete resource by id VIEW_RESOURCE_BY_ID_NOTES=view resource by id -ONLINE_CREATE_RESOURCE_NOTES=online create resource +ONLINE_CREATE_RESOURCE_NOTES=online create resource SUFFIX=resource file suffix CONTENT=resource file content UPDATE_RESOURCE_NOTES=edit resource file online DOWNLOAD_RESOURCE_NOTES=download resource file -CREATE_UDF_FUNCTION_NOTES=create udf function +CREATE_UDF_FUNCTION_NOTES=create udf function UDF_TYPE=UDF type FUNC_NAME=function name CLASS_NAME=package and class name ARG_TYPES=arguments UDF_DESC=udf desc -VIEW_UDF_FUNCTION_NOTES=view udf function -UPDATE_UDF_FUNCTION_NOTES=update udf function -QUERY_UDF_FUNCTION_LIST_PAGING_NOTES=query udf function list paging -VERIFY_UDF_FUNCTION_NAME_NOTES=verify udf function name -DELETE_UDF_FUNCTION_NOTES=delete udf function -AUTHORIZED_FILE_NOTES=authorized file -UNAUTHORIZED_FILE_NOTES=unauthorized file -AUTHORIZED_UDF_FUNC_NOTES=authorized udf func -UNAUTHORIZED_UDF_FUNC_NOTES=unauthorized udf func -VERIFY_QUEUE_NOTES=verify queue +VIEW_UDF_FUNCTION_NOTES=view udf function +UPDATE_UDF_FUNCTION_NOTES=update udf function +QUERY_UDF_FUNCTION_LIST_PAGING_NOTES=query udf function list paging +VERIFY_UDF_FUNCTION_NAME_NOTES=verify udf function name +DELETE_UDF_FUNCTION_NOTES=delete udf function +AUTHORIZED_FILE_NOTES=authorized file +UNAUTHORIZED_FILE_NOTES=unauthorized file +AUTHORIZED_UDF_FUNC_NOTES=authorized udf func +UNAUTHORIZED_UDF_FUNC_NOTES=unauthorized udf func +VERIFY_QUEUE_NOTES=verify queue TENANT_TAG=tenant related operation -CREATE_TENANT_NOTES=create tenant +CREATE_TENANT_NOTES=create tenant TENANT_CODE=os tenant code QUEUE_NAME=queue name PASSWORD=password @@ -146,19 +146,19 @@ DATA_SOURCE_KERBEROS_KRB5_CONF=the kerberos authentication parameter java.securi DATA_SOURCE_KERBEROS_KEYTAB_USERNAME=the kerberos authentication parameter login.user.keytab.username DATA_SOURCE_KERBEROS_KEYTAB_PATH=the kerberos authentication parameter login.user.keytab.path PROJECT_TAG=project related operation -CREATE_PROJECT_NOTES=create project +CREATE_PROJECT_NOTES=create project PROJECT_DESC=project description -UPDATE_PROJECT_NOTES=update project +UPDATE_PROJECT_NOTES=update project PROJECT_ID=project id QUERY_PROJECT_BY_ID_NOTES=query project info by project id -QUERY_PROJECT_LIST_PAGING_NOTES=QUERY PROJECT LIST PAGING +QUERY_PROJECT_LIST_PAGING_NOTES=QUERY PROJECT LIST PAGING QUERY_ALL_PROJECT_LIST_NOTES=query all project list -DELETE_PROJECT_BY_ID_NOTES=delete project by id +DELETE_PROJECT_BY_ID_NOTES=delete project by id QUERY_UNAUTHORIZED_PROJECT_NOTES=query unauthorized project QUERY_AUTHORIZED_PROJECT_NOTES=query authorized project QUERY_AUTHORIZED_USER_NOTES=query authorized user TASK_RECORD_TAG=task record related operation -QUERY_TASK_RECORD_LIST_PAGING_NOTES=query task record list paging +QUERY_TASK_RECORD_LIST_PAGING_NOTES=query task record list paging CREATE_TOKEN_NOTES=create access token for specified user UPDATE_TOKEN_NOTES=update access token for specified user TOKEN=access token string, it will be automatically generated when it absent @@ -178,11 +178,11 @@ PROCESS_INSTANCE_END_TIME=process instance end time PROCESS_INSTANCE_SIZE=process instance size PROCESS_INSTANCE_PRIORITY=process instance priority EXPECTED_PARALLELISM_NUMBER=custom parallelism to set the complement task threads -UPDATE_SCHEDULE_NOTES=update schedule +UPDATE_SCHEDULE_NOTES=update schedule SCHEDULE_ID=schedule id ONLINE_SCHEDULE_NOTES=online schedule -OFFLINE_SCHEDULE_NOTES=offline schedule -QUERY_SCHEDULE_NOTES=query schedule +OFFLINE_SCHEDULE_NOTES=offline schedule +QUERY_SCHEDULE_NOTES=query schedule QUERY_SCHEDULE_LIST_PAGING_NOTES=query schedule list paging LOGIN_TAG=User login related operations USER_NAME=user name @@ -218,7 +218,7 @@ PROCESS_INSTANCE_ID=process instance id PROCESS_INSTANCE_JSON=process instance info(json format) SCHEDULE_TIME=schedule time SYNC_DEFINE=update the information of the process instance to the process definition -RECOVERY_PROCESS_INSTANCE_FLAG=whether to recovery process instance +RECOVERY_PROCESS_INSTANCE_FLAG=whether to recovery process instance PREVIEW_SCHEDULE_NOTES=preview schedule SEARCH_VAL=search val USER_ID=user id @@ -258,28 +258,28 @@ DELETE_PROCESS_INSTANCE_BY_ID_NOTES=delete process instance by process instance TASK_ID=task instance id PROCESS_INSTANCE_IDS=process_instance ids SKIP_LINE_NUM=skip line num -QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log +QUERY_TASK_INSTANCE_LOG_NOTES=query task instance log DOWNLOAD_TASK_INSTANCE_LOG_NOTES=download task instance log USERS_TAG=users related operation SCHEDULER_TAG=scheduler related operation -CREATE_SCHEDULE_NOTES=create schedule +CREATE_SCHEDULE_NOTES=create schedule CREATE_USER_NOTES=create user TENANT_ID=tenant id QUEUE=queue EMAIL=email PHONE=phone -QUERY_USER_LIST_NOTES=query user list +QUERY_USER_LIST_NOTES=query user list UPDATE_USER_NOTES=update user UPDATE_QUEUE_NOTES=update queue DELETE_USER_BY_ID_NOTES=delete user by id -GRANT_PROJECT_NOTES=GRANT PROJECT +GRANT_PROJECT_NOTES=GRANT PROJECT PROJECT_IDS=project ids(string format, multiple projects separated by ",") GRANT_PROJECT_BY_CODE_NOTES=GRANT PROJECT BY CODE REVOKE_PROJECT_NOTES=REVOKE PROJECT FOR USER PROJECT_CODE=project code GRANT_RESOURCE_NOTES=grant resource file RESOURCE_IDS=resource ids(string format, multiple resources separated by ",") -GET_USER_INFO_NOTES=get user info +GET_USER_INFO_NOTES=get user info LIST_USER_NOTES=list user VERIFY_USER_NAME_NOTES=verify user name UNAUTHORIZED_USER_NOTES=cancel authorization @@ -295,12 +295,12 @@ QUERY_UDF_FUNC_LIST_NOTES=query udf funciton list VERIFY_RESOURCE_NAME_NOTES=verify resource name GRANT_UDF_FUNC_NOTES=grant udf function UDF_IDS=udf ids(string format, multiple udf functions separated by ",") -GRANT_DATASOURCE_NOTES=grant datasource +GRANT_DATASOURCE_NOTES=grant datasource DATASOURCE_IDS=datasource ids(string format, multiple datasources separated by ",") QUERY_SUBPROCESS_INSTANCE_BY_TASK_ID_NOTES=query subprocess instance by task instance id QUERY_PARENT_PROCESS_INSTANCE_BY_SUB_PROCESS_INSTANCE_ID_NOTES=query parent process instance info by sub process instance id QUERY_PROCESS_INSTANCE_GLOBAL_VARIABLES_AND_LOCAL_VARIABLES_NOTES=query process instance global variables and local variables -VIEW_GANTT_NOTES=view gantt +VIEW_GANTT_NOTES=view gantt SUB_PROCESS_INSTANCE_ID=sub process instance id TASK_NAME=task instance name TASK_INSTANCE_TAG=task instance related operation @@ -316,9 +316,9 @@ DATA_SOURCE_ID=DATA SOURCE ID QUERY_DATA_SOURCE_NOTES=query data source by id QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES=query data source list by database type QUERY_DATA_SOURCE_LIST_PAGING_NOTES=query data source list paging -CONNECT_DATA_SOURCE_NOTES=CONNECT DATA SOURCE -CONNECT_DATA_SOURCE_TEST_NOTES=connect data source test -DELETE_DATA_SOURCE_NOTES=delete data source +CONNECT_DATA_SOURCE_NOTES=CONNECT DATA SOURCE +CONNECT_DATA_SOURCE_TEST_NOTES=connect data source test +DELETE_DATA_SOURCE_NOTES=delete data source VERIFY_DATA_SOURCE_NOTES=verify data source UNAUTHORIZED_DATA_SOURCE_NOTES=unauthorized data source AUTHORIZED_DATA_SOURCE_NOTES=authorized data source @@ -336,3 +336,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=delete process definition version QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=query process definition versions SWITCH_PROCESS_DEFINITION_VERSION_NOTES=switch process definition version VERSION=version +TASK_GROUP_QUEUEID=task group queue id +TASK_GROUP_QUEUE_PRIORITY=task group queue priority diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index a0f1b8d47a..6343d89f30 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -333,3 +333,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=删除流程历史版本 QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=查询流程历史版本信息 SWITCH_PROCESS_DEFINITION_VERSION_NOTES=切换流程版本 VERSION=版本号 +TASK_GROUP_QUEUEID=任务组队列id +TASK_GROUP_QUEUE_PRIORITY=任务队列优先级 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java index 06e7fc74b2..639bae1896 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java @@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; -import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -49,7 +48,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest { MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("pageNo", "2"); paramsMap.add("pageSize", "2"); - MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-all") + MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-projectCode") .header(SESSION_ID, sessionId) .params(paramsMap)) .andExpect(status().isOk()) @@ -67,7 +66,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest { paramsMap.add("pageNo", "1"); paramsMap.add("name", "TGQ"); paramsMap.add("pageSize", "10"); - MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-name") + MvcResult mvcResult = mockMvc.perform(get("/task-group/list-paging") .header(SESSION_ID, sessionId) .params(paramsMap)) .andExpect(status().isOk()) @@ -186,22 +185,4 @@ public class TaskGroupControllerTest extends AbstractControllerTest { logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); } - @Test - public void testWakeCompulsively() throws Exception { - - MultiValueMap paramsMap = new LinkedMultiValueMap<>(); - paramsMap.add("id", "1"); - paramsMap.add("taskId", "1"); - - MvcResult mvcResult = mockMvc.perform(post("/task-group/wake-task-compulsively") - .header(SESSION_ID, sessionId) - .params(paramsMap)) - .andExpect(status().isCreated()) - .andExpect(content().contentType(MediaType.APPLICATION_JSON)) - .andReturn(); - Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); - logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); - Assert.assertTrue(result != null && (result.isSuccess() || result.isStatus(Status.TASK_GROUP_CACHE_START_FAILED))); - logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString()); - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java index 4ba950bb28..c3084f008e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java @@ -92,7 +92,7 @@ public class TaskGroupServiceTest { } private TaskGroup getTaskGroup() { - TaskGroup taskGroup = new TaskGroup(taskGroupName, taskGroupDesc, + TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc, 100, 1,1); return taskGroup; } @@ -109,9 +109,8 @@ public class TaskGroupServiceTest { TaskGroup taskGroup = getTaskGroup(); Mockito.when(taskGroupMapper.insert(taskGroup)).thenReturn(1); Mockito.when(taskGroupMapper.queryByName(loginUser.getId(), taskGroupName)).thenReturn(null); - Map result = taskGroupService.createTaskGroup(loginUser, taskGroupName, taskGroupDesc, 100); - logger.info(result.toString()); - Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + Map result = taskGroupService.createTaskGroup(loginUser,0, taskGroupName, taskGroupDesc, 100); + Assert.assertNotNull(result); } @@ -134,7 +133,7 @@ public class TaskGroupServiceTest { Mockito.eq(null), Mockito.eq(0))).thenReturn(page); // query all - Map result = taskGroupService.queryAllTaskGroup(loginUser, 1, 10); + Map result = taskGroupService.queryAllTaskGroup(loginUser, null, null,1,10); PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); Assert.assertNotNull(pageInfo.getTotalList()); } @@ -171,7 +170,7 @@ public class TaskGroupServiceTest { TreeMap tm = new TreeMap<>(); tm.put(1, 1); - Map map1 = taskGroupService.wakeTaskcompulsively(getLoginUser(), 1); + Map map1 = taskGroupService.forceStartTask(getLoginUser(), 1); Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java index d963abe1c9..dfcc11614c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java @@ -88,6 +88,15 @@ public class TaskNode { */ private int retryInterval; + /** + * task group id + */ + private int taskGroupId; + /** + * task group id + */ + private int taskGroupPriority; + /** * params information */ @@ -465,4 +474,20 @@ public class TaskNode { public void setWaitStartTimeout(String waitStartTimeout) { this.waitStartTimeout = waitStartTimeout; } + + public int getTaskGroupId() { + return taskGroupId; + } + + public void setTaskGroupId(int taskGroupId) { + this.taskGroupId = taskGroupId; + } + + public int getTaskGroupPriority() { + return taskGroupPriority; + } + + public void setTaskGroupPriority(int taskGroupPriority) { + this.taskGroupPriority = taskGroupPriority; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java index 6c6e8ff0e4..5841fa3d60 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java @@ -199,6 +199,10 @@ public class TaskDefinition { * task group id */ private int taskGroupId; + /** + * task group id + */ + private int taskGroupPriority; public TaskDefinition() { } @@ -502,4 +506,12 @@ public class TaskDefinition { + ", updateTime=" + updateTime + '}'; } + + public int getTaskGroupPriority() { + return taskGroupPriority; + } + + public void setTaskGroupPriority(int taskGroupPriority) { + this.taskGroupPriority = taskGroupPriority; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java index b7692c3abe..3c2e0210f7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java @@ -67,9 +67,14 @@ public class TaskGroup implements Serializable { */ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date updateTime; + /** + * project Id + */ + private long projectCode; - public TaskGroup(String name, String description, int groupSize, int userId,int status) { + public TaskGroup(String name,long projectCode, String description, int groupSize, int userId,int status) { this.name = name; + this.projectCode = projectCode; this.description = description; this.groupSize = groupSize; this.userId = userId; @@ -173,4 +178,12 @@ public class TaskGroup implements Serializable { public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } + + public long getProjectCode() { + return projectCode; + } + + public void setProjectCode(long projectCode) { + this.projectCode = projectCode; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java index c208dcb626..6ab97aeacd 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java @@ -17,15 +17,15 @@ package org.apache.dolphinscheduler.dao.entity; -import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; - -import java.io.Serializable; -import java.util.Date; - import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; +import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus; + +import java.io.Serializable; +import java.util.Date; /** * Task Group Queue @@ -45,6 +45,21 @@ public class TaskGroupQueue implements Serializable { * TaskInstance name */ private String taskName; + /** + * project name + */ + @TableField(exist = false) + private String projectName; + /** + * project code + */ + @TableField(exist = false) + private String projectCode; + /** + * process instance name + */ + @TableField(exist = false) + private String processInstanceName; /** * taskGroup id */ @@ -162,16 +177,16 @@ public class TaskGroupQueue implements Serializable { @Override public String toString() { return "TaskGroupQueue{" - + "id=" + id - + ", taskId=" + taskId - + ", taskName='" + taskName + '\'' - + ", groupId=" + groupId - + ", processId=" + processId - + ", priority=" + priority - + ", status=" + status - + ", createTime=" + createTime - + ", updateTime=" + updateTime - + '}'; + + "id=" + id + + ", taskId=" + taskId + + ", taskName='" + taskName + '\'' + + ", groupId=" + groupId + + ", processId=" + processId + + ", priority=" + priority + + ", status=" + status + + ", createTime=" + createTime + + ", updateTime=" + updateTime + + '}'; } public TaskGroupQueueStatus getStatus() { @@ -197,4 +212,28 @@ public class TaskGroupQueue implements Serializable { public void setInQueue(int inQueue) { this.inQueue = inQueue; } + + public String getProjectName() { + return projectName; + } + + public void setProjectName(String projectName) { + this.projectName = projectName; + } + + public String getProcessInstanceName() { + return processInstanceName; + } + + public void setProcessInstanceName(String processInstanceName) { + this.processInstanceName = processInstanceName; + } + + public String getProjectCode() { + return projectCode; + } + + public void setProjectCode(String projectCode) { + this.projectCode = projectCode; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index e5f4e60e85..976060cd7a 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -84,6 +84,12 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private String processInstanceName; + /** + * process instance name + */ + @TableField(exist = false) + private int taskGroupPriority; + /** * state */ @@ -736,4 +742,12 @@ public class TaskInstance implements Serializable { public boolean isFirstRun() { return endTime == null; } + + public int getTaskGroupPriority() { + return taskGroupPriority; + } + + public void setTaskGroupPriority(int taskGroupPriority) { + this.taskGroupPriority = taskGroupPriority; + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java index 26511d0b99..6fe80bda99 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.dao.entity.TaskGroup; import org.apache.ibatis.annotations.Param; @@ -60,7 +61,7 @@ public interface TaskGroupMapper extends BaseMapper { * @return result page */ IPage queryTaskGroupPaging(IPage page, @Param("userId") int userId, - @Param("name") String name, @Param("status") int status); + @Param("name") String name, @Param("status") Integer status); /** * query by task group name @@ -75,4 +76,5 @@ public interface TaskGroupMapper extends BaseMapper { int selectCountByIdStatus(@Param("id") int id,@Param("status") int status); + IPage queryTaskGroupPagingByProjectCode(Page page, @Param("projectCode") Long projectCode); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java index 3b2e7d03b1..24938e4f8d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.dao.mapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue; import org.apache.ibatis.annotations.Param; @@ -79,11 +81,16 @@ public interface TaskGroupQueueMapper extends BaseMapper { void updateInQueue(@Param("inQueue") int inQueue, @Param("id") int id); - void updateForceStart(@Param("taskId") int taskId, @Param("forceStart") int forceStart); + void updateForceStart(@Param("queueId") int queueId, @Param("forceStart") int forceStart); int updateInQueueLimit1(@Param("oldValue") int oldValue, @Param("newValue") int newValue , @Param("groupId") int id, @Param("status") int status); int updateInQueueCAS(@Param("oldValue") int oldValue, @Param("newValue") int newValue, @Param("id") int id); + void modifyPriority(@Param("queueId") int queueId, @Param("priority") int priority); + + IPage queryTaskGroupQueueByTaskGroupIdPaging(Page page, @Param("taskName")String taskName + ,@Param("processName") String processName,@Param("status") Integer status,@Param("groupId") int groupId + ,@Param("projects") List projects); } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml index 075ce6c5f3..3dd654a531 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml @@ -25,13 +25,14 @@ + - id,name,description,group_size,use_size,create_time,update_time + id,name,description,project_code,group_size,use_size,create_time,update_time + + update t_ds_task_group diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml index a26471f06f..b9ff9b11ff 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml @@ -78,9 +78,15 @@ + update t_ds_task_group_queue + set priority = #{priority} + where id = #{queueId} + + + update t_ds_task_group_queue set force_start = #{forceStart} - where task_id = #{taskId} + where id = #{queueId} @@ -117,4 +123,32 @@ where task_id = #{taskId} + + diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index e9e6a5113c..bf8819478c 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -477,7 +477,8 @@ CREATE TABLE t_ds_task_definition timeout_notify_strategy tinyint(4) DEFAULT NULL, timeout int(11) DEFAULT '0', delay_time int(11) DEFAULT '0', - task_group_id int(11) DEFAULT NULL, + task_group_id int(11) DEFAULT NULL, + task_group_priority tinyint(4) DEFAULT '0', resource_ids text, create_time datetime NOT NULL, update_time datetime DEFAULT NULL, @@ -512,6 +513,7 @@ CREATE TABLE t_ds_task_definition_log resource_ids text, operator int(11) DEFAULT NULL, task_group_id int(11) DEFAULT NULL, + task_group_priority tinyint(4) DEFAULT '0', operate_time datetime DEFAULT NULL, create_time datetime NOT NULL, update_time datetime DEFAULT NULL, @@ -1067,6 +1069,7 @@ CREATE TABLE t_ds_task_group name varchar(100) DEFAULT NULL , description varchar(200) DEFAULT NULL , group_size int(11) NOT NULL , + project_code bigint(20) DEFAULT '0', use_size int(11) DEFAULT '0' , user_id int(11) DEFAULT NULL , project_id int(11) DEFAULT NULL , diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index eae6e185e9..0bdf2ac664 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -478,6 +478,7 @@ CREATE TABLE `t_ds_task_definition` ( `delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute', `resource_ids` text COMMENT 'resource id, separated by comma', `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', + `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', PRIMARY KEY (`id`,`code`) @@ -510,6 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` ( `resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id', + `task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority', `operate_time` datetime DEFAULT NULL COMMENT 'operate time', `create_time` datetime NOT NULL COMMENT 'create time', `update_time` datetime NOT NULL COMMENT 'update time', @@ -1051,7 +1053,7 @@ CREATE TABLE `t_ds_task_group` ( `group_size` int (11) NOT NULL COMMENT'group size', `use_size` int (11) DEFAULT '0' COMMENT 'used size', `user_id` int(11) DEFAULT NULL COMMENT 'creator id', - `project_id` int(11) DEFAULT NULL COMMENT 'project id', + `project_code` bigint(20) DEFAULT 0 COMMENT 'project code', `status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 0c489cf6fa..a4d2c2e747 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -386,6 +386,7 @@ CREATE TABLE t_ds_task_definition ( timeout int DEFAULT '0' , delay_time int DEFAULT '0' , task_group_id int DEFAULT NULL, + task_group_priority int(4) DEFAULT '0', resource_ids text , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , @@ -418,6 +419,7 @@ CREATE TABLE t_ds_task_definition_log ( resource_ids text , operator int DEFAULT NULL , task_group_id int DEFAULT NULL, + task_group_priority int(4) DEFAULT '0', operate_time timestamp DEFAULT NULL , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , @@ -1015,6 +1017,7 @@ CREATE TABLE t_ds_task_group ( name varchar(100) DEFAULT NULL , description varchar(200) DEFAULT NULL , group_size int NOT NULL , + project_code bigint DEFAULT '0' , use_size int DEFAULT '0' , user_id int DEFAULT NULL , project_id int DEFAULT NULL , diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java index 2ee8b2b7e2..aaa7b54d7b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java @@ -24,6 +24,7 @@ import java.util.Date; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -44,6 +45,7 @@ public class TaskGroupMapperTest extends BaseDaoTest { public TaskGroup insertOne() { TaskGroup taskGroup = new TaskGroup(); taskGroup.setName("task group"); + taskGroup.setId(1); taskGroup.setUserId(1); taskGroup.setStatus(1); taskGroup.setGroupSize(10); @@ -52,7 +54,7 @@ public class TaskGroupMapperTest extends BaseDaoTest { taskGroup.setUpdateTime(date); taskGroup.setUpdateTime(date); - taskGroupMapper.insert(taskGroup); + int i = taskGroupMapper.insert(taskGroup); return taskGroup; } diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java index af203d2626..0cbd532023 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java @@ -26,6 +26,7 @@ import java.util.List; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mock; import org.springframework.beans.factory.annotation.Autowired; public class TaskGroupQueueMapperTest extends BaseDaoTest { diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java deleted file mode 100644 index a0b2e1a5c4..0000000000 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse; - -import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; -import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; -import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; -import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; -import org.apache.dolphinscheduler.spi.enums.DbType; -import org.apache.dolphinscheduler.spi.utils.Constants; - -import java.sql.DriverManager; -import java.util.HashMap; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class}) -public class ClickHouseDataSourceProcessorTest { - - private ClickHouseDataSourceProcessor clickHouseDatasourceProcessor = new ClickHouseDataSourceProcessor(); - - @Test - public void testCreateConnectionParams() { - Map props = new HashMap<>(); - props.put("serverTimezone", "utc"); - ClickHouseDataSourceParamDTO clickhouseConnectionParam = new ClickHouseDataSourceParamDTO(); - clickhouseConnectionParam.setUserName("user"); - clickhouseConnectionParam.setPassword("password"); - clickhouseConnectionParam.setHost("localhost"); - clickhouseConnectionParam.setPort(8123); - clickhouseConnectionParam.setDatabase("default"); - clickhouseConnectionParam.setOther(props); - PowerMockito.mockStatic(PasswordUtils.class); - PowerMockito.when(PasswordUtils.encodePassword(Mockito.anyString())).thenReturn("test"); - ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) clickHouseDatasourceProcessor - .createConnectionParams(clickhouseConnectionParam); - Assert.assertNotNull(connectionParams); - Assert.assertEquals("jdbc:clickhouse://localhost:8123", connectionParams.getAddress()); - Assert.assertEquals("jdbc:clickhouse://localhost:8123/default", connectionParams.getJdbcUrl()); - } - - @Test - public void testCreateConnectionParams2() { - String connectionParamJson = "{\"address\":\"jdbc:clickhouse://localhost:8123\",\"database\":\"default\"," - + "\"jdbcUrl\":\"jdbc:clickhouse://localhost:8123/default\",\"user\":\"default\",\"password\":\"123456\"}"; - ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) clickHouseDatasourceProcessor - .createConnectionParams(connectionParamJson); - Assert.assertNotNull(clickhouseConnectionParam); - Assert.assertEquals("default", clickhouseConnectionParam.getUser()); - Assert.assertEquals("123456", clickhouseConnectionParam.getPassword()); - } - - @Test - public void testGetDatasourceDriver() { - Assert.assertNotNull(clickHouseDatasourceProcessor.getDatasourceDriver()); - Assert.assertEquals(Constants.COM_CLICKHOUSE_JDBC_DRIVER, clickHouseDatasourceProcessor.getDatasourceDriver()); - } - - @Test - public void testGetJdbcUrl() { - ClickHouseConnectionParam connectionParam = new ClickHouseConnectionParam(); - connectionParam.setUser("default"); - connectionParam.setJdbcUrl("jdbc:clickhouse://localhost:8123/default"); - connectionParam.setOther("other=other1"); - String jdbcUrl = clickHouseDatasourceProcessor.getJdbcUrl(connectionParam); - Assert.assertEquals("jdbc:clickhouse://localhost:8123/default?other=other1", jdbcUrl); - } - - @Test - public void testGetDbType() { - Assert.assertEquals(DbType.CLICKHOUSE, clickHouseDatasourceProcessor.getDbType()); - } - - @Test - public void testGetValidationQuery() { - Assert.assertEquals(Constants.CLICKHOUSE_VALIDATION_QUERY, clickHouseDatasourceProcessor.getValidationQuery()); - } -} \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index c25d9ad5e0..fbc4fa8223 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -804,10 +804,6 @@ public class WorkflowExecuteThread { && taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) { notifyProcessHostUpdate(taskInstance); } - TaskDefinition taskDefinition = processService.findTaskDefinition( - taskInstance.getTaskCode(), - taskInstance.getTaskDefinitionVersion()); - taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId()); // package task instance before submit processService.packageTaskInstance(taskInstance, processInstance); @@ -920,6 +916,10 @@ public class WorkflowExecuteThread { //set task param taskInstance.setTaskParams(taskNode.getTaskParams()); + //set task group and priority + taskInstance.setTaskGroupId(taskNode.getTaskGroupId()); + taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority()); + // task instance priority if (taskNode.getTaskInstancePriority() == null) { taskInstance.setTaskInstancePriority(Priority.MEDIUM); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index c6258ff4e8..dc8ec8c97f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2510,6 +2510,8 @@ public class ProcessService { taskDefinitionLog.getTimeout()))); taskNode.setDelayTime(taskDefinitionLog.getDelayTime()); taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList()))); + taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId()); + taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority()); taskNodeList.add(taskNode); } }