Browse Source

[Feature][JsonSplit-api] fix api run error (#5989)

* fix api run error

* fix ut

Co-authored-by: JinyLeeChina <297062848@qq.com>
2.0.7-release
JinyLeeChina 3 years ago committed by GitHub
parent
commit
9ca51cf0e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
  2. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
  3. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java
  4. 30
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java
  5. 10
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java
  6. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
  7. 48
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
  8. 57
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
  9. 6
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java
  10. 27
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
  11. 14
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
  12. 8
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml
  13. 6
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml
  14. 2
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java

@ -269,8 +269,8 @@ public class ProcessDefinitionController extends BaseController {
*/ */
@ApiOperation(value = "queryVersions", notes = "QUERY_PROCESS_DEFINITION_VERSIONS_NOTES") @ApiOperation(value = "queryVersions", notes = "QUERY_PROCESS_DEFINITION_VERSIONS_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1") @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
}) })
@GetMapping(value = "/versions") @GetMapping(value = "/versions")
@ -448,10 +448,10 @@ public class ProcessDefinitionController extends BaseController {
*/ */
@ApiOperation(value = "queryListPaging", notes = "QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES") @ApiOperation(value = "queryListPaging", notes = "QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"), @ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
}) })
@GetMapping(value = "/list-paging") @GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

@ -104,8 +104,8 @@ public class ProcessInstanceController extends BaseController {
@ApiImplicitParam(name = "host", value = "HOST", type = "String"), @ApiImplicitParam(name = "host", value = "HOST", type = "String"),
@ApiImplicitParam(name = "startDate", value = "START_DATE", type = "String"), @ApiImplicitParam(name = "startDate", value = "START_DATE", type = "String"),
@ApiImplicitParam(name = "endDate", value = "END_DATE", type = "String"), @ApiImplicitParam(name = "endDate", value = "END_DATE", type = "String"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
}) })
@GetMapping(value = "list-paging") @GetMapping(value = "list-paging")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProjectController.java

@ -149,7 +149,7 @@ public class ProjectController extends BaseController {
@ApiOperation(value = "queryProjectListPaging", notes = "QUERY_PROJECT_LIST_PAGING_NOTES") @ApiOperation(value = "queryProjectListPaging", notes = "QUERY_PROJECT_LIST_PAGING_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "20"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1") @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1")
}) })
@GetMapping(value = "/list-paging") @GetMapping(value = "/list-paging")

30
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINE_BY_CODE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_TASK_DEFINITION_VERSION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DETAIL_OF_TASK_DEFINITION_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_LIST_PAGING_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_LIST_PAGING_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_VERSIONS_ERROR; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_DEFINITION_VERSIONS_ERROR;
@ -128,8 +129,8 @@ public class TaskDefinitionController extends BaseController {
*/ */
@ApiOperation(value = "queryVersions", notes = "QUERY_TASK_DEFINITION_VERSIONS_NOTES") @ApiOperation(value = "queryVersions", notes = "QUERY_TASK_DEFINITION_VERSIONS_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10"),
@ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1") @ApiImplicitParam(name = "taskDefinitionCode", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1")
}) })
@GetMapping(value = "/versions") @GetMapping(value = "/versions")
@ -256,10 +257,10 @@ public class TaskDefinitionController extends BaseController {
*/ */
@ApiOperation(value = "queryTaskDefinitionListPaging", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES") @ApiOperation(value = "queryTaskDefinitionListPaging", notes = "QUERY_TASK_DEFINITION_LIST_PAGING_NOTES")
@ApiImplicitParams({ @ApiImplicitParams({
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "100"), @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"), @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", required = false, type = "String"),
@ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"), @ApiImplicitParam(name = "userId", value = "USER_ID", required = false, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "100") @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
}) })
@GetMapping(value = "/list-paging") @GetMapping(value = "/list-paging")
@ResponseStatus(HttpStatus.OK) @ResponseStatus(HttpStatus.OK)
@ -279,4 +280,25 @@ public class TaskDefinitionController extends BaseController {
searchVal = ParameterUtils.handleEscapes(searchVal); searchVal = ParameterUtils.handleEscapes(searchVal);
return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId); return taskDefinitionService.queryTaskDefinitionListPaging(loginUser, projectCode, searchVal, pageNo, pageSize, userId);
} }
/**
* gen task code list
*
* @param loginUser login user
* @param genNum gen num
* @return task code list
*/
@ApiOperation(value = "genTaskCodeList", notes = "GEN_TASK_CODE_LIST_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "genNum", value = "GEN_NUM", required = true, dataType = "Int", example = "1")
})
@GetMapping(value = "/gen-task-code-list")
@ResponseStatus(HttpStatus.OK)
@ApiException(LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result genTaskCodeList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("pageNo") Integer genNum) {
Map<String, Object> result = taskDefinitionService.genTaskCodeList(loginUser, genNum);
return returnDataList(result);
}
} }

