Browse Source

[Feature][JsonSplit-api] taskService methon (#6017)

* fix api run error

* fix ut

* api of ProcessDefinition/TaskDefinition

* taskService methon

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
ab07a31403
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 61
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  2. 32
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  3. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  4. 130
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  5. 91
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  6. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  7. 14
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
  8. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java
  9. 43
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
  10. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java
  11. 28
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
  12. 47
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml
  13. 20
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java
  14. 34
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java

61
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -122,6 +122,7 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user info
* @param projectCode project code
* @param code task definition code
* @param pageNo the task definition version list current page number
* @param pageSize the task definition version list page size
* @param code the task definition code
@ -129,9 +130,9 @@ public class TaskDefinitionController extends BaseController {
*/
@ApiOperation(value = "queryVersions", notes = "QUERY_TASK_DEFINITION_VERSIONS_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@GetMapping(value = "/versions")
@ResponseStatus(HttpStatus.OK)
@ -139,11 +140,14 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionVersions(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "code") long code,
@RequestParam(value = "pageNo") int pageNo,
@RequestParam(value = "pageSize") int pageSize,
@RequestParam(value = "code") long code) {
Map<String, Object> result = taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, pageNo, pageSize, code);
return returnDataList(result);
@RequestParam(value = "pageSize") int pageSize) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
return taskDefinitionService.queryTaskDefinitionVersions(loginUser, projectCode, code, pageNo, pageSize);
}
/**
@ -249,6 +253,7 @@ public class TaskDefinitionController extends BaseController {
*
* @param loginUser login user
* @param projectCode project code
* @param taskType taskType
* @param searchVal search value
* @param userId user id
* @param pageNo page number
@ -257,6 +262,7 @@ public class TaskDefinitionController extends BaseController {
*/
@ApiOperation(value = "queryTaskDefinitionListPaging", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = false, type = "String"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ -268,6 +274,7 @@ public class TaskDefinitionController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionListPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskType", required = false) String taskType,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@ -276,47 +283,9 @@ public class TaskDefinitionController extends BaseController {
if (!result.checkResult()) {
return result;
}
taskType = ParameterUtils.handleEscapes(taskType);
searchVal = ParameterUtils.handleEscapes(searchVal);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, userId, pageNo, pageSize);
}
/**
* query task definition list paging by taskType
*
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param taskType taskType
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
@ApiOperation(value = "queryTaskDefinitionByTaskType", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, type = "String"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@GetMapping(value = "/task-type-list-paging")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_DEFINITION_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryTaskDefinitionByTaskType(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskType", required = true) String taskType,
@RequestParam(value = "searchVal", required = false) String searchVal,
@RequestParam(value = "userId", required = false, defaultValue = "0") Integer userId,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
if (!result.checkResult()) {
return result;
}
searchVal = ParameterUtils.handleEscapes(searchVal);
return taskDefinitionService.queryTaskDefinitionByTaskType(loginUser, projectCode, taskType, searchVal, userId, pageNo, pageSize);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, taskType, searchVal, userId, pageNo, pageSize);
}
/**

32
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -91,16 +91,16 @@ public interface TaskDefinitionService {
*
* @param loginUser login user info to check auth
* @param projectCode project code
* @param taskCode task definition code
* @param pageNo page number
* @param pageSize page size
* @param taskCode task definition code
* @return the pagination task definition versions info of the certain task definition
*/
Map<String, Object> queryTaskDefinitionVersions(User loginUser,
long projectCode,
int pageNo,
int pageSize,
long taskCode);
Result queryTaskDefinitionVersions(User loginUser,
long projectCode,
long taskCode,
int pageNo,
int pageSize);
/**
* delete the certain task definition version by version and code
@ -128,24 +128,6 @@ public interface TaskDefinitionService {
long projectCode,
long taskCode);
/**
* query task definition list paging
*
* @param loginUser login user
* @param projectCode project code
* @param searchVal search value
* @param userId user id
* @param pageNo page number
* @param pageSize page size
* @return task definition page
*/
Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize);
/**
* query task definition list paging
*
@ -158,7 +140,7 @@ public interface TaskDefinitionService {
* @param pageSize page size
* @return task definition page
*/
Result queryTaskDefinitionByTaskType(User loginUser,
Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String taskType,
String searchVal,

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -1347,12 +1347,9 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
putMsg(result, resultStatus);
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code);
PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize);
IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, processDefinition.getCode());
IPage<ProcessDefinitionLog> processDefinitionVersionsPaging = processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code);
List<ProcessDefinitionLog> processDefinitionLogs = processDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(processDefinitionLogs);

