Browse Source

[Feature-#6422] [api-server] task group queue (#7491)

* add task group

* modify task group

* pull dev

* add license header

* modify code style

* fix code style

* fix sql error

* fix error

* fix test

* fix test

* fix test

* fix test

* fix code style

* fix ut

* code style

* fix unit test

* test ut

* ut

* add unittest

* test ut

* modify back ut

* majorization code

* fix conflict

* fix ut

* add task group api

* reset file

* fix ut

* fix lost column

* fix ut

* fix ut

* fix ut

* fix ut

* delete duplicate code

* fix code style 、name

* fix ut

* fix mapper

Co-authored-by: wangxj <wangxj31>
3.0.0/version-upgrade
wangxj3 3 years ago committed by GitHub
parent
commit
9e763ad0d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 97
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java
  2. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java
  3. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java
  4. 43
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java
  5. 84
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java
  6. 2
      dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
  7. 2
      dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
  8. 23
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java
  9. 11
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java
  10. 25
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java
  11. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  12. 15
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java
  13. 49
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java
  14. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  15. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java
  16. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java
  17. 13
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml
  18. 36
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml
  19. 3
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  20. 4
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  21. 3
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  22. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java
  23. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java
  24. 101
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java
  25. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
  26. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

97
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskGroupController.java

@ -17,13 +17,10 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
@ -43,13 +40,17 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import static org.apache.dolphinscheduler.api.enums.Status.CLOSE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_GROUP_QUEUE_LIST_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.START_TASK_GROUP_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_TASK_GROUP_ERROR;
/**
* task group controller
@ -72,9 +73,10 @@ public class TaskGroupController extends BaseController {
* @param name project id
* @return result and msg code
*/
@ApiOperation(value = "createTaskGroup", notes = "CREATE_TAKS_GROUP_NOTE")
@ApiOperation(value = "create", notes = "CREATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", type = "Long"),
@ApiImplicitParam(name = "description", value = "DESCRIPTION", dataType = "String"),
@ApiImplicitParam(name = "groupSize", value = "GROUPSIZE", dataType = "Int"),
@ -85,9 +87,10 @@ public class TaskGroupController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("name") String name,
@RequestParam(value = "projectCode", required = false, defaultValue = "0") Long projectcode,
@RequestParam("description") String description,
@RequestParam("groupSize") Integer groupSize) {
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, name, description, groupSize);
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, projectcode, name, description, groupSize);
return returnDataList(result);
}
@ -101,7 +104,7 @@ public class TaskGroupController extends BaseController {
* @param name project id
* @return result and msg code
*/
@ApiOperation(value = "updateTaskGroup", notes = "UPDATE_TAKS_GROUP_NOTE")
@ApiOperation(value = "update", notes = "UPDATE_TAKS_GROUP_NOTE")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "id", dataType = "Int"),
@ApiImplicitParam(name = "name", value = "NAME", dataType = "String"),
@ -130,19 +133,22 @@ public class TaskGroupController extends BaseController {
* @param pageSize page size
* @return queue list
*/
@ApiOperation(value = "queryAllTaskGroup", notes = "QUERY_ALL_TASK_GROUP_NOTES")
@ApiOperation(value = "list-paging", notes = "QUERY_ALL_TASK_GROUP_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "name", value = "NAME", required = false, dataType = "String"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20")
})
@GetMapping(value = "/query-list-all")
@GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryAllTaskGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "name", required = false) String name,
@RequestParam(value = "status", required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, pageNo, pageSize);
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, name, status, pageNo, pageSize);
return returnDataList(result);
}
@ -178,7 +184,7 @@ public class TaskGroupController extends BaseController {
*
* @param loginUser login user
* @param pageNo page number
* @param name project id
* @param projectCode project id
* @param pageSize page size
* @return queue list
*/
@ -186,17 +192,17 @@ public class TaskGroupController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"),
@ApiImplicitParam(name = "name", value = "PROJECT_ID", required = true, dataType = "String")
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, dataType = "String")
})
@GetMapping(value = "/query-list-by-name")
@GetMapping(value = "/query-list-by-projectCode")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_GROUP_LIST_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskGroupByName(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result queryTaskGroupByCode(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer pageNo,
@RequestParam(value = "name", required = false) String name,
@RequestParam(value = "projectCode", required = false) Long projectCode,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupService.queryTaskGroupByName(loginUser, pageNo, pageSize, name);
Map<String, Object> result = taskGroupService.queryTaskGroupByProjectCode(loginUser, pageNo, pageSize, projectCode);
return returnDataList(result);
}
@ -247,20 +253,43 @@ public class TaskGroupController extends BaseController {
* force start task without task group
*
* @param loginUser login user
* @param taskId task id
* @param queueId task group queue id
* @return result
*/
@ApiOperation(value = "forceStart", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int")
})
@PostMapping(value = "/forceStart")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(START_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result forceStart(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "queueId") Integer queueId) {
Map<String, Object> result = taskGroupService.forceStartTask(loginUser, queueId);
return returnDataList(result);
}
/**
* force start task without task group
*
* @param loginUser login user
* @param queueId task group queue id
* @return result
*/
@ApiOperation(value = "wakeCompulsively", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
@ApiOperation(value = "modifyPriority", notes = "WAKE_TASK_COMPULSIVELY_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskId", value = "TASKID", required = true, dataType = "Int")
@ApiImplicitParam(name = "queueId", value = "TASK_GROUP_QUEUEID", required = true, dataType = "Int"),
@ApiImplicitParam(name = "priority", value = "TASK_GROUP_QUEUE_PRIORITY", required = true, dataType = "Int")
})
@PostMapping(value = "/wake-task-compulsively")
@PostMapping(value = "/modifyPriority")
@ResponseStatus(HttpStatus.CREATED)
@ApiException(START_TASK_GROUP_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result wakeCompulsively(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "taskId") Integer taskId) {
Map<String, Object> result = taskGroupService.wakeTaskcompulsively(loginUser, taskId);
public Result modifyPriority(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "queueId") Integer queueId,
@RequestParam(value = "priority") Integer priority) {
Map<String, Object> result = taskGroupService.modifyPriority(loginUser, queueId,priority);
return returnDataList(result);
}
@ -287,9 +316,13 @@ public class TaskGroupController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTasksByGroupId(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("groupId") Integer groupId,
@RequestParam(value = "taskInstanceName",required = false) String taskName,
@RequestParam(value = "processInstanceName",required = false) String processName,
@RequestParam(value = "status",required = false) Integer status,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, groupId, pageNo, pageSize);
Map<String, Object> result = taskGroupQueueService.queryTasksByGroupId(loginUser, taskName,processName,status,
groupId, pageNo, pageSize);
return returnDataList(result);
}

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupQueueService.java