10
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java

@ -145,5 +145,15 @@ public interface TaskDefinitionService {
Integer pageNo, Integer pageNo,
Integer pageSize, Integer pageSize,
Integer userId); Integer userId);
/**
* gen task code list
*
* @param loginUser login user
* @param genNum gen num
* @return task code list
*/
Map<String, Object> genTaskCodeList(User loginUser,
Integer genNum);
} }

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

@ -77,6 +77,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -254,7 +255,16 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro
return result; return result;
} }
if (graphHasCycle(processService.transformTask(taskRelationList))) { List<TaskNode> taskNodeList = processService.transformTask(taskRelationList);
if (taskNodeList.size() != taskRelationList.size()) {
Set<Long> postTaskCodes = taskRelationList.stream().map(ProcessTaskRelationLog::getPostTaskCode).collect(Collectors.toSet());
Set<Long> taskNodeCodes = taskNodeList.stream().map(TaskNode::getCode).collect(Collectors.toSet());
Collection<Long> codes = CollectionUtils.subtract(postTaskCodes, taskNodeCodes);
logger.error("the task code is not exit");
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(codes, Constants.COMMA));
return result;
}
if (graphHasCycle(taskNodeList)) {
logger.error("process DAG has cycle"); logger.error("process DAG has cycle");
putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); putMsg(result, Status.PROCESS_NODE_HAS_CYCLE);
return result; return result;

48
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