130
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.TaskDefinitionService;
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -36,6 +37,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
@ -52,6 +54,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task definition service impl
*/
@ -78,6 +83,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Autowired
private ProcessService processService;
@Autowired
private UserMapper userMapper;
/**
* create task definition
*
@ -159,7 +167,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
totalSuccessNumber++;
}
for (TaskDefinitionLog taskDefinitionToUpdate : updateTaskDefinitionLogs) {
TaskDefinition task = taskDefinitionMapper.queryByDefinitionCode(taskDefinitionToUpdate.getCode());
TaskDefinition task = taskDefinitionMapper.queryByCode(taskDefinitionToUpdate.getCode());
if (task == null) {
newTaskDefinitionLogs.add(taskDefinitionToUpdate);
} else {
@ -203,7 +211,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionName(project.getCode(), taskName);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName);
} else {
@ -268,7 +276,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
@ -332,7 +340,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE);
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByDefinitionCode(taskCode);
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
@ -340,46 +348,123 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version);
taskDefinitionLog.setUserId(loginUser.getId());
taskDefinitionLog.setUpdateTime(new Date());
taskDefinitionMapper.updateById(taskDefinitionLog);
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
int switchVersion = taskDefinitionMapper.updateById(taskDefinitionLog);
if (switchVersion > 0) {
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR);
}
return result;
}
@Override
public Map<String, Object> queryTaskDefinitionVersions(User loginUser, long projectCode, int pageNo, int pageSize, long taskCode) {
return null;
public Result queryTaskDefinitionVersions(User loginUser,
long projectCode,
long taskCode,
int pageNo,
int pageSize) {
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
return result;
}
PageInfo<TaskDefinitionLog> pageInfo = new PageInfo<>(pageNo, pageSize);
Page<TaskDefinitionLog> page = new Page<>(pageNo, pageSize);
IPage<TaskDefinitionLog> taskDefinitionVersionsPaging = taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode);
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(taskDefinitionLogs);
pageInfo.setTotal((int) taskDefinitionVersionsPaging.getTotal());
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
@Override
public Map<String, Object> deleteByCodeAndVersion(User loginUser, long projectCode, long taskCode, int version) {
return null;
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
} else {
int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version);
if (delete > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.DELETE_TASK_DEFINITION_VERSION_ERROR);
}
}
return result;
}
@Override
public Map<String, Object> queryTaskDefinitionDetail(User loginUser, long projectCode, long taskCode) {
return null;
}
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@Override
public Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize) {
return null;
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
} else {
result.put(Constants.DATA_LIST, taskDefinition);
putMsg(result, Status.SUCCESS);
}
return result;
}
@Override
public Result queryTaskDefinitionByTaskType(User loginUser,
public Result queryTaskDefinitionListPaging(User loginUser,
long projectCode,
String taskType,
String searchVal,
Integer userId,
Integer pageNo,
Integer pageSize) {
return null;
Result result = new Result();
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectCode);
Status resultStatus = (Status) checkResult.get(Constants.STATUS);
if (resultStatus != Status.SUCCESS) {
putMsg(result, resultStatus);
return result;
}
if (StringUtils.isNotBlank(taskType)) {
taskType = taskType.toUpperCase();
}
Page<TaskDefinition> page = new Page<>(pageNo, pageSize);
IPage<TaskDefinition> taskDefinitionIPage = taskDefinitionMapper.queryDefineListPaging(
page, projectCode, taskType, searchVal, userId, isAdmin(loginUser));
if (StringUtils.isNotBlank(taskType)) {
List<TaskDefinition> records = taskDefinitionIPage.getRecords();
for (TaskDefinition pd : records) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(pd.getCode(), pd.getVersion());
User user = userMapper.selectById(taskDefinitionLog.getOperator());
pd.setModifyBy(user.getUserName());
}
taskDefinitionIPage.setRecords(records);
}
PageInfo<TaskDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
pageInfo.setTotal((int) taskDefinitionIPage.getTotal());
pageInfo.setTotalList(taskDefinitionIPage.getRecords());
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
return result;
}
@Override
@ -405,4 +490,3 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
}

91
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -22,14 +22,12 @@ import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
@ -37,6 +35,7 @@ import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -51,48 +50,6 @@ import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TaskDefinitionServiceImplTest {
String taskDefinitionJson = "{\n"
+ " \"type\": \"SQL\",\n"
+ " \"id\": \"tasks-27297\",\n"
+ " \"name\": \"SQL\",\n"
+ " \"params\": {\n"
+ " \"type\": \"MYSQL\",\n"
+ " \"datasource\": 1,\n"
+ " \"sql\": \"select * from test\",\n"
+ " \"udfs\": \"\",\n"
+ " \"sqlType\": \"1\",\n"
+ " \"title\": \"\",\n"
+ " \"receivers\": \"\",\n"
+ " \"receiversCc\": \"\",\n"
+ " \"showType\": \"TABLE\",\n"
+ " \"localParams\": [\n"
+ " \n"
+ " ],\n"
+ " \"connParams\": \"\",\n"
+ " \"preStatements\": [\n"
+ " \n"
+ " ],\n"
+ " \"postStatements\": [\n"
+ " \n"
+ " ]\n"
+ " },\n"
+ " \"description\": \"\",\n"
+ " \"runFlag\": \"NORMAL\",\n"
+ " \"dependence\": {\n"
+ " \n"
+ " },\n"
+ " \"maxRetryTimes\": \"0\",\n"
+ " \"retryInterval\": \"1\",\n"
+ " \"timeout\": {\n"
+ " \"strategy\": \"\",\n"
+ " \"enable\": false\n"
+ " },\n"
+ " \"taskInstancePriority\": \"MEDIUM\",\n"
+ " \"workerGroupId\": -1,\n"
+ " \"preTasks\": [\n"
+ " \"dependent\"\n"
+ " ]\n"
+ "}\n";
@InjectMocks
private TaskDefinitionServiceImpl taskDefinitionService;
@ -103,12 +60,6 @@ public class TaskDefinitionServiceImplTest {
@Mock
private TaskDefinitionLogMapper taskDefinitionLogMapper;
@Mock
private ProcessDefinitionMapper processDefineMapper;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProjectMapper projectMapper;
@ -118,6 +69,10 @@ public class TaskDefinitionServiceImplTest {
@Mock
private ProcessService processService;
@Mock
private ProcessTaskRelationMapper processTaskRelationMapper;
;
@Test
public void createTaskDefinition() {
long projectCode = 1L;
@ -144,13 +99,13 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
.createTaskDefinition(loginUser, projectCode, createTaskDefinitionJson);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
public void updateTaskDefinition () {
public void updateTaskDefinition() {
String taskDefinitionJson = "{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
@ -173,7 +128,7 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
Mockito.when(processService.isTaskOnline(taskCode)).thenReturn(Boolean.FALSE);
Mockito.when(taskDefinitionMapper.queryByDefinitionCode(taskCode)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.insert(Mockito.any(TaskDefinitionLog.class))).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode)).thenReturn(1);
@ -197,13 +152,11 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
Mockito.when(taskDefinitionMapper.queryByDefinitionName(project.getCode(), taskName))
.thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.queryByName(project.getCode(), taskName))
.thenReturn(new TaskDefinition());
Map<String, Object> relation = taskDefinitionService
.queryTaskDefinitionByName(loginUser, projectCode, taskName);
.queryTaskDefinitionByName(loginUser, projectCode, taskName);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@ -222,17 +175,15 @@ public class TaskDefinitionServiceImplTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
Mockito.when(processTaskRelationMapper.queryByTaskCode(Mockito.anyLong()))
.thenReturn(new ArrayList<>());
Mockito.when(taskDefinitionMapper.deleteByCode(Mockito.anyLong()))
.thenReturn(1);
.thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.deleteTaskDefinitionByCode(loginUser, projectCode, 11L);
.deleteTaskDefinitionByCode(loginUser, projectCode, Mockito.anyLong());
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}
@Test
@ -252,16 +203,14 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
TaskNode taskNode = JSONUtils.parseObject(taskDefinitionJson, TaskNode.class);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version))
.thenReturn(new TaskDefinitionLog());
Mockito.when(taskDefinitionMapper.queryByDefinitionCode(taskCode))
.thenReturn(new TaskDefinition());
.thenReturn(new TaskDefinitionLog());
Mockito.when(taskDefinitionMapper.queryByCode(taskCode))
.thenReturn(new TaskDefinition());
Mockito.when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1);
Map<String, Object> relation = taskDefinitionService
.switchVersion(loginUser, projectCode, taskCode, version);
.switchVersion(loginUser, projectCode, taskCode, version);
Assert.assertEquals(Status.SUCCESS, relation.get(Constants.STATUS));
}

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -167,7 +167,7 @@ public class ProcessDefinition {
@TableField(exist = false)
private int warningGroupId;
public ProcessDefinition(){}
public ProcessDefinition() {}
public ProcessDefinition(long projectCode,
String name,

14
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java

@ -177,6 +177,12 @@ public class TaskDefinition {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date updateTime;
/**
* modify user name
*/
@TableField(exist = false)
private String modifyBy;
public TaskDefinition() {
}
@ -401,6 +407,14 @@ public class TaskDefinition {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
}
public String getModifyBy() {
return modifyBy;
}
public void setModifyBy(String modifyBy) {
this.modifyBy = modifyBy;
}
@Override
public boolean equals(Object o) {
if (o == null) {

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinitionLog.java

@ -70,6 +70,7 @@ public class TaskDefinitionLog extends TaskDefinition {
this.setFailRetryInterval(taskDefinition.getFailRetryInterval());
this.setFailRetryTimes(taskDefinition.getFailRetryTimes());
this.setFlag(taskDefinition.getFlag());
this.setModifyBy(taskDefinition.getModifyBy());
}
public int getOperator() {

43
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java

@ -26,44 +26,34 @@ import java.util.Collection;
import java.util.List;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
/**
* task definition log mapper interface
*/
public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
/**
* query task definition log by name
*
* @param projectCode projectCode
* @param name name
* @return task definition log list
*/
List<TaskDefinitionLog> queryByDefinitionName(@Param("projectCode") long projectCode,
@Param("taskDefinitionName") String name);
/**
* query max version for definition
*
* @param taskDefinitionCode taskDefinitionCode
* @param code taskDefinitionCode
*/
Integer queryMaxVersionForDefinition(@Param("taskDefinitionCode") long taskDefinitionCode);
Integer queryMaxVersionForDefinition(@Param("code") long code);
/**
* query task definition log
*
* @param taskDefinitionCode taskDefinitionCode
* @param code taskDefinitionCode
* @param version version
* @return task definition log
*/
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("taskDefinitionCode") long taskDefinitionCode,
TaskDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code,
@Param("version") int version);
/**
*
* @param taskDefinitions
* @return
* @param taskDefinitions taskDefinition list
* @return list
*/
List<TaskDefinitionLog> queryByTaskDefinitions(@Param("taskDefinitions") Collection<TaskDefinition> taskDefinitions);
@ -75,4 +65,21 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
*/
int batchInsert(@Param("taskDefinitionLogs") List<TaskDefinitionLog> taskDefinitionLogs);
/**
* delete the certain task definition version by task definition code and version
*
* @param code task definition code
* @param version task definition version
* @return delete result
*/
int deleteByCodeAndVersion(@Param("code") long code, @Param("version") int version);
/**
* query the paging task definition version list by pagination info
*
* @param page pagination info
* @param code process definition code
* @return the paging task definition version list
*/
IPage<TaskDefinitionLog> queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code") long code);
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.dao.mapper;
import org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.ibatis.annotations.MapKey;
import org.apache.ibatis.annotations.Param;
@ -27,6 +28,7 @@ import java.util.List;
import java.util.Map;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
/**
* task definition mapper interface
@ -40,24 +42,16 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @param name name
* @return task definition
*/
TaskDefinition queryByDefinitionName(@Param("projectCode") long projectCode,
@Param("taskDefinitionName") String name);
/**
* query task definition by id
*
* @param taskDefinitionId taskDefinitionId
* @return task definition
*/
TaskDefinition queryByDefinitionId(@Param("taskDefinitionId") int taskDefinitionId);
TaskDefinition queryByName(@Param("projectCode") long projectCode,
@Param("name") String name);
/**
* query task definition by code
*
* @param taskDefinitionCode taskDefinitionCode
* @param code taskDefinitionCode
* @return task definition
*/
TaskDefinition queryByDefinitionCode(@Param("taskDefinitionCode") long taskDefinitionCode);
TaskDefinition queryByCode(@Param("code") long code);
/**
* query all task definition list
@ -67,14 +61,6 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
*/
List<TaskDefinition> queryAllDefinitionList(@Param("projectCode") long projectCode);
/**
* query task definition by ids
*
* @param ids ids
* @return task definition list
*/
List<TaskDefinition> queryDefinitionListByIdList(@Param("ids") Integer[] ids);
/**
* count task definition group by user
*
@ -114,4 +100,22 @@ public interface TaskDefinitionMapper extends BaseMapper<TaskDefinition> {
* @return int
*/
int batchInsert(@Param("taskDefinitions") List<TaskDefinitionLog> taskDefinitions);
/**
* task definition page
*
* @param page page
* @param taskType taskType
* @param searchVal searchVal
* @param userId userId
* @param projectCode projectCode
* @param isAdmin isAdmin
* @return task definition IPage
*/
IPage<TaskDefinition> queryDefineListPaging(IPage<TaskDefinition> page,
@Param("projectCode") long projectCode,
@Param("taskType") String taskType,
@Param("searchVal") String searchVal,
@Param("userId") int userId,
@Param("isAdmin") boolean isAdmin);
}

28
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml

@ -23,27 +23,16 @@
worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, operator, operate_time, create_time, update_time
</sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout, td.delay_time, td.resource_ids, td.operator,td.operate_time, td.create_time, td.update_time,
u.user_name,p.name as project_name
from t_ds_task_definition_log td
JOIN t_ds_user u ON td.user_id = u.id
JOIN t_ds_project p ON td.project_code = p.code
WHERE p.code = #{projectCode}
and td.name = #{taskDefinitionName}
</select>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version)
from t_ds_task_definition_log
WHERE code = #{taskDefinitionCode}
WHERE code = #{code}
</select>
<select id="queryByDefinitionCodeAndVersion" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_task_definition_log
WHERE code = #{taskDefinitionCode}
WHERE code = #{code}
and version = #{version}
</select>
<select id="queryByTaskDefinitions" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
@ -73,4 +62,17 @@
#{taskDefinitionLog.createTime},#{taskDefinitionLog.updateTime})
</foreach>
</insert>
<delete id="deleteByCodeAndVersion">
delete
from t_ds_task_definition_log
where code = #{code}
and version = #{version}
</delete>
<select id="queryTaskDefinitionVersionsPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog">
select
<include refid="baseSql"/>
from t_ds_task_definition_log
where code = #{code}
order by version desc
</select>
</mapper>