@ -35,8 +35,8 @@ public interface TaskGroupQueueService {
* @return tasks list
*/
Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo,
int pageSize);
Map<String, Object> queryTasksByGroupId(User loginUser, String taskName
, String processName, Integer status, int groupId, int pageNo,int pageSize);
/**
* query tasks in task group queue by project id
@ -65,5 +65,7 @@ public interface TaskGroupQueueService {
*/
boolean deleteByTaskId(int taskId);
void forceStartTask(int taskId,int forceStart);
void forceStartTask(int queueId,int forceStart);
void modifyPriority(Integer queueId, Integer priority);
}

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskGroupService.java

@ -35,7 +35,7 @@ public interface TaskGroupService {
* @param groupSize task group total size
* @return the result code and msg
*/
Map<String, Object> createTaskGroup(User loginUser, String name,
Map<String, Object> createTaskGroup(User loginUser, long projectcode,String name,
String description, int groupSize);
/**
@ -66,7 +66,7 @@ public interface TaskGroupService {
* @param pageSize page size
* @return the result code and msg
*/
Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize);
Map<String, Object> queryAllTaskGroup(User loginUser, String name,Integer status, int pageNo, int pageSize);
/**
* query all task group by status
@ -88,7 +88,7 @@ public interface TaskGroupService {
* @param name name
* @return the result code and msg
*/
Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name);
Map<String, Object> queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode);
/**
* query all task group by id
@ -110,7 +110,7 @@ public interface TaskGroupService {
* @param status status
* @return the result code and msg
*/
Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status);
Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status);
/**
* close a task group
@ -136,5 +136,7 @@ public interface TaskGroupService {
* @param taskId task id
* @return result
*/
Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId);
Map<String, Object> forceStartTask(User loginUser, int taskId);
Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority);
}

43
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupQueueServiceImpl.java