@ -101,19 +101,23 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class); List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
if (taskDefinitionLogs.isEmpty()) {
logger.error("taskDefinitionJson invalid: {}", taskDefinitionJson);
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJson);
return result;
}
int totalSuccessNumber = 0; int totalSuccessNumber = 0;
List<Long> totalSuccessCode = new ArrayList<>(); List<Long> totalSuccessCode = new ArrayList<>();
List<TaskDefinitionLog> taskDefinitionLogsList = new ArrayList<>(); Date now = new Date();
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) { for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
checkTaskDefinition(result, taskDefinitionLog); checkTaskDefinition(result, taskDefinitionLog);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result; return result;
} }
taskDefinitionLog.setProjectCode(projectCode); taskDefinitionLog.setProjectCode(projectCode);
taskDefinitionLog.setUserId(loginUser.getId()); taskDefinitionLog.setUserId(loginUser.getId());
taskDefinitionLog.setVersion(1); taskDefinitionLog.setVersion(1);
Date now = new Date();
taskDefinitionLog.setCreateTime(now); taskDefinitionLog.setCreateTime(now);
taskDefinitionLog.setUpdateTime(now); taskDefinitionLog.setUpdateTime(now);
long code = 0L; long code = 0L;
@ -127,19 +131,18 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
} }
taskDefinitionLog.setOperator(loginUser.getId()); taskDefinitionLog.setOperator(loginUser.getId());
taskDefinitionLog.setOperateTime(now); taskDefinitionLog.setOperateTime(now);
taskDefinitionLogsList.add(taskDefinitionLog);
totalSuccessCode.add(code); totalSuccessCode.add(code);
totalSuccessNumber++; totalSuccessNumber++;
} }
int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogsList); int insert = taskDefinitionMapper.batchInsert(taskDefinitionLogs);
int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogsList); int logInsert = taskDefinitionLogMapper.batchInsert(taskDefinitionLogs);
if ((logInsert & insert) == 0) { if ((logInsert & insert) == 0) {
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR); putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
return result; return result;
} }
Map<String, Object> resData = new HashMap<>(); Map<String, Object> resData = new HashMap<>();
resData.put("total", totalSuccessNumber); resData.put("total", totalSuccessNumber);
resData.put("code",totalSuccessCode); resData.put("code", totalSuccessCode);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, resData); result.put(Constants.DATA_LIST, resData);
return result; return result;
@ -190,9 +193,9 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode); List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) { if (!processTaskRelationList.isEmpty()) {
Set<Long> processDefinitionCodes = processTaskRelationList Set<Long> processDefinitionCodes = processTaskRelationList
.stream() .stream()
.map(ProcessTaskRelation::getProcessDefinitionCode) .map(ProcessTaskRelation::getProcessDefinitionCode)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, StringUtils.join(processDefinitionCodes, ",")); putMsg(result, Status.PROCESS_TASK_RELATION_EXIST, StringUtils.join(processDefinitionCodes, ","));
return result; return result;
} }
@ -234,7 +237,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJson, TaskDefinitionLog.class); TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJson, TaskDefinitionLog.class);
checkTaskDefinition(result, taskDefinitionToUpdate); checkTaskDefinition(result, taskDefinitionToUpdate);
if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID if (result.get(Constants.STATUS) == DATA_IS_NOT_VALID
|| result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) { || result.get(Constants.STATUS) == Status.PROCESS_NODE_S_PARAMETER_INVALID) {
return result; return result;
} }
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
@ -338,5 +341,28 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
Integer userId) { Integer userId) {
return null; return null;
} }
@Override
public Map<String, Object> genTaskCodeList(User loginUser, Integer genNum) {
Map<String, Object> result = new HashMap<>();
if (genNum == null || genNum < 1 || genNum > 100) {
logger.error("the genNum must be great than 1 and less than 100");
putMsg(result, Status.DATA_IS_NOT_VALID, genNum);
return result;
}
List<Long> taskCodes = new ArrayList<>();
try {
for (int i = 0; i < genNum; i++) {
taskCodes.add(SnowFlakeUtils.getInstance().nextId());
}
} catch (SnowFlakeException e) {
logger.error("Task code get error, ", e);
putMsg(result, Status.INTERNAL_SERVER_ERROR_ARGS, "Error generating task definition code");
}
putMsg(result, Status.SUCCESS);
// return processDefinitionCode
result.put(Constants.DATA_LIST, taskCodes);
return result;
}
} }