47
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapper.xml

@ -23,12 +23,12 @@
worker_group, fail_retry_times, fail_retry_interval, timeout_flag, timeout_notify_strategy, timeout, delay_time,
resource_ids, create_time, update_time
</sql>
<select id="queryByDefinitionName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
<select id="queryByName" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
WHERE project_code = #{projectCode}
and `name` = #{taskDefinitionName}
and `name` = #{name}
</select>
<select id="queryAllDefinitionList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
@ -38,16 +38,6 @@
order by create_time desc
</select>
<select id="queryDefinitionListByIdList" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
where id in
<foreach collection="ids" index="index" item="i" open="(" separator="," close=")">
#{i}
</foreach>
</select>
<select id="countDefinitionGroupByUser" resultType="org.apache.dolphinscheduler.dao.entity.DefinitionGroupByUser">
SELECT td.user_id as user_id, tu.user_name as user_name, count(0) as count
FROM t_ds_task_definition td
@ -61,20 +51,12 @@
</if>
group by td.user_id,tu.user_name
</select>
<select id="queryByDefinitionId" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout, td.delay_time, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name
from t_ds_task_definition td
JOIN t_ds_user u ON td.user_id = u.id
JOIN t_ds_project p ON td.project_code = p.code
WHERE td.id = #{taskDefinitionId}
</select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
<select id="queryByCode" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select
<include refid="baseSql"/>
from t_ds_task_definition
where code = #{taskDefinitionCode}
where code = #{code}
</select>
<select id="listResources" resultType="java.util.HashMap">
SELECT id,resource_ids
@ -104,4 +86,23 @@
#{taskDefinition.delayTime},#{taskDefinition.resourceIds},#{taskDefinition.createTime},#{taskDefinition.updateTime})
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinition">
select td.id, td.code, td.name, td.version, td.description, td.project_code, td.user_id, td.task_type, td.task_params,
td.flag, td.task_priority, td.worker_group, td.fail_retry_times, td.fail_retry_interval, td.timeout_flag, td.timeout_notify_strategy,
td.timeout, td.delay_time, td.resource_ids, td.create_time, td.update_time, u.user_name,p.name as project_name
from t_ds_task_definition td
JOIN t_ds_user u ON td.user_id = u.id
JOIN t_ds_project p ON td.project_code = p.code
where td.project_code = #{projectCode}
<if test=" taskType != null and taskType != ''">
and td.task_type = #{taskType}
</if>
<if test=" searchVal != null and searchVal != ''">
and td.name like concat('%', #{searchVal}, '%')
</if>
<if test=" userId != 0">
and td.user_id = #{userId}
</if>
order by td.update_time desc
</select>
</mapper>

20
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapperTest.java

@ -78,26 +78,6 @@ public class TaskDefinitionLogMapperTest {
Assert.assertNotEquals(taskDefinitionLog.getId(), 0);
}
@Test
public void testQueryByDefinitionName() {
User user = new User();
user.setUserName("un");
userMapper.insert(user);
User un = userMapper.queryByUserNameAccurately("un");
Project project = new Project();
project.setCode(1L);
project.setCreateTime(new Date());
project.setUpdateTime(new Date());
projectMapper.insert(project);
TaskDefinitionLog taskDefinitionLog = insertOne(un.getId());
List<TaskDefinitionLog> taskDefinitionLogs = taskDefinitionLogMapper
.queryByDefinitionName(taskDefinitionLog.getProjectCode(), taskDefinitionLog.getName());
Assert.assertNotEquals(taskDefinitionLogs.size(), 0);
}
@Test
public void testQueryMaxVersionForDefinition() {
TaskDefinitionLog taskDefinitionLog = insertOne();

34
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionMapperTest.java

@ -79,38 +79,16 @@ public class TaskDefinitionMapperTest {
@Test
public void testQueryByDefinitionName() {
TaskDefinition taskDefinition = insertOne();
TaskDefinition result = taskDefinitionMapper.queryByDefinitionName(taskDefinition.getProjectCode()
TaskDefinition result = taskDefinitionMapper.queryByName(taskDefinition.getProjectCode()
, taskDefinition.getName());
Assert.assertNotNull(result);
}
@Test
public void testQueryByDefinitionId() {
User user = new User();
user.setUserName("un");
userMapper.insert(user);
User un = userMapper.queryByUserNameAccurately("un");
Project project = new Project();
project.setCode(1L);
project.setCreateTime(new Date());
project.setUpdateTime(new Date());
projectMapper.insert(project);
TaskDefinition taskDefinition = insertOne(un.getId());
TaskDefinition td = taskDefinitionMapper.queryByDefinitionName(taskDefinition.getProjectCode()
, taskDefinition.getName());
TaskDefinition result = taskDefinitionMapper.queryByDefinitionId(td.getId());
Assert.assertNotNull(result);
}
@Test
public void testQueryByDefinitionCode() {
TaskDefinition taskDefinition = insertOne();
TaskDefinition result = taskDefinitionMapper.queryByDefinitionCode(taskDefinition.getCode());
TaskDefinition result = taskDefinitionMapper.queryByCode(taskDefinition.getCode());
Assert.assertNotNull(result);
}
@ -123,14 +101,6 @@ public class TaskDefinitionMapperTest {
}
@Test
public void testQueryDefinitionListByIdList() {
TaskDefinition taskDefinition = insertOne();
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.queryDefinitionListByIdList(new Integer[]{taskDefinition.getId()});
Assert.assertNotEquals(taskDefinitions.size(), 0);
}
@Test
public void testCountDefinitionGroupByUser() {
User user = new User();

Loading…
Cancel
Save