@ -17,25 +17,26 @@
package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupQueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* task group queue service
@ -49,6 +50,9 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
@Autowired
private TaskInstanceMapper taskInstanceMapper;
@Autowired
private ProjectService projectService;
private static final Logger logger = LoggerFactory.getLogger(TaskGroupQueueServiceImpl.class);
/**
@ -61,8 +65,22 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
* @return tasks list
*/
@Override
public Map<String, Object> queryTasksByGroupId(User loginUser, int groupId, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, groupId);
public Map<String, Object> queryTasksByGroupId(User loginUser, String taskName
, String processName, Integer status, int groupId, int pageNo, int pageSize) {
Map<String, Object> result = new HashMap<>();
Page<TaskGroupQueue> page = new Page<>(pageNo, pageSize);
Map<String, Object> objectMap = this.projectService.queryAuthorizedProject(loginUser, loginUser.getId());
List<Project> projects = (List<Project>)objectMap.get(Constants.DATA_LIST);
IPage<TaskGroupQueue> taskGroupQueue = taskGroupQueueMapper.queryTaskGroupQueueByTaskGroupIdPaging(page, taskName
,processName,status,groupId,projects);
PageInfo<TaskGroupQueue> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) taskGroupQueue.getTotal());
pageInfo.setTotalList(taskGroupQueue.getRecords());
result.put(Constants.DATA_LIST, pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
/**
@ -124,7 +142,12 @@ public class TaskGroupQueueServiceImpl extends BaseServiceImpl implements TaskGr
}
@Override
public void forceStartTask(int taskId,int forceStart) {
taskGroupQueueMapper.updateForceStart(taskId,forceStart);
public void forceStartTask(int queueId,int forceStart) {
taskGroupQueueMapper.updateForceStart(queueId,forceStart);
}
@Override
public void modifyPriority(Integer queueId, Integer priority) {
taskGroupQueueMapper.modifyPriority(queueId,priority);
}
}

84
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskGroupServiceImpl.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.api.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskGroupQueueService;
import org.apache.dolphinscheduler.api.service.TaskGroupService;
@ -28,6 +30,10 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
@ -35,14 +41,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task Group Service
*/
@ -70,7 +68,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
public Map<String, Object> createTaskGroup(User loginUser, String name, String description, int groupSize) {
public Map<String, Object> createTaskGroup(User loginUser, long projectcode, String name, String description, int groupSize) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
@ -82,18 +80,20 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
if (groupSize <= 0) {
putMsg(result, Status.TASK_GROUP_SIZE_ERROR);
return result;
}
TaskGroup taskGroup1 = taskGroupMapper.queryByName(loginUser.getId(), name);
if (taskGroup1 != null) {
putMsg(result, Status.TASK_GROUP_NAME_EXSIT);
return result;
}
TaskGroup taskGroup = new TaskGroup(name, description,
TaskGroup taskGroup = new TaskGroup(name, projectcode, description,
groupSize, loginUser.getId(), Flag.YES.getCode());
int insert = taskGroupMapper.insert(taskGroup);
logger.info("insert result:{}", insert);
if (taskGroupMapper.insert(taskGroup) > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.CREATE_TASK_GROUP_ERROR);
return result;
}
return result;
}
@ -149,8 +149,8 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
public Map<String, Object> queryAllTaskGroup(User loginUser, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), null, 0);
public Map<String, Object> queryAllTaskGroup(User loginUser, String name, Integer status, int pageNo, int pageSize) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, status);
}
/**
@ -177,8 +177,28 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* @return the result code and msg
*/
@Override
public Map<String, Object> queryTaskGroupByName(User loginUser, int pageNo, int pageSize, String name) {
return this.doQuery(loginUser, pageNo, pageSize, loginUser.getId(), name, 0);
public Map<String, Object> queryTaskGroupByProjectCode(User loginUser, int pageNo, int pageSize, Long projectCode) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
Page<TaskGroup> page = new Page<>(pageNo, pageSize);
IPage<TaskGroup> taskGroupPaging = taskGroupMapper.queryTaskGroupPagingByProjectCode(page, projectCode);
return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging);
}
private Map<String, Object> getStringObjectMap(int pageNo, int pageSize, Map<String, Object> result, IPage<TaskGroup> taskGroupPaging) {
PageInfo<TaskGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal();
List<TaskGroup> list = taskGroupPaging == null ? new ArrayList<TaskGroup>() : taskGroupPaging.getRecords();
pageInfo.setTotal(total);
pageInfo.setTotalList(list);
result.put(Constants.DATA_LIST, pageInfo);
logger.info("select result:{}", taskGroupPaging);
putMsg(result, Status.SUCCESS);
return result;
}
/**
@ -212,7 +232,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
*/
@Override
public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, int status) {
public Map<String, Object> doQuery(User loginUser, int pageNo, int pageSize, int userId, String name, Integer status) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
@ -220,16 +240,7 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
Page<TaskGroup> page = new Page<>(pageNo, pageSize);
IPage<TaskGroup> taskGroupPaging = taskGroupMapper.queryTaskGroupPaging(page, userId, name, status);
PageInfo<TaskGroup> pageInfo = new PageInfo<>(pageNo, pageSize);
int total = taskGroupPaging == null ? 0 : (int) taskGroupPaging.getTotal();
List<TaskGroup> list = taskGroupPaging == null ? new ArrayList<TaskGroup>() : taskGroupPaging.getRecords();
pageInfo.setTotal(total);
pageInfo.setTotalList(list);
result.put(Constants.DATA_LIST, pageInfo);
logger.info("select result:{}", taskGroupPaging);
putMsg(result, Status.SUCCESS);
return result;
return getStringObjectMap(pageNo, pageSize, result, taskGroupPaging);
}
/**
@ -282,16 +293,27 @@ public class TaskGroupServiceImpl extends BaseServiceImpl implements TaskGroupSe
* wake a task manually
*
* @param loginUser
* @param taskId task id
* @param queueId task group queue id
* @return result
*/
@Override
public Map<String, Object> wakeTaskcompulsively(User loginUser, int taskId) {
public Map<String, Object> forceStartTask(User loginUser, int queueId) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
taskGroupQueueService.forceStartTask(queueId, Flag.YES.getCode());
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public Map<String, Object> modifyPriority(User loginUser, Integer queueId, Integer priority) {
Map<String, Object> result = new HashMap<>();
if (isNotAdmin(loginUser, result)) {
return result;
}
taskGroupQueueService.forceStartTask(taskId,Flag.YES.getCode());
taskGroupQueueService.modifyPriority(queueId, priority);
putMsg(result, Status.SUCCESS);
return result;
}

2
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -336,3 +336,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=delete process definition version
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=query process definition versions
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=switch process definition version
VERSION=version
TASK_GROUP_QUEUEID=task group queue id
TASK_GROUP_QUEUE_PRIORITY=task group queue priority

2
dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -333,3 +333,5 @@ DELETE_PROCESS_DEFINITION_VERSION_NOTES=删除流程历史版本
QUERY_PROCESS_DEFINITION_VERSIONS_NOTES=查询流程历史版本信息
SWITCH_PROCESS_DEFINITION_VERSION_NOTES=切换流程版本
VERSION=版本号
TASK_GROUP_QUEUEID=任务组队列id
TASK_GROUP_QUEUE_PRIORITY=任务队列优先级

23
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TaskGroupControllerTest.java

@ -22,7 +22,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -49,7 +48,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("pageNo", "2");
paramsMap.add("pageSize", "2");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-all")
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-projectCode")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@ -67,7 +66,7 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
paramsMap.add("pageNo", "1");
paramsMap.add("name", "TGQ");
paramsMap.add("pageSize", "10");
MvcResult mvcResult = mockMvc.perform(get("/task-group/query-list-by-name")
MvcResult mvcResult = mockMvc.perform(get("/task-group/list-paging")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
@ -186,22 +185,4 @@ public class TaskGroupControllerTest extends AbstractControllerTest {
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
@Test
public void testWakeCompulsively() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("id", "1");
paramsMap.add("taskId", "1");
MvcResult mvcResult = mockMvc.perform(post("/task-group/wake-task-compulsively")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isCreated())
.andExpect(content().contentType(MediaType.APPLICATION_JSON))
.andReturn();
Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
Assert.assertTrue(result != null && (result.isSuccess() || result.isStatus(Status.TASK_GROUP_CACHE_START_FAILED)));
logger.info("update queue return result:{}", mvcResult.getResponse().getContentAsString());
}
}

11
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskGroupServiceTest.java

@ -92,7 +92,7 @@ public class TaskGroupServiceTest {
}
private TaskGroup getTaskGroup() {
TaskGroup taskGroup = new TaskGroup(taskGroupName, taskGroupDesc,
TaskGroup taskGroup = new TaskGroup(taskGroupName,0, taskGroupDesc,
100, 1,1);
return taskGroup;
}
@ -109,9 +109,8 @@ public class TaskGroupServiceTest {
TaskGroup taskGroup = getTaskGroup();
Mockito.when(taskGroupMapper.insert(taskGroup)).thenReturn(1);
Mockito.when(taskGroupMapper.queryByName(loginUser.getId(), taskGroupName)).thenReturn(null);
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser, taskGroupName, taskGroupDesc, 100);
logger.info(result.toString());
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
Map<String, Object> result = taskGroupService.createTaskGroup(loginUser,0, taskGroupName, taskGroupDesc, 100);
Assert.assertNotNull(result);
}
@ -134,7 +133,7 @@ public class TaskGroupServiceTest {
Mockito.eq(null), Mockito.eq(0))).thenReturn(page);
// query all
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, 1, 10);
Map<String, Object> result = taskGroupService.queryAllTaskGroup(loginUser, null, null,1,10);
PageInfo<TaskGroup> pageInfo = (PageInfo<TaskGroup>) result.get(Constants.DATA_LIST);
Assert.assertNotNull(pageInfo.getTotalList());
}
@ -171,7 +170,7 @@ public class TaskGroupServiceTest {
TreeMap<Integer, Integer> tm = new TreeMap<>();
tm.put(1, 1);
Map<String, Object> map1 = taskGroupService.wakeTaskcompulsively(getLoginUser(), 1);
Map<String, Object> map1 = taskGroupService.forceStartTask(getLoginUser(), 1);
Assert.assertEquals(Status.SUCCESS, map1.get(Constants.STATUS));
}