57
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.api.service.impl.TaskDefinitionServiceImpl;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@ -132,29 +133,13 @@ public class TaskDefinitionServiceImplTest {
putMsg(result, Status.SUCCESS, projectCode); putMsg(result, Status.SUCCESS, projectCode);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result); Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
String createTaskDefinitionJson = "[{\n" String createTaskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"name\": \"test12111\",\n" + "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\"description\": \"test\",\n" + "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\"taskType\": \"SHELL\",\n" + "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\"flag\": 0,\n" + "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"taskParams\": \n" + "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"{\\\"resourceList\\\":[],\n" + "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
+ "\\\"localParams\\\":[],\n"
+ "\\\"rawScript\\\":\\\"echo 11\\\",\n"
+ "\\\"conditionResult\\\":\n"
+ "{\\\"successNode\\\":[\\\"\\\"],\n"
+ "\\\"failedNode\\\":[\\\"\\\"]},\n"
+ "\\\"dependence\\\":{}}\",\n"
+ "\"taskPriority\": 0,\n"
+ "\"workerGroup\": \"default\",\n"
+ "\"failRetryTimes\": 0,\n"
+ "\"failRetryInterval\": 1,\n"
+ "\"timeoutFlag\": 1, \n"
+ "\"timeoutNotifyStrategy\": 0,\n"
+ "\"timeout\": 0, \n"
+ "\"delayTime\": 0,\n"
+ "\"resourceIds\":\"\" \n"
+ "}] ";
List<TaskDefinition> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class); List<TaskDefinition> taskDefinitions = JSONUtils.toList(createTaskDefinitionJson, TaskDefinition.class);
Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1); Mockito.when(taskDefinitionMapper.batchInsert(Mockito.anyList())).thenReturn(1);
Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1); Mockito.when(taskDefinitionLogMapper.batchInsert(Mockito.anyList())).thenReturn(1);
@ -314,4 +299,30 @@ public class TaskDefinitionServiceImplTest {
return project; return project;
} }
@Test
public void checkJson() {
String taskDefinitionJson = "[{\"name\":\"detail_up\",\"description\":\"\",\"taskType\":\"SHELL\",\"taskParams\":"
+ "\"{\\\"resourceList\\\":[],\\\"localParams\\\":[{\\\"prop\\\":\\\"datetime\\\",\\\"direct\\\":\\\"IN\\\","
+ "\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"${system.datetime}\\\"}],\\\"rawScript\\\":"
+ "\\\"echo ${datetime}\\\",\\\"conditionResult\\\":\\\"{\\\\\\\"successNode\\\\\\\":[\\\\\\\"\\\\\\\"],"
+ "\\\\\\\"failedNode\\\\\\\":[\\\\\\\"\\\\\\\"]}\\\",\\\"dependence\\\":{}}\",\"flag\":0,\"taskPriority\":0,"
+ "\"workerGroup\":\"default\",\"failRetryTimes\":0,\"failRetryInterval\":0,\"timeoutFlag\":0,"
+ "\"timeoutNotifyStrategy\":0,\"timeout\":0,\"delayTime\":0,\"resourceIds\":\"\"}]";
List<TaskDefinitionLog> taskDefinitionLogs = JSONUtils.toList(taskDefinitionJson, TaskDefinitionLog.class);
Assert.assertFalse(taskDefinitionLogs.isEmpty());
String taskParams = "{\"resourceList\":[],\"localParams\":[{\"prop\":\"datetime\",\"direct\":\"IN\",\"type\":\"VARCHAR\","
+ "\"value\":\"${system.datetime}\"}],\"rawScript\":\"echo ${datetime}\",\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],"
+ "\\\"failedNode\\\":[\\\"\\\"]}\",\"dependence\":{}}";
ShellParameters parameters = JSONUtils.parseObject(taskParams, ShellParameters.class);
Assert.assertNotNull(parameters);
}
@Test
public void genTaskCodeList() {
User loginUser = new User();
loginUser.setId(-1);
loginUser.setUserType(UserType.GENERAL_USER);
Map<String, Object> genTaskCodeList = taskDefinitionService.genTaskCodeList(loginUser, 10);
Assert.assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS));
}
} }

6
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessDefinition.java

@ -288,10 +288,9 @@ public class ProcessDefinition {
} }
public void setGlobalParams(String globalParams) { public void setGlobalParams(String globalParams) {
if (globalParams == null) { this.globalParamList = JSONUtils.toList(globalParams, Property.class);
if (this.globalParamList == null) {
this.globalParamList = new ArrayList<>(); this.globalParamList = new ArrayList<>();
} else {
this.globalParamList = JSONUtils.toList(globalParams, Property.class);
} }
this.globalParams = globalParams; this.globalParams = globalParams;
} }
@ -301,7 +300,6 @@ public class ProcessDefinition {
} }
public void setGlobalParamList(List<Property> globalParamList) { public void setGlobalParamList(List<Property> globalParamList) {
this.globalParams = JSONUtils.toJsonString(globalParamList);
this.globalParamList = globalParamList; this.globalParamList = globalParamList;
} }