25
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/TaskNode.java

@ -88,6 +88,15 @@ public class TaskNode {
*/
private int retryInterval;
/**
* task group id
*/
private int taskGroupId;
/**
* task group id
*/
private int taskGroupPriority;
/**
* params information
*/
@ -465,4 +474,20 @@ public class TaskNode {
public void setWaitStartTimeout(String waitStartTimeout) {
this.waitStartTimeout = waitStartTimeout;
}
public int getTaskGroupId() {
return taskGroupId;
}
public void setTaskGroupId(int taskGroupId) {
this.taskGroupId = taskGroupId;
}
public int getTaskGroupPriority() {
return taskGroupPriority;
}
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -199,6 +199,10 @@ public class TaskDefinition {
* task group id
*/
private int taskGroupId;
/**
* task group id
*/
private int taskGroupPriority;
public TaskDefinition() {
}
@ -502,4 +506,12 @@ public class TaskDefinition {
+ ", updateTime=" + updateTime
+ '}';
}
public int getTaskGroupPriority() {
return taskGroupPriority;
}
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
}

15
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroup.java

@ -67,9 +67,14 @@ public class TaskGroup implements Serializable {
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
/**
* project Id
*/
private long projectCode;
public TaskGroup(String name, String description, int groupSize, int userId,int status) {
public TaskGroup(String name,long projectCode, String description, int groupSize, int userId,int status) {
this.name = name;
this.projectCode = projectCode;
this.description = description;
this.groupSize = groupSize;
this.userId = userId;
@ -173,4 +178,12 @@ public class TaskGroup implements Serializable {
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
}

49
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskGroupQueue.java

@ -17,15 +17,15 @@
package org.apache.dolphinscheduler.dao.entity;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import java.io.Serializable;
import java.util.Date;
/**
* Task Group Queue
@ -45,6 +45,21 @@ public class TaskGroupQueue implements Serializable {
* TaskInstance name
*/
private String taskName;
/**
* project name
*/
@TableField(exist = false)
private String projectName;
/**
* project code
*/
@TableField(exist = false)
private String projectCode;
/**
* process instance name
*/
@TableField(exist = false)
private String processInstanceName;
/**
* taskGroup id
*/
@ -197,4 +212,28 @@ public class TaskGroupQueue implements Serializable {
public void setInQueue(int inQueue) {
this.inQueue = inQueue;
}
public String getProjectName() {
return projectName;
}
public void setProjectName(String projectName) {
this.projectName = projectName;
}
public String getProcessInstanceName() {
return processInstanceName;
}
public void setProcessInstanceName(String processInstanceName) {
this.processInstanceName = processInstanceName;
}
public String getProjectCode() {
return projectCode;
}
public void setProjectCode(String projectCode) {
this.projectCode = projectCode;
}
}

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -84,6 +84,12 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private String processInstanceName;
/**
* process instance name
*/
@TableField(exist = false)
private int taskGroupPriority;
/**
* state
*/
@ -736,4 +742,12 @@ public class TaskInstance implements Serializable {
public boolean isFirstRun() {
return endTime == null;
}
public int getTaskGroupPriority() {
return taskGroupPriority;
}
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.ibatis.annotations.Param;
@ -60,7 +61,7 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
* @return result page
*/
IPage<TaskGroup> queryTaskGroupPaging(IPage<TaskGroup> page, @Param("userId") int userId,
@Param("name") String name, @Param("status") int status);
@Param("name") String name, @Param("status") Integer status);
/**
* query by task group name
@ -75,4 +76,5 @@ public interface TaskGroupMapper extends BaseMapper<TaskGroup> {
int selectCountByIdStatus(@Param("id") int id,@Param("status") int status);
IPage<TaskGroup> queryTaskGroupPagingByProjectCode(Page<TaskGroup> page, @Param("projectCode") Long projectCode);
}

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.dao.mapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.ibatis.annotations.Param;
@ -79,11 +81,16 @@ public interface TaskGroupQueueMapper extends BaseMapper<TaskGroupQueue> {
void updateInQueue(@Param("inQueue") int inQueue, @Param("id") int id);
void updateForceStart(@Param("taskId") int taskId, @Param("forceStart") int forceStart);
void updateForceStart(@Param("queueId") int queueId, @Param("forceStart") int forceStart);
int updateInQueueLimit1(@Param("oldValue") int oldValue, @Param("newValue") int newValue
, @Param("groupId") int id, @Param("status") int status);
int updateInQueueCAS(@Param("oldValue") int oldValue, @Param("newValue") int newValue, @Param("id") int id);
void modifyPriority(@Param("queueId") int queueId, @Param("priority") int priority);
IPage<TaskGroupQueue> queryTaskGroupQueueByTaskGroupIdPaging(Page<TaskGroupQueue> page, @Param("taskName")String taskName
,@Param("processName") String processName,@Param("status") Integer status,@Param("groupId") int groupId
,@Param("projects") List<Project> projects);
}

13
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapper.xml

@ -25,13 +25,14 @@
<result property="groupSize" column="group_size" jdbcType="INTEGER"/>
<result property="useSize" column="use_size" jdbcType="INTEGER"/>
<result property="userId" column="user_id" jdbcType="INTEGER"/>
<result property="projectCode" column="project_code" jdbcType="INTEGER"/>
<result property="status" column="status" jdbcType="INTEGER"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
</resultMap>
<sql id = "baseSql">
id,name,description,group_size,use_size,create_time,update_time
id,name,description,project_code,group_size,use_size,create_time,update_time
</sql>
<select id="queryTaskGroupPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
@ -43,7 +44,7 @@
<if test="userId != 0">
and user_id = #{userId}
</if>
<if test="status != 0">
<if test="status != null">
and status = #{status}
</if>
<if test="name != null and name != '' ">
@ -52,6 +53,14 @@
</where>
</select>
<select id="queryTaskGroupPagingByProjectCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroup">
select
<include refid="baseSql">
</include>
from t_ds_task_group
where project_code in ( #{projectCode} , 0)
</select>
<!--modify data by id-->
<update id="updateTaskGroupResource">
update t_ds_task_group

36
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapper.xml

@ -78,9 +78,15 @@
</update>
<update id="updateForceStart">
update t_ds_task_group_queue
set priority = #{priority}
where id = #{queueId}
</update>
<update id="modifyPriority">
update t_ds_task_group_queue
set force_start = #{forceStart}
where task_id = #{taskId}
where id = #{queueId}
</update>
<update id="updateInQueueLimit1">
@ -117,4 +123,32 @@
where task_id = #{taskId}
</select>
<select id="queryTaskGroupQueueByTaskGroupIdPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskGroupQueue">
select
queue.id, queue.task_name, queue.group_id, queue.process_id, queue.priority, queue.status
, queue.force_start, queue.create_time, queue.update_time,
process.name as processInstanceName,p.name as projectName,p.code as projectCode
from t_ds_task_group_queue queue
left join t_ds_process_instance process on queue.process_id = process.id
left join t_ds_process_definition p_f on process.process_definition_code = p_f.code
and process.process_definition_version = p_f.version
join t_ds_project as p on p_f.project_code = p.code
where queue.group_id = #{groupId}
<if test="taskName != null and taskName != ''">
and task_name =#{taskName}
</if>
<if test="processName != null and processName != ''">
and process.name =#{processName}
</if>
<if test="status != null">
and queue.status =#{status}
</if>
<if test="projects != null and projects.size() > 0">
and p.code in
<foreach collection="projects" index="index" item="i" open="(" separator="," close=")">
#{i.code}
</foreach>
</if>
</select>
</mapper>

3
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -478,6 +478,7 @@ CREATE TABLE t_ds_task_definition
timeout int(11) DEFAULT '0',
delay_time int(11) DEFAULT '0',
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
resource_ids text,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@ -512,6 +513,7 @@ CREATE TABLE t_ds_task_definition_log
resource_ids text,
operator int(11) DEFAULT NULL,
task_group_id int(11) DEFAULT NULL,
task_group_priority tinyint(4) DEFAULT '0',
operate_time datetime DEFAULT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
@ -1067,6 +1069,7 @@ CREATE TABLE t_ds_task_group
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int(11) NOT NULL ,
project_code bigint(20) DEFAULT '0',
use_size int(11) DEFAULT '0' ,
user_id int(11) DEFAULT NULL ,
project_id int(11) DEFAULT NULL ,

4
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -478,6 +478,7 @@ CREATE TABLE `t_ds_task_definition` (
`delay_time` int(11) DEFAULT '0' COMMENT 'delay execution time,unit: minute',
`resource_ids` text COMMENT 'resource id, separated by comma',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
PRIMARY KEY (`id`,`code`)
@ -510,6 +511,7 @@ CREATE TABLE `t_ds_task_definition_log` (
`resource_ids` text DEFAULT NULL COMMENT 'resource id, separated by comma',
`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`task_group_id` int(11) DEFAULT NULL COMMENT 'task group id',
`task_group_priority` tinyint(4) DEFAULT 1 COMMENT 'task group priority',
`operate_time` datetime DEFAULT NULL COMMENT 'operate time',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime NOT NULL COMMENT 'update time',
@ -1051,7 +1053,7 @@ CREATE TABLE `t_ds_task_group` (
`group_size` int (11) NOT NULL COMMENT'group size',
`use_size` int (11) DEFAULT '0' COMMENT 'used size',
`user_id` int(11) DEFAULT NULL COMMENT 'creator id',
`project_id` int(11) DEFAULT NULL COMMENT 'project id',
`project_code` bigint(20) DEFAULT 0 COMMENT 'project code',
`status` tinyint(4) DEFAULT '1' COMMENT '0 not available, 1 available',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

3
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -386,6 +386,7 @@ CREATE TABLE t_ds_task_definition (
timeout int DEFAULT '0' ,
delay_time int DEFAULT '0' ,
task_group_id int DEFAULT NULL,
task_group_priority int(4) DEFAULT '0',
resource_ids text ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
@ -418,6 +419,7 @@ CREATE TABLE t_ds_task_definition_log (
resource_ids text ,
operator int DEFAULT NULL ,
task_group_id int DEFAULT NULL,
task_group_priority int(4) DEFAULT '0',
operate_time timestamp DEFAULT NULL ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
@ -1015,6 +1017,7 @@ CREATE TABLE t_ds_task_group (
name varchar(100) DEFAULT NULL ,
description varchar(200) DEFAULT NULL ,
group_size int NOT NULL ,
project_code bigint DEFAULT '0' ,
use_size int DEFAULT '0' ,
user_id int DEFAULT NULL ,
project_id int DEFAULT NULL ,

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupMapperTest.java

@ -24,6 +24,7 @@ import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -44,6 +45,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
public TaskGroup insertOne() {
TaskGroup taskGroup = new TaskGroup();
taskGroup.setName("task group");
taskGroup.setId(1);
taskGroup.setUserId(1);
taskGroup.setStatus(1);
taskGroup.setGroupSize(10);
@ -52,7 +54,7 @@ public class TaskGroupMapperTest extends BaseDaoTest {
taskGroup.setUpdateTime(date);
taskGroup.setUpdateTime(date);
taskGroupMapper.insert(taskGroup);
int i = taskGroupMapper.insert(taskGroup);
return taskGroup;
}

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskGroupQueueMapperTest.java

@ -26,6 +26,7 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.beans.factory.annotation.Autowired;
public class TaskGroupQueueMapperTest extends BaseDaoTest {

101
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessorTest.java

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class ClickHouseDataSourceProcessorTest {
private ClickHouseDataSourceProcessor clickHouseDatasourceProcessor = new ClickHouseDataSourceProcessor();
@Test
public void testCreateConnectionParams() {
Map<String, String> props = new HashMap<>();
props.put("serverTimezone", "utc");
ClickHouseDataSourceParamDTO clickhouseConnectionParam = new ClickHouseDataSourceParamDTO();
clickhouseConnectionParam.setUserName("user");
clickhouseConnectionParam.setPassword("password");
clickhouseConnectionParam.setHost("localhost");
clickhouseConnectionParam.setPort(8123);
clickhouseConnectionParam.setDatabase("default");
clickhouseConnectionParam.setOther(props);
PowerMockito.mockStatic(PasswordUtils.class);
PowerMockito.when(PasswordUtils.encodePassword(Mockito.anyString())).thenReturn("test");
ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) clickHouseDatasourceProcessor
.createConnectionParams(clickhouseConnectionParam);
Assert.assertNotNull(connectionParams);
Assert.assertEquals("jdbc:clickhouse://localhost:8123", connectionParams.getAddress());
Assert.assertEquals("jdbc:clickhouse://localhost:8123/default", connectionParams.getJdbcUrl());
}
@Test
public void testCreateConnectionParams2() {
String connectionParamJson = "{\"address\":\"jdbc:clickhouse://localhost:8123\",\"database\":\"default\","
+ "\"jdbcUrl\":\"jdbc:clickhouse://localhost:8123/default\",\"user\":\"default\",\"password\":\"123456\"}";
ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) clickHouseDatasourceProcessor
.createConnectionParams(connectionParamJson);
Assert.assertNotNull(clickhouseConnectionParam);
Assert.assertEquals("default", clickhouseConnectionParam.getUser());
Assert.assertEquals("123456", clickhouseConnectionParam.getPassword());
}
@Test
public void testGetDatasourceDriver() {
Assert.assertNotNull(clickHouseDatasourceProcessor.getDatasourceDriver());
Assert.assertEquals(Constants.COM_CLICKHOUSE_JDBC_DRIVER, clickHouseDatasourceProcessor.getDatasourceDriver());
}
@Test
public void testGetJdbcUrl() {
ClickHouseConnectionParam connectionParam = new ClickHouseConnectionParam();
connectionParam.setUser("default");
connectionParam.setJdbcUrl("jdbc:clickhouse://localhost:8123/default");
connectionParam.setOther("other=other1");
String jdbcUrl = clickHouseDatasourceProcessor.getJdbcUrl(connectionParam);
Assert.assertEquals("jdbc:clickhouse://localhost:8123/default?other=other1", jdbcUrl);
}
@Test
public void testGetDbType() {
Assert.assertEquals(DbType.CLICKHOUSE, clickHouseDatasourceProcessor.getDbType());
}
@Test
public void testGetValidationQuery() {
Assert.assertEquals(Constants.CLICKHOUSE_VALIDATION_QUERY, clickHouseDatasourceProcessor.getValidationQuery());
}
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java

@ -804,10 +804,6 @@ public class WorkflowExecuteThread {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
TaskDefinition taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
taskInstance.setTaskGroupId(taskDefinition.getTaskGroupId());
// package task instance before submit
processService.packageTaskInstance(taskInstance, processInstance);
@ -920,6 +916,10 @@ public class WorkflowExecuteThread {
//set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
//set task group and priority
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
taskInstance.setTaskGroupPriority(taskNode.getTaskGroupPriority());
// task instance priority
if (taskNode.getTaskInstancePriority() == null) {
taskInstance.setTaskInstancePriority(Priority.MEDIUM);

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2510,6 +2510,8 @@ public class ProcessService {
taskDefinitionLog.getTimeout())));
taskNode.setDelayTime(taskDefinitionLog.getDelayTime());
taskNode.setPreTasks(JSONUtils.toJsonString(code.getValue().stream().map(taskDefinitionLogMap::get).map(TaskDefinition::getCode).collect(Collectors.toList())));
taskNode.setTaskGroupId(taskDefinitionLog.getTaskGroupId());
taskNode.setTaskGroupPriority(taskDefinitionLog.getTaskGroupPriority());
taskNodeList.add(taskNode);
}
}

Loading…
Cancel
Save