27
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java

@ -36,56 +36,53 @@ public interface ProcessDefinitionLogMapper extends BaseMapper<ProcessDefinition
* query process definition log by name * query process definition log by name
* *
* @param projectCode projectCode * @param projectCode projectCode
* @param name process name * @param name process definition name
* @return process definition log list * @return process definition log list
*/ */
List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") long projectCode, List<ProcessDefinitionLog> queryByDefinitionName(@Param("projectCode") long projectCode, @Param("name") String name);
@Param("processDefinitionName") String name);
/** /**
* query process definition log list * query process definition log list
* *
* @param processDefinitionCode processDefinitionCode * @param code process definition code
* @return process definition log list * @return process definition log list
*/ */
List<ProcessDefinitionLog> queryByDefinitionCode(@Param("processDefinitionCode") long processDefinitionCode); List<ProcessDefinitionLog> queryByDefinitionCode(@Param("code") long code);
/** /**
* query max version for definition * query max version for definition
*/ */
Integer queryMaxVersionForDefinition(@Param("processDefinitionCode") long processDefinitionCode); Integer queryMaxVersionForDefinition(@Param("code") long code);
/** /**
* query max version definition log * query max version definition log
*/ */
ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("processDefinitionCode") long processDefinitionCode); ProcessDefinitionLog queryMaxVersionDefinitionLog(@Param("code") long code);
/** /**
* query the certain process definition version info by process definition code and version number * query the certain process definition version info by process definition code and version number
* *
* @param processDefinitionCode process definition code * @param code process definition code
* @param version version number * @param version version number
* @return the process definition version info * @return the process definition version info
*/ */
ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long processDefinitionCode, ProcessDefinitionLog queryByDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
@Param("version") int version);
/** /**
* query the paging process definition version list by pagination info * query the paging process definition version list by pagination info
* *
* @param page pagination info * @param page pagination info
* @param processDefinitionCode process definition code * @param code process definition code
* @return the paging process definition version list * @return the paging process definition version list
*/ */
IPage<ProcessDefinitionLog> queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page, IPage<ProcessDefinitionLog> queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page, @Param("code") long code);
@Param("processDefinitionCode") long processDefinitionCode);
/** /**
* delete the certain process definition version by process definition id and version number * delete the certain process definition version by process definition id and version number
* *
* @param processDefinitionCode process definition code * @param code process definition code
* @param version version number * @param version version number
* @return delete result * @return delete result
*/ */
int deleteByProcessDefinitionCodeAndVersion(@Param("processDefinitionCode") long processDefinitionCode, @Param("version") int version); int deleteByProcessDefinitionCodeAndVersion(@Param("code") long code, @Param("version") int version);
} }

14
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml

@ -34,26 +34,26 @@
JOIN t_ds_user u ON pd.user_id = u.id JOIN t_ds_user u ON pd.user_id = u.id
JOIN t_ds_project p ON pd.project_code = p.code JOIN t_ds_project p ON pd.project_code = p.code
WHERE p.code = #{projectCode} WHERE p.code = #{projectCode}
and pd.name = #{processDefinitionName} and pd.name = #{name}
</select> </select>
<select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog"> <select id="queryByDefinitionCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_process_definition_log from t_ds_process_definition_log
WHERE code = #{processDefinitionCode} WHERE code = #{code}
</select> </select>
<select id="queryByDefinitionCodeAndVersion" <select id="queryByDefinitionCodeAndVersion"
resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog"> resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} where code = #{code}
and version = #{version} and version = #{version}
</select> </select>
<select id="queryMaxVersionForDefinition" resultType="java.lang.Integer"> <select id="queryMaxVersionForDefinition" resultType="java.lang.Integer">
select max(version) select max(version)
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} where code = #{code}
</select> </select>
<select id="queryMaxVersionDefinitionLog" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog"> <select id="queryMaxVersionDefinitionLog" resultType="org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog">
@ -61,7 +61,7 @@
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} order by version desc limit 1 where code = #{code} order by version desc limit 1
</select> </select>
<select id="queryProcessDefinitionVersionsPaging" <select id="queryProcessDefinitionVersionsPaging"
@ -69,14 +69,14 @@
select select
<include refid="baseSql"/> <include refid="baseSql"/>
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} where code = #{code}
order by version desc order by version desc
</select> </select>
<delete id="deleteByProcessDefinitionCodeAndVersion"> <delete id="deleteByProcessDefinitionCodeAndVersion">
delete delete
from t_ds_process_definition_log from t_ds_process_definition_log
where code = #{processDefinitionCode} where code = #{code}
and version = #{version} and version = #{version}
</delete> </delete>
</mapper> </mapper>

8
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml

@ -35,10 +35,10 @@
create_time, update_time) create_time, update_time)
values values
<foreach collection="taskRelationList" item="relation" separator=","> <foreach collection="taskRelationList" item="relation" separator=",">
(#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code}, (#{relation.name},#{relation.processDefinitionVersion},#{relation.projectCode},#{relation.processDefinitionCode},
#{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version}, #{relation.preTaskCode},#{relation.preTaskVersion},#{relation.postTaskCode},#{relation.postTaskVersion},
#{relation.condition_type},#{relation.condition_params},#{relation.operator},#{relation.operate_time}, #{relation.conditionType},#{relation.conditionParams},#{relation.operator},#{relation.operateTime},
#{relation.create_time},#{relation.update_time}) #{relation.createTime},#{relation.updateTime})
</foreach> </foreach>
</insert> </insert>
</mapper> </mapper>

6
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml

@ -64,9 +64,9 @@
pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_time) pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_time)
values values
<foreach collection="taskRelationList" item="relation" separator=","> <foreach collection="taskRelationList" item="relation" separator=",">
(#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code}, (#{relation.name},#{relation.processDefinitionVersion},#{relation.projectCode},#{relation.processDefinitionCode},
#{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version}, #{relation.preTaskCode},#{relation.preTaskVersion},#{relation.postTaskCode},#{relation.postTaskVersion},
#{relation.condition_type},#{relation.condition_params},#{relation.create_time},#{relation.update_time}) #{relation.conditionType},#{relation.conditionParams},#{relation.createTime},#{relation.updateTime})
</foreach> </foreach>
</insert> </insert>
</mapper> </mapper>

2
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -2319,7 +2319,7 @@ public class ProcessService {
taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval()); taskNode.setRetryInterval(taskDefinitionLog.getFailRetryInterval());
Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams()); Map<String, Object> taskParamsMap = taskNode.taskParamsToJsonObj(taskDefinitionLog.getTaskParams());
taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT)); taskNode.setConditionResult((String) taskParamsMap.get(Constants.CONDITION_RESULT));
taskNode.setDependence((String) taskParamsMap.get(Constants.DEPENDENCE)); taskNode.setDependence(JSONUtils.toJsonString(taskParamsMap.get(Constants.DEPENDENCE)));
taskParamsMap.remove(Constants.CONDITION_RESULT); taskParamsMap.remove(Constants.CONDITION_RESULT);
taskParamsMap.remove(Constants.DEPENDENCE); taskParamsMap.remove(Constants.DEPENDENCE);
taskNode.setParams(JSONUtils.toJsonString(taskParamsMap)); taskNode.setParams(JSONUtils.toJsonString(taskParamsMap));

Loading…
Cancel
Save